diff --git a/pymisp/api.py b/pymisp/api.py index 316b841..55dd2bb 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -51,6 +51,13 @@ def get_uuid_or_id_from_abstract_misp(obj: Union[AbstractMISP, int, str, UUID]) if isinstance(obj, MISPEventDelegation): # An EventDelegation doesn't have a uuid, we *need* to use the ID return obj['id'] + + # For the blacklists, we want to return a specific key. + if isinstance(obj, MISPEventBlacklist): + return obj.event_uuid + if isinstance(obj, MISPOrganisationBlacklist): + return obj.org_uuid + if 'uuid' in obj: return obj['uuid'] return obj['id'] @@ -2205,56 +2212,75 @@ class PyMISP: to_return.append(obl) return to_return - def _add_entries_to_blacklist(self, blacklist_type: str, uuids: List[str], **kwargs) -> Dict: + def _add_entries_to_blacklist(self, blacklist_type: str, uuids: Union[str, List[str]], **kwargs) -> Dict: if blacklist_type == 'event': url = 'eventBlacklists/add' elif blacklist_type == 'organisation': url = 'orgBlacklists/add' else: raise PyMISPError('blacklist_type can only be "event" or "organisation"') + if isinstance(uuids, str): + uuids = [uuids] data = {'uuids': uuids} if kwargs: data.update({k: v for k, v in kwargs.items() if v}) r = self._prepare_request('POST', url, data=data) return self._check_json_response(r) - def add_event_blacklist(self, uuids: List[str], comment: Optional[str] = None, + def add_event_blacklist(self, uuids: Union[str, List[str]], comment: Optional[str] = None, event_info: Optional[str] = None, event_orgc: Optional[str] = None) -> Dict: '''Add a new event in the blacklist''' return self._add_entries_to_blacklist('event', uuids=uuids, comment=comment, event_info=event_info, event_orgc=event_orgc) - def add_organisation_blacklist(self, uuids: List[str], comment: Optional[str] = None, + def add_organisation_blacklist(self, uuids: Union[str, List[str]], comment: Optional[str] = None, org_name: Optional[str] = None) -> Dict: '''Add a new organisation in the blacklist''' return self._add_entries_to_blacklist('organisation', uuids=uuids, comment=comment, org_name=org_name) - """ - # Not working yet - def update_event_blacklist(self, event_blacklist: MISPEventBlacklist, event_blacklist_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPEventBlacklist]: + def _update_entries_in_blacklist(self, blacklist_type: str, uuid, **kwargs) -> Dict: + if blacklist_type == 'event': + url = f'eventBlacklists/edit/{uuid}' + elif blacklist_type == 'organisation': + url = f'orgBlacklists/edit/{uuid}' + else: + raise PyMISPError('blacklist_type can only be "event" or "organisation"') + data = {k: v for k, v in kwargs.items() if v} + r = self._prepare_request('POST', url, data=data) + return self._check_json_response(r) + + def update_event_blacklist(self, event_blacklist: MISPEventBlacklist, event_blacklist_id: Optional[Union[int, str, UUID]] = None, pythonify: bool = False) -> Union[Dict, MISPEventBlacklist]: '''Update an event in the blacklist''' if event_blacklist_id is None: eblid = get_uuid_or_id_from_abstract_misp(event_blacklist) else: eblid = get_uuid_or_id_from_abstract_misp(event_blacklist_id) - url = f'eventBlacklists/edit/{eblid}' - # event_blacklist.uuids = [event_blacklist.pop('event_uuid')] - print(event_blacklist.to_json(indent=2)) - r = self._prepare_request('POST', url, data={'EventBlacklist': event_blacklist}) - updated_event_blacklist = self._check_json_response(r) + updated_event_blacklist = self._update_entries_in_blacklist('event', eblid, **event_blacklist) if not (self.global_pythonify or pythonify) or 'errors' in updated_event_blacklist: return updated_event_blacklist e = MISPEventBlacklist() e.from_dict(**updated_event_blacklist) return e - """ - def delete_event_blacklist(self, event_blacklist: Union[MISPEventBlacklist, int, str, UUID]) -> Dict: + def update_organisation_blacklist(self, organisation_blacklist: MISPOrganisationBlacklist, organisation_blacklist_id: Optional[Union[int, str, UUID]] = None, pythonify: bool = False) -> Union[Dict, MISPOrganisationBlacklist]: + '''Update an organisation in the blacklist''' + if organisation_blacklist_id is None: + oblid = get_uuid_or_id_from_abstract_misp(organisation_blacklist) + else: + oblid = get_uuid_or_id_from_abstract_misp(organisation_blacklist_id) + updated_organisation_blacklist = self._update_entries_in_blacklist('organisation', oblid, **organisation_blacklist) + if not (self.global_pythonify or pythonify) or 'errors' in updated_organisation_blacklist: + return updated_organisation_blacklist + o = MISPOrganisationBlacklist() + o.from_dict(**updated_organisation_blacklist) + return o + + def delete_event_blacklist(self, event_blacklist: Union[MISPEventBlacklist, str, UUID]) -> Dict: '''Delete a blacklisted event''' event_blacklist_id = get_uuid_or_id_from_abstract_misp(event_blacklist) response = self._prepare_request('POST', f'eventBlacklists/delete/{event_blacklist_id}') return self._check_json_response(response) - def delete_organisation_blacklist(self, organisation_blacklist: Union[MISPOrganisationBlacklist, int, str, UUID]) -> Dict: + def delete_organisation_blacklist(self, organisation_blacklist: Union[MISPOrganisationBlacklist, str, UUID]) -> Dict: '''Delete a blacklisted organisation''' org_blacklist_id = get_uuid_or_id_from_abstract_misp(organisation_blacklist) response = self._prepare_request('POST', f'orgBlacklists/delete/{org_blacklist_id}') @@ -2351,12 +2377,12 @@ class PyMISP: with open(__file__) as f: content = f.read() - not_implemented = [] + not_implemented_paths: List[str] = [] for path in paths: if path not in content: - not_implemented.append(path) + not_implemented_paths.append(path) - return not_implemented + return not_implemented_paths # ## Internal methods ### diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 25d9eed..3ab97d5 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -1701,6 +1701,10 @@ class MISPInbox(AbstractMISP): class MISPEventBlacklist(AbstractMISP): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.event_uuid: str + def from_dict(self, **kwargs): if 'EventBlacklist' in kwargs: kwargs = kwargs['EventBlacklist'] @@ -1712,6 +1716,10 @@ class MISPEventBlacklist(AbstractMISP): class MISPOrganisationBlacklist(AbstractMISP): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.org_uuid: str + def from_dict(self, **kwargs): if 'OrgBlacklist' in kwargs: kwargs = kwargs['OrgBlacklist'] diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 31ee091..3f456b7 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -2389,16 +2389,17 @@ class TestComprehensive(unittest.TestCase): raise Exception('Unable to find UUID in Events blacklist') first = self.user_misp_connector.add_event(first, pythonify=True) self.assertEqual(first['errors'][1]['message'], 'Could not add Event', first) - # ble.comment = 'This is a test' - # ble.event_info = 'foo' - # ble.event_orgc = 'bar' - # ble = self.admin_misp_connector.update_event_blacklist(ble) - # print(ble.to_json(indent=2)) - # self.assertEqual(ble.comment, 'This is a test') + ble.comment = 'This is a test' + ble.event_info = 'foo' + ble.event_orgc = 'bar' + ble = self.admin_misp_connector.update_event_blacklist(ble, pythonify=True) + self.assertEqual(ble.comment, 'This is a test') + r = self.admin_misp_connector.delete_event_blacklist(ble) + self.assertTrue(r['success']) # test Org BL - obl = self.admin_misp_connector.add_organisation_blacklist(uuids=[self.test_org.uuid]) - self.assertEqual(ebl['result']['successes'][0], self.test_org.uuid, obl) + obl = self.admin_misp_connector.add_organisation_blacklist(uuids=self.test_org.uuid) + self.assertEqual(obl['result']['successes'][0], self.test_org.uuid, obl) bl_orgs = self.admin_misp_connector.organisation_blacklists(pythonify=True) for blo in bl_orgs: if blo.org_uuid == self.test_org.uuid: @@ -2408,6 +2409,14 @@ class TestComprehensive(unittest.TestCase): raise Exception('Unable to find UUID in Orgs blacklist') first = self.user_misp_connector.add_event(first, pythonify=True) self.assertEqual(first['errors'][1]['message'], 'Could not add Event', first) + + blo.comment = 'This is a test' + blo.org_name = 'bar' + blo = self.admin_misp_connector.update_organisation_blacklist(blo, pythonify=True) + self.assertEqual(blo.org_name, 'bar') + r = self.admin_misp_connector.delete_organisation_blacklist(blo) + self.assertTrue(r['success']) + finally: for ble in to_delete['bl_events']: self.admin_misp_connector.delete_event_blacklist(ble) @@ -2452,7 +2461,6 @@ class TestComprehensive(unittest.TestCase): "attributes/exportSearch", 'dashboards', 'decayingModel', - "eventBlacklists/edit", "eventBlacklists/massDelete", "eventDelegations/view", "eventDelegations/index", @@ -2585,7 +2593,6 @@ class TestComprehensive(unittest.TestCase): "organisations/fetchSGOrgRow", "organisations/getUUIDs", "admin/organisations/merge", - 'orgBlacklists/edit', "pages/display", "posts/pushMessageToZMQ", "posts/add",