new: Basic support for listing, enabling and disabling decaying models

pull/880/head
Raphaël Vinot 2022-11-22 14:48:23 +01:00
parent 6748ad8a62
commit 2de22871d1
4 changed files with 86 additions and 2 deletions

View File

@ -33,7 +33,7 @@ try:
MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed,
MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist,
MISPEventReport, MISPGalaxyCluster, MISPGalaxyClusterElement, MISPGalaxyClusterRelation,
MISPCorrelationExclusion, MISPGalaxy)
MISPCorrelationExclusion, MISPGalaxy, MISPDecayingModel)
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
from .tools import stix # noqa

View File

@ -25,7 +25,7 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
@ -2420,6 +2420,49 @@ class PyMISP:
# ## END Role ###
# ## BEGIN Decaying Models ###
def update_decaying_models(self) -> Dict:
"""Update all the Decaying models"""
response = self._prepare_request('POST', 'decayingModel/update')
return self._check_json_response(response)
def decaying_models(self, pythonify: bool = False) -> Union[Dict, List[MISPDecayingModel]]:
"""Get all the decaying models
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output
"""
r = self._prepare_request('GET', 'decayingModel/index')
models = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in models:
return models
to_return = []
for model in models:
n = MISPDecayingModel()
n.from_dict(**model)
to_return.append(n)
return to_return
def enable_decaying_model(self, decaying_model: Union[MISPDecayingModel, int, str]) -> Dict:
"""Enable a decaying Model"""
if isinstance(decaying_model, MISPDecayingModel):
decaying_model_id = decaying_model.id
else:
decaying_model_id = int(decaying_model)
response = self._prepare_request('POST', f'decayingModel/enable/{decaying_model_id}')
return self._check_json_response(response)
def disable_decaying_model(self, decaying_model: Union[MISPDecayingModel, int, str]) -> Dict:
"""Disable a decaying Model"""
if isinstance(decaying_model, MISPDecayingModel):
decaying_model_id = decaying_model.id
else:
decaying_model_id = int(decaying_model)
response = self._prepare_request('POST', f'decayingModel/disable/{decaying_model_id}')
return self._check_json_response(response)
# ## END Decaying Models ###
# ## BEGIN Search methods ###
def search(self, controller: str = 'events', return_format: str = 'json',

View File

@ -2349,3 +2349,19 @@ class MISPOrganisationBlocklist(AbstractMISP):
def __repr__(self):
return f'<{self.__class__.__name__}(org_uuid={self.org_uuid}'
class MISPDecayingModel(AbstractMISP):
def __init__(self, **kwargs: Dict) -> None:
super().__init__(**kwargs)
self.uuid: str
self.id: int
def from_dict(self, **kwargs):
if 'DecayingModel' in kwargs:
kwargs = kwargs['DecayingModel']
super().from_dict(**kwargs)
def __repr__(self):
return f'<{self.__class__.__name__}(uuid={self.uuid})>'

View File

@ -679,6 +679,31 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
def test_search_decay(self):
# Creating event 1
first = self.create_simple_event()
first.add_attribute('ip-dst', '8.8.8.8')
first.publish()
try:
r = self.admin_misp_connector.update_decaying_models()
self.assertTrue(r['success'], r)
simple_decaying_model = None
models = self.admin_misp_connector.decaying_models(pythonify=True)
for model in models:
if model.name == 'NIDS Simple Decaying Model':
simple_decaying_model = model
self.assertTrue(simple_decaying_model, models)
self.admin_misp_connector.enable_decaying_model(simple_decaying_model)
# TODO: check the response, it is curently an empty list
first = self.pub_misp_connector.add_event(first, pythonify=True)
result = self.pub_misp_connector.search('attributes', to_ids=1, includeDecayScore=True, pythonify=True)
self.assertTrue(result[0].decay_score, result[0].to_json(indent=2))
self.admin_misp_connector.disable_decaying_model(simple_decaying_model)
# TODO: check the response, it is curently a list of all the models
finally:
# Delete event
self.admin_misp_connector.delete_event(first)
def test_default_distribution(self):
'''The default distributions on the VM are This community only for the events and Inherit from event for attr/obj)'''
first = self.create_simple_event()