mirror of https://github.com/MISP/PyMISP
Merge branch 'main' into wip_analystdata
commit
a7b36d1edf
|
@ -771,6 +771,40 @@ class PyMISP:
|
|||
|
||||
# ## END Analyst Relationship ###
|
||||
|
||||
# ## BEGIN Galaxy Cluster ###
|
||||
def attach_galaxy_cluster(self, misp_entity: MISPEvent | MISPAttribute, galaxy_cluster: MISPGalaxyCluster | int | str, local: bool = False, pythonify: bool = False) -> dict[str, Any] | list[dict[str, Any]]:
|
||||
"""Attach a galaxy cluster to an event or an attribute
|
||||
|
||||
:param misp_entity: a MISP Event or a MISP Attribute
|
||||
:param galaxy_cluster: Galaxy cluster to attach
|
||||
:param local: whether the object should be attached locally or not to the target
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
if isinstance(misp_entity, MISPEvent):
|
||||
attach_target_type = 'event'
|
||||
elif isinstance(misp_entity, MISPAttribute):
|
||||
attach_target_type = 'attribute'
|
||||
else:
|
||||
raise PyMISPError('The misp_entity must be MISPEvent or MISPAttribute')
|
||||
|
||||
attach_target_id = misp_entity.id
|
||||
local = 1 if local else 0
|
||||
|
||||
if isinstance(galaxy_cluster, MISPGalaxyCluster):
|
||||
cluster_id = galaxy_cluster.id
|
||||
elif isinstance(galaxy_cluster, (int, str)):
|
||||
cluster_id = galaxy_cluster
|
||||
else:
|
||||
raise PyMISPError('The galaxy_cluster must be MISPGalaxyCluster or the id associated with the cluster (int or str)')
|
||||
|
||||
to_post = { 'Galaxy': { 'target_id': cluster_id } }
|
||||
url = f'galaxies/attachCluster/{attach_target_id}/{attach_target_type}/local:{local}'
|
||||
|
||||
r = self._prepare_request('POST', url, data=to_post)
|
||||
return self._check_json_response(r)
|
||||
# ## END Galaxy Cluster ###
|
||||
|
||||
|
||||
# ## BEGIN Object ###
|
||||
|
||||
def get_object(self, misp_object: MISPObject | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPObject:
|
||||
|
|
|
@ -3198,6 +3198,7 @@ class TestComprehensive(unittest.TestCase):
|
|||
self.admin_misp_connector.delete_event(event)
|
||||
self.admin_misp_connector.toggle_global_pythonify()
|
||||
|
||||
|
||||
def test_analyst_data_CRUD(self) -> None:
|
||||
event = self.create_simple_event()
|
||||
self.admin_misp_connector.toggle_global_pythonify()
|
||||
|
@ -3334,6 +3335,43 @@ class TestComprehensive(unittest.TestCase):
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
def test_attach_galaxy_cluster(self) -> None:
|
||||
event = self.create_simple_event()
|
||||
event = self.admin_misp_connector.add_event(event, pythonify=True)
|
||||
try:
|
||||
galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies(pythonify=True)
|
||||
galaxy: MISPGalaxy = galaxies[0]
|
||||
if gid := galaxy.id:
|
||||
galaxy = self.admin_misp_connector.get_galaxy(gid, withCluster=True, pythonify=True)
|
||||
else:
|
||||
raise Exception("No galaxy found")
|
||||
galaxy_cluster: MISPGalaxyCluster = galaxy.clusters[0]
|
||||
self.admin_misp_connector.attach_galaxy_cluster(event, galaxy_cluster)
|
||||
event = self.admin_misp_connector.get_event(event.id, pythonify=True)
|
||||
|
||||
self.assertEqual(len(event.galaxies), 1)
|
||||
event_galaxy = event.galaxies[0]
|
||||
# The galaxy ID should equal the galaxy from which the cluster came from
|
||||
self.assertEqual(event_galaxy.id, galaxy.id)
|
||||
# The galaxy cluster should equal the cluster added
|
||||
self.assertEqual(event_galaxy.clusters[0].id, galaxy_cluster.id)
|
||||
|
||||
galaxy_cluster: MISPGalaxyCluster = galaxy.clusters[1]
|
||||
|
||||
# Test on attribute
|
||||
attribute = event.Attribute[0]
|
||||
event = self.admin_misp_connector.get_event(event.id, pythonify=True)
|
||||
attribute = event.Attribute[0]
|
||||
self.assertEqual(len(attribute.galaxies), 1)
|
||||
attribute_galaxy = attribute.galaxies[0]
|
||||
# The galaxy ID should equal the galaxy from which the cluster came from
|
||||
self.assertEqual(attribute_galaxy.id, galaxy.id)
|
||||
# The galaxy cluster should equal the cluster added
|
||||
self.assertEqual(attribute_galaxy.clusters[0].id, galaxy_cluster.id)
|
||||
finally:
|
||||
self.admin_misp_connector.delete_event(event)
|
||||
self.admin_misp_connector.toggle_global_pythonify()
|
||||
|
||||
@unittest.skip("Internal use only")
|
||||
def missing_methods(self) -> None:
|
||||
skip = [
|
||||
|
|
Loading…
Reference in New Issue