From e916b332f8644fbcdbb5d822b5235067be15a042 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Mon, 18 Jan 2021 09:37:22 +0100 Subject: [PATCH] new: Fail if a duplicate object is added to an event. --- pymisp/api.py | 6 ++++-- tests/testlive_comprehensive.py | 7 +++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index a386e0d..3c31cce 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -509,15 +509,17 @@ class PyMISP: r = self._prepare_request('HEAD', f'objects/view/{object_id}') return self._check_head_response(r) - def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool = False) -> Union[Dict, MISPObject]: + def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool = False, break_on_duplicate: bool = False) -> Union[Dict, MISPObject]: """Add a MISP Object to an existing MISP event :param event: event to extend :param misp_object: object to add :param pythonify: Returns a PyMISP Object instead of the plain json output + :param break_on_duplicate: if True, check and reject if this object's attributes match an existing object's attributes; may require much time """ event_id = get_uuid_or_id_from_abstract_misp(event) - r = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object) + params = {'breakOnDuplicate': True} if break_on_duplicate else {} + r = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object, kw_params=params) new_object = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in new_object: return new_object diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 6c8f153..95dbaa5 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -1444,6 +1444,13 @@ class TestComprehensive(unittest.TestCase): obj_attrs = r.get_attributes_by_relation('ssdeep') self.assertEqual(len(obj_attrs), 1, obj_attrs) self.assertEqual(r.name, 'file', r) + + # Test break_on_duplicate at object level + fo_dup, peo_dup, _ = make_binary_objects('tests/viper-test-files/test_files/whoami.exe') + r = self.user_misp_connector.add_object(first, peo_dup, break_on_duplicate=True) + self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r) + + # Test refs r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0]) self.assertEqual(r.object_uuid, fo.uuid, r.to_json()) self.assertEqual(r.referenced_uuid, peo.uuid, r.to_json())