mirror of https://github.com/MISP/PyMISP
new: Fail if a duplicate object is added to an event.
parent
a802ddd6ed
commit
c3d6c3cc73
|
@ -417,15 +417,17 @@ class PyMISP:
|
||||||
r = self._prepare_request('HEAD', f'objects/view/{object_id}')
|
r = self._prepare_request('HEAD', f'objects/view/{object_id}')
|
||||||
return self._check_head_response(r)
|
return self._check_head_response(r)
|
||||||
|
|
||||||
def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool = False) -> Union[Dict, MISPObject]:
|
def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool = False, break_on_duplicate: bool = False) -> Union[Dict, MISPObject]:
|
||||||
"""Add a MISP Object to an existing MISP event
|
"""Add a MISP Object to an existing MISP event
|
||||||
|
|
||||||
:param event: event to extend
|
:param event: event to extend
|
||||||
:param misp_object: object to add
|
:param misp_object: object to add
|
||||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||||
|
:param break_on_duplicate: if True, check and reject if this object's attributes match an existing object's attributes; may require much time
|
||||||
"""
|
"""
|
||||||
event_id = get_uuid_or_id_from_abstract_misp(event)
|
event_id = get_uuid_or_id_from_abstract_misp(event)
|
||||||
r = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object)
|
params = {'breakOnDuplicate': True} if break_on_duplicate else {}
|
||||||
|
r = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object, kw_params=params)
|
||||||
new_object = self._check_json_response(r)
|
new_object = self._check_json_response(r)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in new_object:
|
if not (self.global_pythonify or pythonify) or 'errors' in new_object:
|
||||||
return new_object
|
return new_object
|
||||||
|
|
|
@ -1444,6 +1444,13 @@ class TestComprehensive(unittest.TestCase):
|
||||||
obj_attrs = r.get_attributes_by_relation('ssdeep')
|
obj_attrs = r.get_attributes_by_relation('ssdeep')
|
||||||
self.assertEqual(len(obj_attrs), 1, obj_attrs)
|
self.assertEqual(len(obj_attrs), 1, obj_attrs)
|
||||||
self.assertEqual(r.name, 'file', r)
|
self.assertEqual(r.name, 'file', r)
|
||||||
|
|
||||||
|
# Test break_on_duplicate at object level
|
||||||
|
fo_dup, peo_dup, _ = make_binary_objects('tests/viper-test-files/test_files/whoami.exe')
|
||||||
|
r = self.user_misp_connector.add_object(first, peo_dup, break_on_duplicate=True)
|
||||||
|
self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r)
|
||||||
|
|
||||||
|
# Test refs
|
||||||
r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0])
|
r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0])
|
||||||
self.assertEqual(r.object_uuid, fo.uuid, r.to_json())
|
self.assertEqual(r.object_uuid, fo.uuid, r.to_json())
|
||||||
self.assertEqual(r.referenced_uuid, peo.uuid, r.to_json())
|
self.assertEqual(r.referenced_uuid, peo.uuid, r.to_json())
|
||||||
|
|
Loading…
Reference in New Issue