From 7324ae72e60df8a6f738117a50f5d15949efa871 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Fri, 13 Sep 2019 16:12:40 +0200 Subject: [PATCH] new: Better handling of delete(d) attributes * Hard delete on attribute * Get the deleted attributes within an event --- pymisp/api.py | 2 +- pymisp/aping.py | 19 ++++++++++++++----- tests/testlive_comprehensive.py | 11 +++++++++-- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index eda9e50..b15424b 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -446,7 +446,7 @@ class PyMISP(object): # pragma: no cover url = urljoin(self.root_url, 'attributes/delete/{}/1'.format(attribute_id)) else: url = urljoin(self.root_url, 'attributes/delete/{}'.format(attribute_id)) - response = self._prepare_request('GET', url) + response = self._prepare_request('POST', url) return self._check_response(response) @deprecated(reason="Use ExpandedPyMISP.push_event_to_ZMQ", action='default') diff --git a/pymisp/aping.py b/pymisp/aping.py index 1e81e6f..f810525 100644 --- a/pymisp/aping.py +++ b/pymisp/aping.py @@ -154,7 +154,9 @@ class ExpandedPyMISP(PyMISP): def update_misp(self): response = self._prepare_request('POST', '/servers/update') - return self._check_response(response, lenient_response_type=True) + if self._old_misp((2, 4, 116), '2020-01-01', sys._getframe().f_code.co_name): + return self._check_response(response, lenient_response_type=True) + return self._check_response(response, expect_json=True) def set_server_setting(self, setting: str, value: Union[str, int, bool], force: bool=False): data = {'value': value, 'force': force} @@ -190,10 +192,14 @@ class ExpandedPyMISP(PyMISP): to_return.append(e) return to_return - def get_event(self, event: Union[MISPEvent, int, str, UUID], pythonify: bool=False): + def get_event(self, event: Union[MISPEvent, int, str, UUID], deleted: [bool, int, list]=False, pythonify: bool=False): '''Get an event from a MISP instance''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) - event = self._prepare_request('GET', f'events/{event_id}') + if deleted: + data = {'deleted': deleted} + event = self._prepare_request('POST', f'events/view/{event_id}', data=data) + else: + event = self._prepare_request('GET', f'events/view/{event_id}') event = self._check_response(event, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in event: return event @@ -423,10 +429,13 @@ class ExpandedPyMISP(PyMISP): a.from_dict(**updated_attribute) return a - def delete_attribute(self, attribute: Union[MISPAttribute, int, str, UUID]): + def delete_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], hard: bool=False): '''Delete an attribute from a MISP instance''' attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) - response = self._prepare_request('POST', f'attributes/delete/{attribute_id}') + data = {} + if hard: + data['hard'] = 1 + response = self._prepare_request('POST', f'attributes/delete/{attribute_id}', data=data) response = self._check_response(response, expect_json=True) if ('errors' in response and response['errors'][0] == 403 and response['errors'][1]['message'] == 'You do not have permission to do that.'): diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 3131af2..628de3f 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -1503,16 +1503,23 @@ class TestComprehensive(unittest.TestCase): attribute = self.user_misp_connector.update_attribute({'comment': 'blah'}, second.attributes[0].id) self.assertTrue(isinstance(attribute, MISPShadowAttribute), attribute) self.assertEqual(attribute.value, second.attributes[0].value) + second = self.admin_misp_connector.get_event(second, pythonify=True) + self.assertEqual(len(second.attributes), 3) # Delete attribute owned by someone else response = self.user_misp_connector.delete_attribute(second.attributes[1]) self.assertTrue(response['success']) # Delete attribute owned by user response = self.admin_misp_connector.delete_attribute(second.attributes[1]) self.assertEqual(response['message'], 'Attribute deleted.') + # Hard delete + response = self.admin_misp_connector.delete_attribute(second.attributes[0], hard=True) + self.assertEqual(response['message'], 'Attribute deleted.') + new_second = self.admin_misp_connector.get_event(second, deleted=[0, 1], pythonify=True) + self.assertEqual(len(new_second.attributes), 2) # Test attribute*S* attributes = self.admin_misp_connector.attributes() - self.assertEqual(len(attributes), 7) + self.assertEqual(len(attributes), 6) # attributes = self.user_misp_connector.attributes() # self.assertEqual(len(attributes), 5) # Test event*S* @@ -1555,7 +1562,7 @@ class TestComprehensive(unittest.TestCase): r = self.admin_misp_connector.update_user({'email': 'testusr@user.local'}, self.test_usr) def test_live_acl(self): - missing_acls = self.admin_misp_connector.remote_acl + missing_acls = self.admin_misp_connector.remote_acl() self.assertEqual(missing_acls, [], msg=missing_acls) def test_roles(self):