From 7d4cfc40b7349022a3af85d0cad5cb75fddea006 Mon Sep 17 00:00:00 2001 From: Tom King Date: Sat, 30 Jan 2021 15:34:29 +0000 Subject: [PATCH] chg: Add in nosetests for MISP Galaxy functions, check default key as a dict attribute not MISPAbstract attribute --- pymisp/api.py | 9 +-- pymisp/mispevent.py | 7 ++- tests/testlive_comprehensive.py | 105 +++++++++++++++++++++++++++++++- 3 files changed, 115 insertions(+), 6 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index 6f39a24..5475705 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -1359,7 +1359,7 @@ class PyMISP: :param pythonify: Returns a PyMISP Object instead of the plain json output """ - if galaxy_cluster.default: + if getattr(galaxy_cluster, "default", False): # We can't add default galaxies raise PyMISPError('You are not able add a default galaxy cluster') galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy) @@ -1378,10 +1378,11 @@ class PyMISP: :param pythonify: Returns a PyMISP Object instead of the plain json output """ - if galaxy_cluster.default: + if getattr(galaxy_cluster, "default", False): # We can't edit default galaxies raise PyMISPError('You are not able to update a default galaxy cluster') cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster) + print(cluster_id) r = self._prepare_request('POST', f'galaxy_clusters/edit/{cluster_id}', data=galaxy_cluster) cluster_j = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in cluster_j: @@ -1395,7 +1396,7 @@ class PyMISP: :param galaxy_cluster: The galaxy cluster you wish to publish """ - if isinstance(galaxy_cluster, MISPGalaxyCluster) and galaxy_cluster.default: + if isinstance(galaxy_cluster, MISPGalaxyCluster) and getattr(galaxy_cluster, "default", False): raise PyMISPError('You are not able to publish a default galaxy cluster') cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster) r = self._prepare_request('POST', f'galaxy_clusters/publish/{cluster_id}') @@ -1433,7 +1434,7 @@ class PyMISP: :param hard: flag for hard delete """ - if isinstance(galaxy_cluster, MISPGalaxyCluster) and galaxy_cluster.default: + if isinstance(galaxy_cluster, MISPGalaxyCluster) and getattr(galaxy_cluster, "default", False): raise PyMISPError('You are not able to delete a default galaxy cluster') data = {} if hard: diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index a3a2437..3665f57 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -9,6 +9,7 @@ import sys from io import BytesIO, BufferedIOBase, TextIOBase from zipfile import ZipFile import uuid +from uuid import UUID from collections import defaultdict import logging import hashlib @@ -1294,7 +1295,7 @@ class MISPGalaxyCluster(AbstractMISP): self.cluster_elements.append(cluster_element) return cluster_element - def add_cluster_relation(self, referenced_galaxy_cluster_uuid: uuid, referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: str = None, **kwargs) -> MISPGalaxyClusterRelation: + def add_cluster_relation(self, referenced_galaxy_cluster_uuid: Union["MISPGalaxyCluster", str, UUID], referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: str = None, **kwargs) -> MISPGalaxyClusterRelation: """Add a cluster relation to a MISPGalaxyCluster. :param referenced_galaxy_cluster_uuid: UUID of the related cluster @@ -1308,6 +1309,10 @@ class MISPGalaxyCluster(AbstractMISP): if not getattr(self, "uuid", None): raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster") cluster_relation = MISPGalaxyClusterRelation() + + if isinstance(referenced_galaxy_cluster_uuid, MISPGalaxyCluster): + referenced_galaxy_cluster_uuid = referenced_galaxy_cluster_uuid.uuid + cluster_relation.from_dict( referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid, referenced_galaxy_cluster_type=referenced_galaxy_cluster_type, diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 1478686..92e97b5 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -27,7 +27,7 @@ logger = logging.getLogger('pymisp') try: - from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlocklist, MISPEventReport + from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlocklist, MISPEventReport, MISPGalaxyCluster from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator from pymisp.exceptions import MISPServerError except ImportError: @@ -2690,6 +2690,109 @@ class TestComprehensive(unittest.TestCase): self.user_misp_connector.delete_event(event) self.user_misp_connector.delete_event_report(new_event_report) + def test_galaxy_cluster(self): + self.admin_misp_connector.toggle_global_pythonify() + galaxy = self.admin_misp_connector.galaxies()[0] + new_galaxy_cluster = MISPGalaxyCluster() + new_galaxy_cluster.value = "Test Cluster" + new_galaxy_cluster.authors = ["MISP"] + new_galaxy_cluster.distribution = 1 + new_galaxy_cluster.description = "Example test cluster" + try: + galaxy = self.admin_misp_connector.get_galaxy(galaxy.id, withCluster=True) + existing_galaxy_cluster = galaxy.clusters[0] + + new_galaxy_cluster = self.admin_misp_connector.add_galaxy_cluster(galaxy.id, new_galaxy_cluster) + # The new galaxy cluster should be under the selected galaxy + self.assertEqual(galaxy.id, new_galaxy_cluster.galaxy_id) + # The cluster should have the right value + self.assertEqual(new_galaxy_cluster.value, "Test Cluster") + + new_galaxy_cluster.add_cluster_element("synonyms", "Test2") + new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) + + # The cluster should have one element that is a synonym + self.assertEqual(len(new_galaxy_cluster.cluster_elements), 1) + element = new_galaxy_cluster.cluster_elements[0] + self.assertEqual(element.key, "synonyms") + self.assertEqual(element.value, "Test2") + + # The cluster should have the old meta as a prop + self.assertEqual(new_galaxy_cluster.meta, {'synonyms': ['Test2']}) + + # The cluster element should be updatable + element.value = "Test3" + new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) + element = new_galaxy_cluster.cluster_elements[0] + self.assertEqual(element.value, "Test3") + + new_galaxy_cluster.add_cluster_element("synonyms", "ToDelete") + new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) + # The cluster should have two elements + self.assertEqual(len(new_galaxy_cluster.cluster_elements), 2) + + new_galaxy_cluster.cluster_elements = [e for e in new_galaxy_cluster.cluster_elements if e.value != "ToDelete"] + new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) + # The cluster elements should be deletable + self.assertEqual(len(new_galaxy_cluster.cluster_elements), 1) + + new_galaxy_cluster.add_cluster_relation(existing_galaxy_cluster, "is-tested-by") + new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) + # The cluster should have a relationship + self.assertEqual(len(new_galaxy_cluster.cluster_relations), 1) + relation = new_galaxy_cluster.cluster_relations[0] + self.assertEqual(relation.referenced_galaxy_cluster_type, "is-tested-by") + self.assertEqual(relation.referenced_galaxy_cluster_uuid, existing_galaxy_cluster.uuid) + + relation.add_tag("tlp:amber") + new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) + relation = new_galaxy_cluster.cluster_relations[0] + # The relationship should have a tag of tlp:amber + self.assertEqual(len(relation.tags), 1) + self.assertEqual(relation.tags[0].name, "tlp:amber") + + # The cluster relations should be deletable + resp = self.admin_misp_connector.delete_galaxy_cluster_relation(relation) + self.assertTrue(resp['success']) + # The cluster relation should no longer be present + new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster) + self.assertEqual(len(new_galaxy_cluster.cluster_relations), 0) + + resp = self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster) + # Galaxy clusters should be soft deletable + self.assertTrue(resp['success']) + new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster) + self.assertTrue(isinstance(new_galaxy_cluster, MISPGalaxyCluster)) + + resp = self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster, hard=True) + # Galaxy clusters should be hard deletable + self.assertTrue(resp['success']) + resp = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster) + self.assertTrue("errors" in resp) + finally: + self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster) + self.admin_misp_connector.toggle_global_pythonify() + + def test_event_galaxy(self): + self.admin_misp_connector.toggle_global_pythonify() + event = self.create_simple_event() + try: + galaxy = self.admin_misp_connector.galaxies()[0] + galaxy = self.admin_misp_connector.get_galaxy(galaxy.id, withCluster=True) + galaxy_cluster = galaxy.clusters[0] + event.add_tag(galaxy_cluster.tag_name) + event = self.admin_misp_connector.add_event(event) + # The event should have a galaxy attached + self.assertEqual(len(event.galaxies), 1) + event_galaxy = event.galaxies[0] + # The galaxy ID should equal the galaxy from which the cluster came from + self.assertEqual(event_galaxy.id, galaxy.id) + # The galaxy cluster should equal the cluster added + self.assertEqual(event_galaxy.clusters[0].id, galaxy_cluster.id) + finally: + self.admin_misp_connector.delete_event(event) + self.admin_misp_connector.toggle_global_pythonify() + @unittest.skip("Internal use only") def missing_methods(self): skip = [