chg: Cleanup blocklist methods

pull/612/head
Raphaël Vinot 2020-08-04 12:20:21 +02:00
parent 2bbf888ca7
commit be8c94e6e7
3 changed files with 68 additions and 27 deletions

View File

@ -51,6 +51,13 @@ def get_uuid_or_id_from_abstract_misp(obj: Union[AbstractMISP, int, str, UUID])
if isinstance(obj, MISPEventDelegation): if isinstance(obj, MISPEventDelegation):
# An EventDelegation doesn't have a uuid, we *need* to use the ID # An EventDelegation doesn't have a uuid, we *need* to use the ID
return obj['id'] return obj['id']
# For the blacklists, we want to return a specific key.
if isinstance(obj, MISPEventBlacklist):
return obj.event_uuid
if isinstance(obj, MISPOrganisationBlacklist):
return obj.org_uuid
if 'uuid' in obj: if 'uuid' in obj:
return obj['uuid'] return obj['uuid']
return obj['id'] return obj['id']
@ -2205,56 +2212,75 @@ class PyMISP:
to_return.append(obl) to_return.append(obl)
return to_return return to_return
def _add_entries_to_blacklist(self, blacklist_type: str, uuids: List[str], **kwargs) -> Dict: def _add_entries_to_blacklist(self, blacklist_type: str, uuids: Union[str, List[str]], **kwargs) -> Dict:
if blacklist_type == 'event': if blacklist_type == 'event':
url = 'eventBlacklists/add' url = 'eventBlacklists/add'
elif blacklist_type == 'organisation': elif blacklist_type == 'organisation':
url = 'orgBlacklists/add' url = 'orgBlacklists/add'
else: else:
raise PyMISPError('blacklist_type can only be "event" or "organisation"') raise PyMISPError('blacklist_type can only be "event" or "organisation"')
if isinstance(uuids, str):
uuids = [uuids]
data = {'uuids': uuids} data = {'uuids': uuids}
if kwargs: if kwargs:
data.update({k: v for k, v in kwargs.items() if v}) data.update({k: v for k, v in kwargs.items() if v})
r = self._prepare_request('POST', url, data=data) r = self._prepare_request('POST', url, data=data)
return self._check_json_response(r) return self._check_json_response(r)
def add_event_blacklist(self, uuids: List[str], comment: Optional[str] = None, def add_event_blacklist(self, uuids: Union[str, List[str]], comment: Optional[str] = None,
event_info: Optional[str] = None, event_orgc: Optional[str] = None) -> Dict: event_info: Optional[str] = None, event_orgc: Optional[str] = None) -> Dict:
'''Add a new event in the blacklist''' '''Add a new event in the blacklist'''
return self._add_entries_to_blacklist('event', uuids=uuids, comment=comment, event_info=event_info, event_orgc=event_orgc) return self._add_entries_to_blacklist('event', uuids=uuids, comment=comment, event_info=event_info, event_orgc=event_orgc)
def add_organisation_blacklist(self, uuids: List[str], comment: Optional[str] = None, def add_organisation_blacklist(self, uuids: Union[str, List[str]], comment: Optional[str] = None,
org_name: Optional[str] = None) -> Dict: org_name: Optional[str] = None) -> Dict:
'''Add a new organisation in the blacklist''' '''Add a new organisation in the blacklist'''
return self._add_entries_to_blacklist('organisation', uuids=uuids, comment=comment, org_name=org_name) return self._add_entries_to_blacklist('organisation', uuids=uuids, comment=comment, org_name=org_name)
""" def _update_entries_in_blacklist(self, blacklist_type: str, uuid, **kwargs) -> Dict:
# Not working yet if blacklist_type == 'event':
def update_event_blacklist(self, event_blacklist: MISPEventBlacklist, event_blacklist_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPEventBlacklist]: url = f'eventBlacklists/edit/{uuid}'
elif blacklist_type == 'organisation':
url = f'orgBlacklists/edit/{uuid}'
else:
raise PyMISPError('blacklist_type can only be "event" or "organisation"')
data = {k: v for k, v in kwargs.items() if v}
r = self._prepare_request('POST', url, data=data)
return self._check_json_response(r)
def update_event_blacklist(self, event_blacklist: MISPEventBlacklist, event_blacklist_id: Optional[Union[int, str, UUID]] = None, pythonify: bool = False) -> Union[Dict, MISPEventBlacklist]:
'''Update an event in the blacklist''' '''Update an event in the blacklist'''
if event_blacklist_id is None: if event_blacklist_id is None:
eblid = get_uuid_or_id_from_abstract_misp(event_blacklist) eblid = get_uuid_or_id_from_abstract_misp(event_blacklist)
else: else:
eblid = get_uuid_or_id_from_abstract_misp(event_blacklist_id) eblid = get_uuid_or_id_from_abstract_misp(event_blacklist_id)
url = f'eventBlacklists/edit/{eblid}' updated_event_blacklist = self._update_entries_in_blacklist('event', eblid, **event_blacklist)
# event_blacklist.uuids = [event_blacklist.pop('event_uuid')]
print(event_blacklist.to_json(indent=2))
r = self._prepare_request('POST', url, data={'EventBlacklist': event_blacklist})
updated_event_blacklist = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in updated_event_blacklist: if not (self.global_pythonify or pythonify) or 'errors' in updated_event_blacklist:
return updated_event_blacklist return updated_event_blacklist
e = MISPEventBlacklist() e = MISPEventBlacklist()
e.from_dict(**updated_event_blacklist) e.from_dict(**updated_event_blacklist)
return e return e
"""
def delete_event_blacklist(self, event_blacklist: Union[MISPEventBlacklist, int, str, UUID]) -> Dict: def update_organisation_blacklist(self, organisation_blacklist: MISPOrganisationBlacklist, organisation_blacklist_id: Optional[Union[int, str, UUID]] = None, pythonify: bool = False) -> Union[Dict, MISPOrganisationBlacklist]:
'''Update an organisation in the blacklist'''
if organisation_blacklist_id is None:
oblid = get_uuid_or_id_from_abstract_misp(organisation_blacklist)
else:
oblid = get_uuid_or_id_from_abstract_misp(organisation_blacklist_id)
updated_organisation_blacklist = self._update_entries_in_blacklist('organisation', oblid, **organisation_blacklist)
if not (self.global_pythonify or pythonify) or 'errors' in updated_organisation_blacklist:
return updated_organisation_blacklist
o = MISPOrganisationBlacklist()
o.from_dict(**updated_organisation_blacklist)
return o
def delete_event_blacklist(self, event_blacklist: Union[MISPEventBlacklist, str, UUID]) -> Dict:
'''Delete a blacklisted event''' '''Delete a blacklisted event'''
event_blacklist_id = get_uuid_or_id_from_abstract_misp(event_blacklist) event_blacklist_id = get_uuid_or_id_from_abstract_misp(event_blacklist)
response = self._prepare_request('POST', f'eventBlacklists/delete/{event_blacklist_id}') response = self._prepare_request('POST', f'eventBlacklists/delete/{event_blacklist_id}')
return self._check_json_response(response) return self._check_json_response(response)
def delete_organisation_blacklist(self, organisation_blacklist: Union[MISPOrganisationBlacklist, int, str, UUID]) -> Dict: def delete_organisation_blacklist(self, organisation_blacklist: Union[MISPOrganisationBlacklist, str, UUID]) -> Dict:
'''Delete a blacklisted organisation''' '''Delete a blacklisted organisation'''
org_blacklist_id = get_uuid_or_id_from_abstract_misp(organisation_blacklist) org_blacklist_id = get_uuid_or_id_from_abstract_misp(organisation_blacklist)
response = self._prepare_request('POST', f'orgBlacklists/delete/{org_blacklist_id}') response = self._prepare_request('POST', f'orgBlacklists/delete/{org_blacklist_id}')
@ -2351,12 +2377,12 @@ class PyMISP:
with open(__file__) as f: with open(__file__) as f:
content = f.read() content = f.read()
not_implemented = [] not_implemented_paths: List[str] = []
for path in paths: for path in paths:
if path not in content: if path not in content:
not_implemented.append(path) not_implemented_paths.append(path)
return not_implemented return not_implemented_paths
# ## Internal methods ### # ## Internal methods ###

View File

@ -1701,6 +1701,10 @@ class MISPInbox(AbstractMISP):
class MISPEventBlacklist(AbstractMISP): class MISPEventBlacklist(AbstractMISP):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.event_uuid: str
def from_dict(self, **kwargs): def from_dict(self, **kwargs):
if 'EventBlacklist' in kwargs: if 'EventBlacklist' in kwargs:
kwargs = kwargs['EventBlacklist'] kwargs = kwargs['EventBlacklist']
@ -1712,6 +1716,10 @@ class MISPEventBlacklist(AbstractMISP):
class MISPOrganisationBlacklist(AbstractMISP): class MISPOrganisationBlacklist(AbstractMISP):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.org_uuid: str
def from_dict(self, **kwargs): def from_dict(self, **kwargs):
if 'OrgBlacklist' in kwargs: if 'OrgBlacklist' in kwargs:
kwargs = kwargs['OrgBlacklist'] kwargs = kwargs['OrgBlacklist']

View File

@ -2389,16 +2389,17 @@ class TestComprehensive(unittest.TestCase):
raise Exception('Unable to find UUID in Events blacklist') raise Exception('Unable to find UUID in Events blacklist')
first = self.user_misp_connector.add_event(first, pythonify=True) first = self.user_misp_connector.add_event(first, pythonify=True)
self.assertEqual(first['errors'][1]['message'], 'Could not add Event', first) self.assertEqual(first['errors'][1]['message'], 'Could not add Event', first)
# ble.comment = 'This is a test' ble.comment = 'This is a test'
# ble.event_info = 'foo' ble.event_info = 'foo'
# ble.event_orgc = 'bar' ble.event_orgc = 'bar'
# ble = self.admin_misp_connector.update_event_blacklist(ble) ble = self.admin_misp_connector.update_event_blacklist(ble, pythonify=True)
# print(ble.to_json(indent=2)) self.assertEqual(ble.comment, 'This is a test')
# self.assertEqual(ble.comment, 'This is a test') r = self.admin_misp_connector.delete_event_blacklist(ble)
self.assertTrue(r['success'])
# test Org BL # test Org BL
obl = self.admin_misp_connector.add_organisation_blacklist(uuids=[self.test_org.uuid]) obl = self.admin_misp_connector.add_organisation_blacklist(uuids=self.test_org.uuid)
self.assertEqual(ebl['result']['successes'][0], self.test_org.uuid, obl) self.assertEqual(obl['result']['successes'][0], self.test_org.uuid, obl)
bl_orgs = self.admin_misp_connector.organisation_blacklists(pythonify=True) bl_orgs = self.admin_misp_connector.organisation_blacklists(pythonify=True)
for blo in bl_orgs: for blo in bl_orgs:
if blo.org_uuid == self.test_org.uuid: if blo.org_uuid == self.test_org.uuid:
@ -2408,6 +2409,14 @@ class TestComprehensive(unittest.TestCase):
raise Exception('Unable to find UUID in Orgs blacklist') raise Exception('Unable to find UUID in Orgs blacklist')
first = self.user_misp_connector.add_event(first, pythonify=True) first = self.user_misp_connector.add_event(first, pythonify=True)
self.assertEqual(first['errors'][1]['message'], 'Could not add Event', first) self.assertEqual(first['errors'][1]['message'], 'Could not add Event', first)
blo.comment = 'This is a test'
blo.org_name = 'bar'
blo = self.admin_misp_connector.update_organisation_blacklist(blo, pythonify=True)
self.assertEqual(blo.org_name, 'bar')
r = self.admin_misp_connector.delete_organisation_blacklist(blo)
self.assertTrue(r['success'])
finally: finally:
for ble in to_delete['bl_events']: for ble in to_delete['bl_events']:
self.admin_misp_connector.delete_event_blacklist(ble) self.admin_misp_connector.delete_event_blacklist(ble)
@ -2452,7 +2461,6 @@ class TestComprehensive(unittest.TestCase):
"attributes/exportSearch", "attributes/exportSearch",
'dashboards', 'dashboards',
'decayingModel', 'decayingModel',
"eventBlacklists/edit",
"eventBlacklists/massDelete", "eventBlacklists/massDelete",
"eventDelegations/view", "eventDelegations/view",
"eventDelegations/index", "eventDelegations/index",
@ -2585,7 +2593,6 @@ class TestComprehensive(unittest.TestCase):
"organisations/fetchSGOrgRow", "organisations/fetchSGOrgRow",
"organisations/getUUIDs", "organisations/getUUIDs",
"admin/organisations/merge", "admin/organisations/merge",
'orgBlacklists/edit',
"pages/display", "pages/display",
"posts/pushMessageToZMQ", "posts/pushMessageToZMQ",
"posts/add", "posts/add",