mirror of https://github.com/MISP/PyMISP
Merge pull request #958 from righel/attrs-break-on-dup
[new]: support breakOnDuplicate option for attributes:add()pull/970/head
commit
a9885d5e71
|
@ -588,7 +588,7 @@ class PyMISP:
|
|||
:param break_on_duplicate: if True, check and reject if this object's attributes match an existing object's attributes; may require much time
|
||||
"""
|
||||
event_id = get_uuid_or_id_from_abstract_misp(event)
|
||||
params = {'breakOnDuplicate': True} if break_on_duplicate else {}
|
||||
params = {'breakOnDuplicate': 1} if break_on_duplicate else {}
|
||||
r = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object, kw_params=params)
|
||||
new_object = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in new_object:
|
||||
|
@ -741,7 +741,7 @@ class PyMISP:
|
|||
r = self._prepare_request('HEAD', f'attributes/view/{attribute_id}')
|
||||
return self._check_head_response(r)
|
||||
|
||||
def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: Union[MISPAttribute, Iterable], pythonify: bool = False) -> Union[Dict, MISPAttribute, MISPShadowAttribute]:
|
||||
def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: Union[MISPAttribute, Iterable], pythonify: bool = False, break_on_duplicate: bool = True) -> Union[Dict, MISPAttribute, MISPShadowAttribute]:
|
||||
"""Add an attribute to an existing MISP event: https://www.misp-project.org/openapi/#tag/Attributes/operation/addAttribute
|
||||
|
||||
:param event: event to extend
|
||||
|
@ -749,9 +749,11 @@ class PyMISP:
|
|||
If a list is passed, the pythonified response is a dict with the following structure:
|
||||
{'attributes': [MISPAttribute], 'errors': {errors by attributes}}
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
:param break_on_duplicate: if False, do not fail if the attribute already exists, updates existing attribute instead (timestamp will be always updated)
|
||||
"""
|
||||
params = {'breakOnDuplicate': 0} if break_on_duplicate is not True else {}
|
||||
event_id = get_uuid_or_id_from_abstract_misp(event)
|
||||
r = self._prepare_request('POST', f'attributes/add/{event_id}', data=attribute)
|
||||
r = self._prepare_request('POST', f'attributes/add/{event_id}', data=attribute, kw_params=params)
|
||||
new_attribute = self._check_json_response(r)
|
||||
if isinstance(attribute, list):
|
||||
# Multiple attributes were passed at once, the handling is totally different
|
||||
|
|
|
@ -1901,6 +1901,15 @@ class TestComprehensive(unittest.TestCase):
|
|||
similar_error = self.user_misp_connector.add_attribute(first, new_similar)
|
||||
self.assertEqual(similar_error['errors'][1]['errors']['value'][0], 'A similar attribute already exists for this event.')
|
||||
|
||||
# Test add attribute break_on_duplicate=False
|
||||
time.sleep(5)
|
||||
new_similar = MISPAttribute()
|
||||
new_similar.value = '1.2.3.4'
|
||||
new_similar.type = 'ip-dst'
|
||||
new_similar = self.user_misp_connector.add_attribute(first, new_similar, break_on_duplicate=False)
|
||||
self.assertTrue(isinstance(new_similar, MISPAttribute), new_similar)
|
||||
self.assertGreater(new_similar.timestamp, new_attribute.timestamp)
|
||||
|
||||
# Test add multiple attributes at once
|
||||
attr0 = MISPAttribute()
|
||||
attr0.value = '0.0.0.0'
|
||||
|
|
Loading…
Reference in New Issue