From 495af1fd9c63598bc33529bfd6af6ac843a30127 Mon Sep 17 00:00:00 2001 From: Jakub Onderka Date: Fri, 30 Oct 2020 20:21:56 +0100 Subject: [PATCH 1/3] new: Method to check event existence --- pymisp/api.py | 17 +++++++++++++++++ tests/testlive_comprehensive.py | 2 ++ 2 files changed, 19 insertions(+) diff --git a/pymisp/api.py b/pymisp/api.py index dce3627..396913e 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -307,6 +307,15 @@ class PyMISP: e.load(event_r) return e + def event_exists(self, event: Union[MISPEvent, int, str, UUID]) -> bool: + """Fast check if event exists. + + :param event: Event to check + """ + event_id = get_uuid_or_id_from_abstract_misp(event) + r = self._prepare_request('HEAD', f'events/view/{event_id}') + return self._check_head_response(r) + def add_event(self, event: MISPEvent, pythonify: bool = False) -> Union[Dict, MISPEvent]: """Add a new event on a MISP instance @@ -2965,6 +2974,14 @@ class PyMISP: return r # Else: an exception was raised anyway + def _check_head_response(self, response: requests.Response) -> bool: + if response.status_code == 200: + return True + elif response.status_code == 404: + return False + else: + raise MISPServerError(f'Error code {response.status_code} for HEAD request') + def _check_response(self, response: requests.Response, lenient_response_type: bool = False, expect_json: bool = False) -> Union[Dict, str]: """Check if the response from the server is not an unexpected error""" if response.status_code >= 500: diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index d95e687..3cb996c 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -713,7 +713,9 @@ class TestComprehensive(unittest.TestCase): second.add_attribute('ip-src', '8.8.8.8') # second has two attributes: text and ip-src try: + self.assertFalse(self.user_misp_connector.event_exists(first)) first = self.user_misp_connector.add_event(first) + self.assertTrue(self.user_misp_connector.event_exists(first)) second = self.user_misp_connector.add_event(second) timeframe = [first.timestamp.timestamp() - 5, first.timestamp.timestamp() + 5] # Search event we just created in multiple ways. Make sure it doesn't catch it when it shouldn't From 5e4dd2b974569032087d0035c8085f3865b3f3e8 Mon Sep 17 00:00:00 2001 From: Jakub Onderka Date: Fri, 30 Oct 2020 20:24:52 +0100 Subject: [PATCH 2/3] new: Allow to get just event metadata after add_event and edit_event --- pymisp/api.py | 11 +++++++---- tests/testlive_comprehensive.py | 13 +++++++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index 396913e..ae527ab 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -316,13 +316,14 @@ class PyMISP: r = self._prepare_request('HEAD', f'events/view/{event_id}') return self._check_head_response(r) - def add_event(self, event: MISPEvent, pythonify: bool = False) -> Union[Dict, MISPEvent]: + def add_event(self, event: MISPEvent, pythonify: bool = False, metadata: bool = False) -> Union[Dict, MISPEvent]: """Add a new event on a MISP instance :param event: event to add :param pythonify: Returns a PyMISP Object instead of the plain json output + :param metadata: Return just event metadata after successful creating """ - r = self._prepare_request('POST', 'events/add', data=event) + r = self._prepare_request('POST', 'events/add' + '/metadata:1' if metadata else '', data=event) new_event = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in new_event: return new_event @@ -330,18 +331,20 @@ class PyMISP: e.load(new_event) return e - def update_event(self, event: MISPEvent, event_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPEvent]: + def update_event(self, event: MISPEvent, event_id: Optional[int] = None, pythonify: bool = False, + metadata: bool = False) -> Union[Dict, MISPEvent]: """Update an event on a MISP instance''' :param event: event to update :param event_id: ID of event to update :param pythonify: Returns a PyMISP Object instead of the plain json output + :param metadata: Return just event metadata after successful update """ if event_id is None: eid = get_uuid_or_id_from_abstract_misp(event) else: eid = get_uuid_or_id_from_abstract_misp(event_id) - r = self._prepare_request('POST', f'events/edit/{eid}', data=event) + r = self._prepare_request('POST', f'events/edit/{eid}' + '/metadata:1' if metadata else '', data=event) updated_event = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in updated_event: return updated_event diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 3cb996c..695206a 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -886,6 +886,19 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) + def test_event_add_update_metadata(self): + event = self.create_simple_event() + event.add_attribute('ip-src', '9.9.9.9') + try: + response = self.user_misp_connector.add_event(event, metadata=True) + self.assertEqual(len(response.attributes), 0) # response should contains zero attributes + + event.info = "New name" + response = self.user_misp_connector.update_event(event, metadata=True) + self.assertEqual(len(response.attributes), 0) # response should contains zero attributes + finally: # cleanup + self.admin_misp_connector.delete_event(event) + def test_extend_event(self): first = self.create_simple_event() first.info = 'parent event' From 6c1f476bdd869b265774d7dc14738d6b2aa1a00a Mon Sep 17 00:00:00 2001 From: Jakub Onderka Date: Sat, 7 Nov 2020 10:17:16 +0100 Subject: [PATCH 3/3] new: Method to check attribute and object existence --- pymisp/api.py | 18 ++++++++++++++++ tests/testlive_comprehensive.py | 38 +++++++++++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index ae527ab..314c9ac 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -404,6 +404,15 @@ class PyMISP: o.from_dict(**misp_object_r) return o + def object_exists(self, misp_object: Union[MISPObject, int, str, UUID]) -> bool: + """Fast check if object exists. + + :param misp_object: Attribute to check + """ + object_id = get_uuid_or_id_from_abstract_misp(misp_object) + r = self._prepare_request('HEAD', f'objects/view/{object_id}') + return self._check_head_response(r) + def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool = False) -> Union[Dict, MISPObject]: """Add a MISP Object to an existing MISP event @@ -544,6 +553,15 @@ class PyMISP: a.from_dict(**attribute_r) return a + def attribute_exists(self, attribute: Union[MISPAttribute, int, str, UUID]) -> bool: + """Fast check if attribute exists. + + :param attribute: Attribute to check + """ + attribute_id = get_uuid_or_id_from_abstract_misp(attribute) + r = self._prepare_request('HEAD', f'attributes/view/{attribute_id}') + return self._check_head_response(r) + def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool = False) -> Union[Dict, MISPAttribute, MISPShadowAttribute]: """Add an attribute to an existing MISP event diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 695206a..8318b37 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -686,6 +686,42 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) + def test_exists(self): + """Check event, attribute and object existence""" + event = self.create_simple_event() + misp_object = MISPObject('domain-ip') + attribute = misp_object.add_attribute('domain', value='google.fr') + misp_object.add_attribute('ip', value='8.8.8.8') + event.add_object(misp_object) + + # Event, attribute and object should not exists before event deletion + self.assertFalse(self.user_misp_connector.event_exists(event)) + self.assertFalse(self.user_misp_connector.attribute_exists(attribute)) + self.assertFalse(self.user_misp_connector.object_exists(misp_object)) + + try: + self.user_misp_connector.add_event(event) + + self.assertTrue(self.user_misp_connector.event_exists(event)) + self.assertTrue(self.user_misp_connector.event_exists(event.uuid)) + self.assertTrue(self.user_misp_connector.event_exists(event.id)) + self.assertTrue(self.user_misp_connector.attribute_exists(attribute)) + self.assertTrue(self.user_misp_connector.attribute_exists(attribute.uuid)) + self.assertTrue(self.user_misp_connector.attribute_exists(attribute.id)) + self.assertTrue(self.user_misp_connector.object_exists(misp_object)) + self.assertTrue(self.user_misp_connector.object_exists(misp_object.id)) + self.assertTrue(self.user_misp_connector.object_exists(misp_object.uuid)) + finally: + self.admin_misp_connector.delete_event(event) + + # Event, attribute and object should not exists after event deletion + self.assertFalse(self.user_misp_connector.event_exists(event)) + self.assertFalse(self.user_misp_connector.event_exists(event.id)) + self.assertFalse(self.user_misp_connector.attribute_exists(attribute)) + self.assertFalse(self.user_misp_connector.attribute_exists(attribute.id)) + self.assertFalse(self.user_misp_connector.object_exists(misp_object)) + self.assertFalse(self.user_misp_connector.object_exists(misp_object.id)) + def test_simple_event(self): '''Search a bunch of parameters: * Value not existing @@ -713,9 +749,7 @@ class TestComprehensive(unittest.TestCase): second.add_attribute('ip-src', '8.8.8.8') # second has two attributes: text and ip-src try: - self.assertFalse(self.user_misp_connector.event_exists(first)) first = self.user_misp_connector.add_event(first) - self.assertTrue(self.user_misp_connector.event_exists(first)) second = self.user_misp_connector.add_event(second) timeframe = [first.timestamp.timestamp() - 5, first.timestamp.timestamp() + 5] # Search event we just created in multiple ways. Make sure it doesn't catch it when it shouldn't