mirror of https://github.com/MISP/PyMISP
commit
b646f0c5da
|
@ -308,13 +308,23 @@ class PyMISP:
|
|||
e.load(event_r)
|
||||
return e
|
||||
|
||||
def add_event(self, event: MISPEvent, pythonify: bool = False) -> Union[Dict, MISPEvent]:
|
||||
def event_exists(self, event: Union[MISPEvent, int, str, UUID]) -> bool:
|
||||
"""Fast check if event exists.
|
||||
|
||||
:param event: Event to check
|
||||
"""
|
||||
event_id = get_uuid_or_id_from_abstract_misp(event)
|
||||
r = self._prepare_request('HEAD', f'events/view/{event_id}')
|
||||
return self._check_head_response(r)
|
||||
|
||||
def add_event(self, event: MISPEvent, pythonify: bool = False, metadata: bool = False) -> Union[Dict, MISPEvent]:
|
||||
"""Add a new event on a MISP instance
|
||||
|
||||
:param event: event to add
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
:param metadata: Return just event metadata after successful creating
|
||||
"""
|
||||
r = self._prepare_request('POST', 'events/add', data=event)
|
||||
r = self._prepare_request('POST', 'events/add' + '/metadata:1' if metadata else '', data=event)
|
||||
new_event = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in new_event:
|
||||
return new_event
|
||||
|
@ -322,18 +332,20 @@ class PyMISP:
|
|||
e.load(new_event)
|
||||
return e
|
||||
|
||||
def update_event(self, event: MISPEvent, event_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPEvent]:
|
||||
def update_event(self, event: MISPEvent, event_id: Optional[int] = None, pythonify: bool = False,
|
||||
metadata: bool = False) -> Union[Dict, MISPEvent]:
|
||||
"""Update an event on a MISP instance'''
|
||||
|
||||
:param event: event to update
|
||||
:param event_id: ID of event to update
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
:param metadata: Return just event metadata after successful update
|
||||
"""
|
||||
if event_id is None:
|
||||
eid = get_uuid_or_id_from_abstract_misp(event)
|
||||
else:
|
||||
eid = get_uuid_or_id_from_abstract_misp(event_id)
|
||||
r = self._prepare_request('POST', f'events/edit/{eid}', data=event)
|
||||
r = self._prepare_request('POST', f'events/edit/{eid}' + '/metadata:1' if metadata else '', data=event)
|
||||
updated_event = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in updated_event:
|
||||
return updated_event
|
||||
|
@ -393,6 +405,15 @@ class PyMISP:
|
|||
o.from_dict(**misp_object_r)
|
||||
return o
|
||||
|
||||
def object_exists(self, misp_object: Union[MISPObject, int, str, UUID]) -> bool:
|
||||
"""Fast check if object exists.
|
||||
|
||||
:param misp_object: Attribute to check
|
||||
"""
|
||||
object_id = get_uuid_or_id_from_abstract_misp(misp_object)
|
||||
r = self._prepare_request('HEAD', f'objects/view/{object_id}')
|
||||
return self._check_head_response(r)
|
||||
|
||||
def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool = False) -> Union[Dict, MISPObject]:
|
||||
"""Add a MISP Object to an existing MISP event
|
||||
|
||||
|
@ -533,6 +554,15 @@ class PyMISP:
|
|||
a.from_dict(**attribute_r)
|
||||
return a
|
||||
|
||||
def attribute_exists(self, attribute: Union[MISPAttribute, int, str, UUID]) -> bool:
|
||||
"""Fast check if attribute exists.
|
||||
|
||||
:param attribute: Attribute to check
|
||||
"""
|
||||
attribute_id = get_uuid_or_id_from_abstract_misp(attribute)
|
||||
r = self._prepare_request('HEAD', f'attributes/view/{attribute_id}')
|
||||
return self._check_head_response(r)
|
||||
|
||||
def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool = False) -> Union[Dict, MISPAttribute, MISPShadowAttribute]:
|
||||
"""Add an attribute to an existing MISP event
|
||||
|
||||
|
@ -2971,6 +3001,14 @@ class PyMISP:
|
|||
return r
|
||||
# Else: an exception was raised anyway
|
||||
|
||||
def _check_head_response(self, response: requests.Response) -> bool:
|
||||
if response.status_code == 200:
|
||||
return True
|
||||
elif response.status_code == 404:
|
||||
return False
|
||||
else:
|
||||
raise MISPServerError(f'Error code {response.status_code} for HEAD request')
|
||||
|
||||
def _check_response(self, response: requests.Response, lenient_response_type: bool = False, expect_json: bool = False) -> Union[Dict, str]:
|
||||
"""Check if the response from the server is not an unexpected error"""
|
||||
if response.status_code >= 500:
|
||||
|
|
|
@ -686,6 +686,42 @@ class TestComprehensive(unittest.TestCase):
|
|||
# Delete event
|
||||
self.admin_misp_connector.delete_event(first)
|
||||
|
||||
def test_exists(self):
|
||||
"""Check event, attribute and object existence"""
|
||||
event = self.create_simple_event()
|
||||
misp_object = MISPObject('domain-ip')
|
||||
attribute = misp_object.add_attribute('domain', value='google.fr')
|
||||
misp_object.add_attribute('ip', value='8.8.8.8')
|
||||
event.add_object(misp_object)
|
||||
|
||||
# Event, attribute and object should not exists before event deletion
|
||||
self.assertFalse(self.user_misp_connector.event_exists(event))
|
||||
self.assertFalse(self.user_misp_connector.attribute_exists(attribute))
|
||||
self.assertFalse(self.user_misp_connector.object_exists(misp_object))
|
||||
|
||||
try:
|
||||
self.user_misp_connector.add_event(event)
|
||||
|
||||
self.assertTrue(self.user_misp_connector.event_exists(event))
|
||||
self.assertTrue(self.user_misp_connector.event_exists(event.uuid))
|
||||
self.assertTrue(self.user_misp_connector.event_exists(event.id))
|
||||
self.assertTrue(self.user_misp_connector.attribute_exists(attribute))
|
||||
self.assertTrue(self.user_misp_connector.attribute_exists(attribute.uuid))
|
||||
self.assertTrue(self.user_misp_connector.attribute_exists(attribute.id))
|
||||
self.assertTrue(self.user_misp_connector.object_exists(misp_object))
|
||||
self.assertTrue(self.user_misp_connector.object_exists(misp_object.id))
|
||||
self.assertTrue(self.user_misp_connector.object_exists(misp_object.uuid))
|
||||
finally:
|
||||
self.admin_misp_connector.delete_event(event)
|
||||
|
||||
# Event, attribute and object should not exists after event deletion
|
||||
self.assertFalse(self.user_misp_connector.event_exists(event))
|
||||
self.assertFalse(self.user_misp_connector.event_exists(event.id))
|
||||
self.assertFalse(self.user_misp_connector.attribute_exists(attribute))
|
||||
self.assertFalse(self.user_misp_connector.attribute_exists(attribute.id))
|
||||
self.assertFalse(self.user_misp_connector.object_exists(misp_object))
|
||||
self.assertFalse(self.user_misp_connector.object_exists(misp_object.id))
|
||||
|
||||
def test_simple_event(self):
|
||||
'''Search a bunch of parameters:
|
||||
* Value not existing
|
||||
|
@ -884,6 +920,19 @@ class TestComprehensive(unittest.TestCase):
|
|||
self.admin_misp_connector.delete_event(first)
|
||||
self.admin_misp_connector.delete_event(second)
|
||||
|
||||
def test_event_add_update_metadata(self):
|
||||
event = self.create_simple_event()
|
||||
event.add_attribute('ip-src', '9.9.9.9')
|
||||
try:
|
||||
response = self.user_misp_connector.add_event(event, metadata=True)
|
||||
self.assertEqual(len(response.attributes), 0) # response should contains zero attributes
|
||||
|
||||
event.info = "New name"
|
||||
response = self.user_misp_connector.update_event(event, metadata=True)
|
||||
self.assertEqual(len(response.attributes), 0) # response should contains zero attributes
|
||||
finally: # cleanup
|
||||
self.admin_misp_connector.delete_event(event)
|
||||
|
||||
def test_extend_event(self):
|
||||
first = self.create_simple_event()
|
||||
first.info = 'parent event'
|
||||
|
|
Loading…
Reference in New Issue