From 495af1fd9c63598bc33529bfd6af6ac843a30127 Mon Sep 17 00:00:00 2001 From: Jakub Onderka Date: Fri, 30 Oct 2020 20:21:56 +0100 Subject: [PATCH] new: Method to check event existence --- pymisp/api.py | 17 +++++++++++++++++ tests/testlive_comprehensive.py | 2 ++ 2 files changed, 19 insertions(+) diff --git a/pymisp/api.py b/pymisp/api.py index dce3627..396913e 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -307,6 +307,15 @@ class PyMISP: e.load(event_r) return e + def event_exists(self, event: Union[MISPEvent, int, str, UUID]) -> bool: + """Fast check if event exists. + + :param event: Event to check + """ + event_id = get_uuid_or_id_from_abstract_misp(event) + r = self._prepare_request('HEAD', f'events/view/{event_id}') + return self._check_head_response(r) + def add_event(self, event: MISPEvent, pythonify: bool = False) -> Union[Dict, MISPEvent]: """Add a new event on a MISP instance @@ -2965,6 +2974,14 @@ class PyMISP: return r # Else: an exception was raised anyway + def _check_head_response(self, response: requests.Response) -> bool: + if response.status_code == 200: + return True + elif response.status_code == 404: + return False + else: + raise MISPServerError(f'Error code {response.status_code} for HEAD request') + def _check_response(self, response: requests.Response, lenient_response_type: bool = False, expect_json: bool = False) -> Union[Dict, str]: """Check if the response from the server is not an unexpected error""" if response.status_code >= 500: diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index d95e687..3cb996c 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -713,7 +713,9 @@ class TestComprehensive(unittest.TestCase): second.add_attribute('ip-src', '8.8.8.8') # second has two attributes: text and ip-src try: + self.assertFalse(self.user_misp_connector.event_exists(first)) first = self.user_misp_connector.add_event(first) + self.assertTrue(self.user_misp_connector.event_exists(first)) second = self.user_misp_connector.add_event(second) timeframe = [first.timestamp.timestamp() - 5, first.timestamp.timestamp() + 5] # Search event we just created in multiple ways. Make sure it doesn't catch it when it shouldn't