diff --git a/pymisp/api.py b/pymisp/api.py index d361eb7..033d43f 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -771,6 +771,40 @@ class PyMISP: # ## END Analyst Relationship ### + # ## BEGIN Galaxy Cluster ### + def attach_galaxy_cluster(self, misp_entity: MISPEvent | MISPAttribute, galaxy_cluster: MISPGalaxyCluster | int | str, local: bool = False, pythonify: bool = False) -> dict[str, Any] | list[dict[str, Any]]: + """Attach a galaxy cluster to an event or an attribute + + :param misp_entity: a MISP Event or a MISP Attribute + :param galaxy_cluster: Galaxy cluster to attach + :param local: whether the object should be attached locally or not to the target + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + if isinstance(misp_entity, MISPEvent): + attach_target_type = 'event' + elif isinstance(misp_entity, MISPAttribute): + attach_target_type = 'attribute' + else: + raise PyMISPError('The misp_entity must be MISPEvent or MISPAttribute') + + attach_target_id = misp_entity.id + local = 1 if local else 0 + + if isinstance(galaxy_cluster, MISPGalaxyCluster): + cluster_id = galaxy_cluster.id + elif isinstance(galaxy_cluster, (int, str)): + cluster_id = galaxy_cluster + else: + raise PyMISPError('The galaxy_cluster must be MISPGalaxyCluster or the id associated with the cluster (int or str)') + + to_post = { 'Galaxy': { 'target_id': cluster_id } } + url = f'galaxies/attachCluster/{attach_target_id}/{attach_target_type}/local:{local}' + + r = self._prepare_request('POST', url, data=to_post) + return self._check_json_response(r) + # ## END Galaxy Cluster ### + + # ## BEGIN Object ### def get_object(self, misp_object: MISPObject | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPObject: diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 2c33b29..6f33db4 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -3198,6 +3198,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(event) self.admin_misp_connector.toggle_global_pythonify() + def test_analyst_data_CRUD(self) -> None: event = self.create_simple_event() self.admin_misp_connector.toggle_global_pythonify() @@ -3334,6 +3335,43 @@ class TestComprehensive(unittest.TestCase): except Exception: pass + def test_attach_galaxy_cluster(self) -> None: + event = self.create_simple_event() + event = self.admin_misp_connector.add_event(event, pythonify=True) + try: + galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies(pythonify=True) + galaxy: MISPGalaxy = galaxies[0] + if gid := galaxy.id: + galaxy = self.admin_misp_connector.get_galaxy(gid, withCluster=True, pythonify=True) + else: + raise Exception("No galaxy found") + galaxy_cluster: MISPGalaxyCluster = galaxy.clusters[0] + self.admin_misp_connector.attach_galaxy_cluster(event, galaxy_cluster) + event = self.admin_misp_connector.get_event(event.id, pythonify=True) + + self.assertEqual(len(event.galaxies), 1) + event_galaxy = event.galaxies[0] + # The galaxy ID should equal the galaxy from which the cluster came from + self.assertEqual(event_galaxy.id, galaxy.id) + # The galaxy cluster should equal the cluster added + self.assertEqual(event_galaxy.clusters[0].id, galaxy_cluster.id) + + galaxy_cluster: MISPGalaxyCluster = galaxy.clusters[1] + + # Test on attribute + attribute = event.Attribute[0] + event = self.admin_misp_connector.get_event(event.id, pythonify=True) + attribute = event.Attribute[0] + self.assertEqual(len(attribute.galaxies), 1) + attribute_galaxy = attribute.galaxies[0] + # The galaxy ID should equal the galaxy from which the cluster came from + self.assertEqual(attribute_galaxy.id, galaxy.id) + # The galaxy cluster should equal the cluster added + self.assertEqual(attribute_galaxy.clusters[0].id, galaxy_cluster.id) + finally: + self.admin_misp_connector.delete_event(event) + self.admin_misp_connector.toggle_global_pythonify() + @unittest.skip("Internal use only") def missing_methods(self) -> None: skip = [