mirror of https://github.com/MISP/PyMISP
new: Method to check event existence
parent
7e84c36406
commit
495af1fd9c
|
@ -307,6 +307,15 @@ class PyMISP:
|
|||
e.load(event_r)
|
||||
return e
|
||||
|
||||
def event_exists(self, event: Union[MISPEvent, int, str, UUID]) -> bool:
|
||||
"""Fast check if event exists.
|
||||
|
||||
:param event: Event to check
|
||||
"""
|
||||
event_id = get_uuid_or_id_from_abstract_misp(event)
|
||||
r = self._prepare_request('HEAD', f'events/view/{event_id}')
|
||||
return self._check_head_response(r)
|
||||
|
||||
def add_event(self, event: MISPEvent, pythonify: bool = False) -> Union[Dict, MISPEvent]:
|
||||
"""Add a new event on a MISP instance
|
||||
|
||||
|
@ -2965,6 +2974,14 @@ class PyMISP:
|
|||
return r
|
||||
# Else: an exception was raised anyway
|
||||
|
||||
def _check_head_response(self, response: requests.Response) -> bool:
|
||||
if response.status_code == 200:
|
||||
return True
|
||||
elif response.status_code == 404:
|
||||
return False
|
||||
else:
|
||||
raise MISPServerError(f'Error code {response.status_code} for HEAD request')
|
||||
|
||||
def _check_response(self, response: requests.Response, lenient_response_type: bool = False, expect_json: bool = False) -> Union[Dict, str]:
|
||||
"""Check if the response from the server is not an unexpected error"""
|
||||
if response.status_code >= 500:
|
||||
|
|
|
@ -713,7 +713,9 @@ class TestComprehensive(unittest.TestCase):
|
|||
second.add_attribute('ip-src', '8.8.8.8')
|
||||
# second has two attributes: text and ip-src
|
||||
try:
|
||||
self.assertFalse(self.user_misp_connector.event_exists(first))
|
||||
first = self.user_misp_connector.add_event(first)
|
||||
self.assertTrue(self.user_misp_connector.event_exists(first))
|
||||
second = self.user_misp_connector.add_event(second)
|
||||
timeframe = [first.timestamp.timestamp() - 5, first.timestamp.timestamp() + 5]
|
||||
# Search event we just created in multiple ways. Make sure it doesn't catch it when it shouldn't
|
||||
|
|
Loading…
Reference in New Issue