new: Fail if a duplicate object is added to an event.

pull/694/head
Raphaël Vinot 2021-01-18 09:37:22 +01:00
parent 3d372859e1
commit e916b332f8
2 changed files with 11 additions and 2 deletions

View File

@ -509,15 +509,17 @@ class PyMISP:
r = self._prepare_request('HEAD', f'objects/view/{object_id}') r = self._prepare_request('HEAD', f'objects/view/{object_id}')
return self._check_head_response(r) return self._check_head_response(r)
def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool = False) -> Union[Dict, MISPObject]: def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool = False, break_on_duplicate: bool = False) -> Union[Dict, MISPObject]:
"""Add a MISP Object to an existing MISP event """Add a MISP Object to an existing MISP event
:param event: event to extend :param event: event to extend
:param misp_object: object to add :param misp_object: object to add
:param pythonify: Returns a PyMISP Object instead of the plain json output :param pythonify: Returns a PyMISP Object instead of the plain json output
:param break_on_duplicate: if True, check and reject if this object's attributes match an existing object's attributes; may require much time
""" """
event_id = get_uuid_or_id_from_abstract_misp(event) event_id = get_uuid_or_id_from_abstract_misp(event)
r = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object) params = {'breakOnDuplicate': True} if break_on_duplicate else {}
r = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object, kw_params=params)
new_object = self._check_json_response(r) new_object = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in new_object: if not (self.global_pythonify or pythonify) or 'errors' in new_object:
return new_object return new_object

View File

@ -1444,6 +1444,13 @@ class TestComprehensive(unittest.TestCase):
obj_attrs = r.get_attributes_by_relation('ssdeep') obj_attrs = r.get_attributes_by_relation('ssdeep')
self.assertEqual(len(obj_attrs), 1, obj_attrs) self.assertEqual(len(obj_attrs), 1, obj_attrs)
self.assertEqual(r.name, 'file', r) self.assertEqual(r.name, 'file', r)
# Test break_on_duplicate at object level
fo_dup, peo_dup, _ = make_binary_objects('tests/viper-test-files/test_files/whoami.exe')
r = self.user_misp_connector.add_object(first, peo_dup, break_on_duplicate=True)
self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r)
# Test refs
r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0]) r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0])
self.assertEqual(r.object_uuid, fo.uuid, r.to_json()) self.assertEqual(r.object_uuid, fo.uuid, r.to_json())
self.assertEqual(r.referenced_uuid, peo.uuid, r.to_json()) self.assertEqual(r.referenced_uuid, peo.uuid, r.to_json())