diff --git a/pymisp/api.py b/pymisp/api.py index 41ed157..038840a 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -550,7 +550,7 @@ class PyMISP: o.from_dict(**new_object) return o - def update_object(self, misp_object: MISPObject, object_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPObject]: + def update_object(self, misp_object: MISPObject, object_id: Optional[int] = None, pythonify: bool = False, break_on_duplicate: bool = False) -> Union[Dict, MISPObject]: """Update an object on a MISP instance :param misp_object: object to update @@ -561,7 +561,8 @@ class PyMISP: oid = get_uuid_or_id_from_abstract_misp(misp_object) else: oid = get_uuid_or_id_from_abstract_misp(object_id) - r = self._prepare_request('POST', f'objects/edit/{oid}', data=misp_object) + params = {'breakOnDuplicate': True} if break_on_duplicate else {} + r = self._prepare_request('POST', f'objects/edit/{oid}', data=misp_object, kw_params=params) updated_object = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in updated_object: return updated_object diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 926d7cc..a2c684d 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -1480,16 +1480,36 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(len(obj_attrs), 1, obj_attrs) self.assertEqual(r.name, 'file', r) - # Test break_on_duplicate at object level + # Test break_on_duplicate at object level for add_object fo_dup, peo_dup, _ = make_binary_objects('tests/viper-test-files/test_files/whoami.exe') r = self.user_misp_connector.add_object(first, peo_dup, break_on_duplicate=True) self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r) - # Test break on duplicate with breakOnDuplicate key in object + # Test break on duplicate with breakOnDuplicate key in object for add_object fo_dup.breakOnDuplicate = True r = self.user_misp_connector.add_object(first, fo_dup) self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r) + # Test break_on_duplicate at object level for update_object + obj1 = MISPObject('domain-ip') + obj1.add_attribute('ip', value="8.8.8.8") + obj1.add_attribute('domain', value="google.lu") + obj2 = MISPObject('domain-ip') + obj2.add_attribute('ip', value="8.8.8.8") + obj2.add_attribute('domain', value="google.fr") + o1 = self.user_misp_connector.add_object(first, obj1, pythonify=True) + self.assertEqual(o1.attributes[1].value, 'google.lu') + o2 = self.user_misp_connector.add_object(first, obj2, pythonify=True) + self.assertEqual(o2.attributes[1].value, 'google.fr') + obj2.attributes[1].value = 'google.lu' + r = self.user_misp_connector.update_object(obj2, o2.uuid, break_on_duplicate=True) + self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r) + + # Test break on duplicate with breakOnDuplicate key in object for update_object + obj2.breakOnDuplicate = True + r = self.user_misp_connector.update_object(obj2, o2.uuid) + self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r) + # Test refs r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0]) self.assertEqual(r.object_uuid, fo.uuid, r.to_json())