From 18049212a5e24764b74bd16f00a0fa9d1d233538 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Thu, 22 Apr 2021 10:47:51 +0200 Subject: [PATCH] new: Support for correlation exclusion list Fix #732 --- pymisp/__init__.py | 7 +++- pymisp/api.py | 65 ++++++++++++++++++++++++++++++++- pymisp/mispevent.py | 8 ++++ tests/testlive_comprehensive.py | 17 ++++++++- 4 files changed, 94 insertions(+), 3 deletions(-) diff --git a/pymisp/__init__.py b/pymisp/__init__.py index a2b2e12..be022ac 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -28,7 +28,12 @@ try: warning_2022() from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa - from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, MISPGalaxyCluster, MISPGalaxyClusterElement, MISPGalaxyClusterRelation # noqa + from .mispevent import (MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, # noqa + MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, + MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, + MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, + MISPEventReport, MISPGalaxyCluster, MISPGalaxyClusterElement, MISPGalaxyClusterRelation, + MISPCorrelationExclusion) from .tools import AbstractMISPObjectGenerator # noqa from .tools import Neo4j # noqa from .tools import openioc # noqa diff --git a/pymisp/api.py b/pymisp/api.py index 2c22b87..82eb2ca 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -25,7 +25,7 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \ MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \ MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \ - MISPGalaxyCluster, MISPGalaxyClusterRelation + MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types SearchType = TypeVar('SearchType', str, int) @@ -1301,6 +1301,69 @@ class PyMISP: # ## END Noticelist ### + # ## BEGIN Correlation Exclusions ### + + def correlation_exclusions(self, pythonify: bool = False) -> Union[Dict, List[MISPCorrelationExclusion]]: + """Get all the correlation exclusions + + :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM + """ + r = self._prepare_request('GET', 'correlation_exclusions') + correlation_exclusions = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in correlation_exclusions: + return correlation_exclusions + to_return = [] + for correlation_exclusion in correlation_exclusions: + c = MISPCorrelationExclusion() + c.from_dict(**correlation_exclusion) + to_return.append(c) + return to_return + + def get_correlation_exclusion(self, correlation_exclusion: Union[MISPCorrelationExclusion, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPCorrelationExclusion]: + """Get a correlation exclusion by ID + + :param correlation_exclusion: Correlation exclusion to get + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + exclusion_id = get_uuid_or_id_from_abstract_misp(correlation_exclusion) + r = self._prepare_request('GET', f'correlation_exclusions/view/{exclusion_id}') + correlation_exclusion_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in correlation_exclusion_j: + return correlation_exclusion_j + c = MISPCorrelationExclusion() + c.from_dict(**correlation_exclusion_j) + return c + + def add_correlation_exclusion(self, correlation_exclusion: MISPCorrelationExclusion, pythonify: bool = False) -> Union[Dict, MISPCorrelationExclusion]: + """Add a new correlation exclusion + + :param correlation_exclusion: correlation exclusion to add + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + r = self._prepare_request('POST', 'correlation_exclusions/add', data=correlation_exclusion) + new_correlation_exclusion = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in new_correlation_exclusion: + return new_correlation_exclusion + c = MISPCorrelationExclusion() + c.from_dict(**new_correlation_exclusion) + return c + + def delete_correlation_exclusion(self, correlation_exclusion: Union[MISPCorrelationExclusion, int, str, UUID]) -> Dict: + """Delete a correlation exclusion + + :param correlation_exclusion: The MISPCorrelationExclusion you wish to delete from MISP + """ + exclusion_id = get_uuid_or_id_from_abstract_misp(correlation_exclusion) + r = self._prepare_request('POST', f'correlation_exclusions/delete/{exclusion_id}') + return self._check_json_response(r) + + def clean_correlation_exclusions(self): + """Initiate correlation exclusions cleanup""" + r = self._prepare_request('POST', 'correlation_exclusions/clean') + return self._check_json_response(r) + + # ## END Correlation Exclusions ### + # ## BEGIN Galaxy ### def galaxies(self, pythonify: bool = False) -> Union[Dict, List[MISPGalaxy]]: diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 8a23a1d..7ffe792 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -2071,6 +2071,14 @@ class MISPNoticelist(AbstractMISP): super().from_dict(**kwargs) +class MISPCorrelationExclusion(AbstractMISP): + + def from_dict(self, **kwargs): + if 'CorrelationExclusion' in kwargs: + kwargs = kwargs['CorrelationExclusion'] + super().from_dict(**kwargs) + + class MISPRole(AbstractMISP): def __init__(self, **kwargs): diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 5c27529..c6c1013 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -27,7 +27,7 @@ logger = logging.getLogger('pymisp') try: - from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlocklist, MISPEventReport, MISPGalaxyCluster + from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlocklist, MISPEventReport, MISPCorrelationExclusion, MISPGalaxyCluster from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator from pymisp.exceptions import MISPServerError except ImportError: @@ -1633,6 +1633,21 @@ class TestComprehensive(unittest.TestCase): r = self.admin_misp_connector.disable_noticelist(testnl) self.assertFalse(r['Noticelist']['enabled'], r) + def test_correlation_exclusions(self): + newce = MISPCorrelationExclusion() + newce.value = "test-correlation-exclusion" + r = self.admin_misp_connector.add_correlation_exclusion(newce, pythonify=True) + self.assertEqual(r.value, newce.value) + correlation_exclusions = self.admin_misp_connector.correlation_exclusions(pythonify=True) + self.assertTrue(isinstance(correlation_exclusions, list)) + testce = correlation_exclusions[0] + r = self.admin_misp_connector.get_correlation_exclusion(testce, pythonify=True) + self.assertEqual(r.value, testce.value) + r = self.admin_misp_connector.delete_correlation_exclusion(r) + self.assertTrue(r['success']) + r = self.admin_misp_connector.clean_correlation_exclusions() + self.assertTrue(r['success']) + def test_galaxies(self): # Make sure we're up-to-date r = self.admin_misp_connector.update_galaxies()