diff --git a/pymisp/__init__.py b/pymisp/__init__.py index b93f00e..c944af5 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -33,7 +33,7 @@ try: MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, MISPGalaxyCluster, MISPGalaxyClusterElement, MISPGalaxyClusterRelation, - MISPCorrelationExclusion, MISPGalaxy) + MISPCorrelationExclusion, MISPGalaxy, MISPDecayingModel) from .tools import AbstractMISPObjectGenerator # noqa from .tools import Neo4j # noqa from .tools import stix # noqa diff --git a/pymisp/api.py b/pymisp/api.py index 5129f4e..cd2a74a 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -25,7 +25,7 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \ MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \ MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \ - MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion + MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types @@ -2420,6 +2420,49 @@ class PyMISP: # ## END Role ### + # ## BEGIN Decaying Models ### + + def update_decaying_models(self) -> Dict: + """Update all the Decaying models""" + response = self._prepare_request('POST', 'decayingModel/update') + return self._check_json_response(response) + + def decaying_models(self, pythonify: bool = False) -> Union[Dict, List[MISPDecayingModel]]: + """Get all the decaying models + + :param pythonify: Returns a list of PyMISP Objects instead of the plain json output + """ + r = self._prepare_request('GET', 'decayingModel/index') + models = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in models: + return models + to_return = [] + for model in models: + n = MISPDecayingModel() + n.from_dict(**model) + to_return.append(n) + return to_return + + def enable_decaying_model(self, decaying_model: Union[MISPDecayingModel, int, str]) -> Dict: + """Enable a decaying Model""" + if isinstance(decaying_model, MISPDecayingModel): + decaying_model_id = decaying_model.id + else: + decaying_model_id = int(decaying_model) + response = self._prepare_request('POST', f'decayingModel/enable/{decaying_model_id}') + return self._check_json_response(response) + + def disable_decaying_model(self, decaying_model: Union[MISPDecayingModel, int, str]) -> Dict: + """Disable a decaying Model""" + if isinstance(decaying_model, MISPDecayingModel): + decaying_model_id = decaying_model.id + else: + decaying_model_id = int(decaying_model) + response = self._prepare_request('POST', f'decayingModel/disable/{decaying_model_id}') + return self._check_json_response(response) + + # ## END Decaying Models ### + # ## BEGIN Search methods ### def search(self, controller: str = 'events', return_format: str = 'json', diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index d1847c4..3bc9d13 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -2349,3 +2349,19 @@ class MISPOrganisationBlocklist(AbstractMISP): def __repr__(self): return f'<{self.__class__.__name__}(org_uuid={self.org_uuid}' + + +class MISPDecayingModel(AbstractMISP): + + def __init__(self, **kwargs: Dict) -> None: + super().__init__(**kwargs) + self.uuid: str + self.id: int + + def from_dict(self, **kwargs): + if 'DecayingModel' in kwargs: + kwargs = kwargs['DecayingModel'] + super().from_dict(**kwargs) + + def __repr__(self): + return f'<{self.__class__.__name__}(uuid={self.uuid})>' diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 4e9370a..b12d216 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -679,6 +679,31 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) + def test_search_decay(self): + # Creating event 1 + first = self.create_simple_event() + first.add_attribute('ip-dst', '8.8.8.8') + first.publish() + try: + r = self.admin_misp_connector.update_decaying_models() + self.assertTrue(r['success'], r) + simple_decaying_model = None + models = self.admin_misp_connector.decaying_models(pythonify=True) + for model in models: + if model.name == 'NIDS Simple Decaying Model': + simple_decaying_model = model + self.assertTrue(simple_decaying_model, models) + self.admin_misp_connector.enable_decaying_model(simple_decaying_model) + # TODO: check the response, it is curently an empty list + first = self.pub_misp_connector.add_event(first, pythonify=True) + result = self.pub_misp_connector.search('attributes', to_ids=1, includeDecayScore=True, pythonify=True) + self.assertTrue(result[0].decay_score, result[0].to_json(indent=2)) + self.admin_misp_connector.disable_decaying_model(simple_decaying_model) + # TODO: check the response, it is curently a list of all the models + finally: + # Delete event + self.admin_misp_connector.delete_event(first) + def test_default_distribution(self): '''The default distributions on the VM are This community only for the events and Inherit from event for attr/obj)''' first = self.create_simple_event()