new: Better handling of delete(d) attributes

* Hard delete on attribute
* Get the deleted attributes within an event
pull/464/head
Raphaël Vinot 2019-09-13 16:12:40 +02:00
parent 266b7abfcd
commit 7324ae72e6
3 changed files with 24 additions and 8 deletions

View File

@ -446,7 +446,7 @@ class PyMISP(object): # pragma: no cover
url = urljoin(self.root_url, 'attributes/delete/{}/1'.format(attribute_id))
else:
url = urljoin(self.root_url, 'attributes/delete/{}'.format(attribute_id))
response = self._prepare_request('GET', url)
response = self._prepare_request('POST', url)
return self._check_response(response)
@deprecated(reason="Use ExpandedPyMISP.push_event_to_ZMQ", action='default')

View File

@ -154,7 +154,9 @@ class ExpandedPyMISP(PyMISP):
def update_misp(self):
response = self._prepare_request('POST', '/servers/update')
return self._check_response(response, lenient_response_type=True)
if self._old_misp((2, 4, 116), '2020-01-01', sys._getframe().f_code.co_name):
return self._check_response(response, lenient_response_type=True)
return self._check_response(response, expect_json=True)
def set_server_setting(self, setting: str, value: Union[str, int, bool], force: bool=False):
data = {'value': value, 'force': force}
@ -190,10 +192,14 @@ class ExpandedPyMISP(PyMISP):
to_return.append(e)
return to_return
def get_event(self, event: Union[MISPEvent, int, str, UUID], pythonify: bool=False):
def get_event(self, event: Union[MISPEvent, int, str, UUID], deleted: [bool, int, list]=False, pythonify: bool=False):
'''Get an event from a MISP instance'''
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
event = self._prepare_request('GET', f'events/{event_id}')
if deleted:
data = {'deleted': deleted}
event = self._prepare_request('POST', f'events/view/{event_id}', data=data)
else:
event = self._prepare_request('GET', f'events/view/{event_id}')
event = self._check_response(event, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in event:
return event
@ -423,10 +429,13 @@ class ExpandedPyMISP(PyMISP):
a.from_dict(**updated_attribute)
return a
def delete_attribute(self, attribute: Union[MISPAttribute, int, str, UUID]):
def delete_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], hard: bool=False):
'''Delete an attribute from a MISP instance'''
attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute)
response = self._prepare_request('POST', f'attributes/delete/{attribute_id}')
data = {}
if hard:
data['hard'] = 1
response = self._prepare_request('POST', f'attributes/delete/{attribute_id}', data=data)
response = self._check_response(response, expect_json=True)
if ('errors' in response and response['errors'][0] == 403
and response['errors'][1]['message'] == 'You do not have permission to do that.'):

View File

@ -1503,16 +1503,23 @@ class TestComprehensive(unittest.TestCase):
attribute = self.user_misp_connector.update_attribute({'comment': 'blah'}, second.attributes[0].id)
self.assertTrue(isinstance(attribute, MISPShadowAttribute), attribute)
self.assertEqual(attribute.value, second.attributes[0].value)
second = self.admin_misp_connector.get_event(second, pythonify=True)
self.assertEqual(len(second.attributes), 3)
# Delete attribute owned by someone else
response = self.user_misp_connector.delete_attribute(second.attributes[1])
self.assertTrue(response['success'])
# Delete attribute owned by user
response = self.admin_misp_connector.delete_attribute(second.attributes[1])
self.assertEqual(response['message'], 'Attribute deleted.')
# Hard delete
response = self.admin_misp_connector.delete_attribute(second.attributes[0], hard=True)
self.assertEqual(response['message'], 'Attribute deleted.')
new_second = self.admin_misp_connector.get_event(second, deleted=[0, 1], pythonify=True)
self.assertEqual(len(new_second.attributes), 2)
# Test attribute*S*
attributes = self.admin_misp_connector.attributes()
self.assertEqual(len(attributes), 7)
self.assertEqual(len(attributes), 6)
# attributes = self.user_misp_connector.attributes()
# self.assertEqual(len(attributes), 5)
# Test event*S*
@ -1555,7 +1562,7 @@ class TestComprehensive(unittest.TestCase):
r = self.admin_misp_connector.update_user({'email': 'testusr@user.local'}, self.test_usr)
def test_live_acl(self):
missing_acls = self.admin_misp_connector.remote_acl
missing_acls = self.admin_misp_connector.remote_acl()
self.assertEqual(missing_acls, [], msg=missing_acls)
def test_roles(self):