chg: Add in nosetests for MISP Galaxy functions, check default key as a dict attribute not MISPAbstract attribute

pull/682/head
Tom King 2021-01-30 15:34:29 +00:00
parent 96636639c4
commit 7d4cfc40b7
3 changed files with 115 additions and 6 deletions

View File

@ -1359,7 +1359,7 @@ class PyMISP:
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
if galaxy_cluster.default:
if getattr(galaxy_cluster, "default", False):
# We can't add default galaxies
raise PyMISPError('You are not able add a default galaxy cluster')
galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy)
@ -1378,10 +1378,11 @@ class PyMISP:
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
if galaxy_cluster.default:
if getattr(galaxy_cluster, "default", False):
# We can't edit default galaxies
raise PyMISPError('You are not able to update a default galaxy cluster')
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
print(cluster_id)
r = self._prepare_request('POST', f'galaxy_clusters/edit/{cluster_id}', data=galaxy_cluster)
cluster_j = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in cluster_j:
@ -1395,7 +1396,7 @@ class PyMISP:
:param galaxy_cluster: The galaxy cluster you wish to publish
"""
if isinstance(galaxy_cluster, MISPGalaxyCluster) and galaxy_cluster.default:
if isinstance(galaxy_cluster, MISPGalaxyCluster) and getattr(galaxy_cluster, "default", False):
raise PyMISPError('You are not able to publish a default galaxy cluster')
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
r = self._prepare_request('POST', f'galaxy_clusters/publish/{cluster_id}')
@ -1433,7 +1434,7 @@ class PyMISP:
:param hard: flag for hard delete
"""
if isinstance(galaxy_cluster, MISPGalaxyCluster) and galaxy_cluster.default:
if isinstance(galaxy_cluster, MISPGalaxyCluster) and getattr(galaxy_cluster, "default", False):
raise PyMISPError('You are not able to delete a default galaxy cluster')
data = {}
if hard:

View File

@ -9,6 +9,7 @@ import sys
from io import BytesIO, BufferedIOBase, TextIOBase
from zipfile import ZipFile
import uuid
from uuid import UUID
from collections import defaultdict
import logging
import hashlib
@ -1294,7 +1295,7 @@ class MISPGalaxyCluster(AbstractMISP):
self.cluster_elements.append(cluster_element)
return cluster_element
def add_cluster_relation(self, referenced_galaxy_cluster_uuid: uuid, referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: str = None, **kwargs) -> MISPGalaxyClusterRelation:
def add_cluster_relation(self, referenced_galaxy_cluster_uuid: Union["MISPGalaxyCluster", str, UUID], referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: str = None, **kwargs) -> MISPGalaxyClusterRelation:
"""Add a cluster relation to a MISPGalaxyCluster.
:param referenced_galaxy_cluster_uuid: UUID of the related cluster
@ -1308,6 +1309,10 @@ class MISPGalaxyCluster(AbstractMISP):
if not getattr(self, "uuid", None):
raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster")
cluster_relation = MISPGalaxyClusterRelation()
if isinstance(referenced_galaxy_cluster_uuid, MISPGalaxyCluster):
referenced_galaxy_cluster_uuid = referenced_galaxy_cluster_uuid.uuid
cluster_relation.from_dict(
referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid,
referenced_galaxy_cluster_type=referenced_galaxy_cluster_type,

View File

@ -27,7 +27,7 @@ logger = logging.getLogger('pymisp')
try:
from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlocklist, MISPEventReport
from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlocklist, MISPEventReport, MISPGalaxyCluster
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator
from pymisp.exceptions import MISPServerError
except ImportError:
@ -2690,6 +2690,109 @@ class TestComprehensive(unittest.TestCase):
self.user_misp_connector.delete_event(event)
self.user_misp_connector.delete_event_report(new_event_report)
def test_galaxy_cluster(self):
self.admin_misp_connector.toggle_global_pythonify()
galaxy = self.admin_misp_connector.galaxies()[0]
new_galaxy_cluster = MISPGalaxyCluster()
new_galaxy_cluster.value = "Test Cluster"
new_galaxy_cluster.authors = ["MISP"]
new_galaxy_cluster.distribution = 1
new_galaxy_cluster.description = "Example test cluster"
try:
galaxy = self.admin_misp_connector.get_galaxy(galaxy.id, withCluster=True)
existing_galaxy_cluster = galaxy.clusters[0]
new_galaxy_cluster = self.admin_misp_connector.add_galaxy_cluster(galaxy.id, new_galaxy_cluster)
# The new galaxy cluster should be under the selected galaxy
self.assertEqual(galaxy.id, new_galaxy_cluster.galaxy_id)
# The cluster should have the right value
self.assertEqual(new_galaxy_cluster.value, "Test Cluster")
new_galaxy_cluster.add_cluster_element("synonyms", "Test2")
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster)
# The cluster should have one element that is a synonym
self.assertEqual(len(new_galaxy_cluster.cluster_elements), 1)
element = new_galaxy_cluster.cluster_elements[0]
self.assertEqual(element.key, "synonyms")
self.assertEqual(element.value, "Test2")
# The cluster should have the old meta as a prop
self.assertEqual(new_galaxy_cluster.meta, {'synonyms': ['Test2']})
# The cluster element should be updatable
element.value = "Test3"
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster)
element = new_galaxy_cluster.cluster_elements[0]
self.assertEqual(element.value, "Test3")
new_galaxy_cluster.add_cluster_element("synonyms", "ToDelete")
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster)
# The cluster should have two elements
self.assertEqual(len(new_galaxy_cluster.cluster_elements), 2)
new_galaxy_cluster.cluster_elements = [e for e in new_galaxy_cluster.cluster_elements if e.value != "ToDelete"]
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster)
# The cluster elements should be deletable
self.assertEqual(len(new_galaxy_cluster.cluster_elements), 1)
new_galaxy_cluster.add_cluster_relation(existing_galaxy_cluster, "is-tested-by")
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster)
# The cluster should have a relationship
self.assertEqual(len(new_galaxy_cluster.cluster_relations), 1)
relation = new_galaxy_cluster.cluster_relations[0]
self.assertEqual(relation.referenced_galaxy_cluster_type, "is-tested-by")
self.assertEqual(relation.referenced_galaxy_cluster_uuid, existing_galaxy_cluster.uuid)
relation.add_tag("tlp:amber")
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster)
relation = new_galaxy_cluster.cluster_relations[0]
# The relationship should have a tag of tlp:amber
self.assertEqual(len(relation.tags), 1)
self.assertEqual(relation.tags[0].name, "tlp:amber")
# The cluster relations should be deletable
resp = self.admin_misp_connector.delete_galaxy_cluster_relation(relation)
self.assertTrue(resp['success'])
# The cluster relation should no longer be present
new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster)
self.assertEqual(len(new_galaxy_cluster.cluster_relations), 0)
resp = self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster)
# Galaxy clusters should be soft deletable
self.assertTrue(resp['success'])
new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster)
self.assertTrue(isinstance(new_galaxy_cluster, MISPGalaxyCluster))
resp = self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster, hard=True)
# Galaxy clusters should be hard deletable
self.assertTrue(resp['success'])
resp = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster)
self.assertTrue("errors" in resp)
finally:
self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster)
self.admin_misp_connector.toggle_global_pythonify()
def test_event_galaxy(self):
self.admin_misp_connector.toggle_global_pythonify()
event = self.create_simple_event()
try:
galaxy = self.admin_misp_connector.galaxies()[0]
galaxy = self.admin_misp_connector.get_galaxy(galaxy.id, withCluster=True)
galaxy_cluster = galaxy.clusters[0]
event.add_tag(galaxy_cluster.tag_name)
event = self.admin_misp_connector.add_event(event)
# The event should have a galaxy attached
self.assertEqual(len(event.galaxies), 1)
event_galaxy = event.galaxies[0]
# The galaxy ID should equal the galaxy from which the cluster came from
self.assertEqual(event_galaxy.id, galaxy.id)
# The galaxy cluster should equal the cluster added
self.assertEqual(event_galaxy.clusters[0].id, galaxy_cluster.id)
finally:
self.admin_misp_connector.delete_event(event)
self.admin_misp_connector.toggle_global_pythonify()
@unittest.skip("Internal use only")
def missing_methods(self):
skip = [