mirror of https://github.com/MISP/PyMISP
fix: Exception when posting multiple attributes on attributes/add
Fix #433 Few cleanups in code.pull/434/head
parent
549e3a5a84
commit
e993886dd7
|
@ -347,20 +347,38 @@ class ExpandedPyMISP(PyMISP):
|
|||
return a
|
||||
|
||||
def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False):
|
||||
'''Add an attribute to an existing MISP event'''
|
||||
'''Add an attribute to an existing MISP event
|
||||
NOTE MISP 2.4.113+: you can pass a list of attributes.
|
||||
In that case, the pythonified response is the following: {'attributes': [MISPAttribute], 'errors': {errors by attributes}}'''
|
||||
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
|
||||
new_attribute = self._prepare_request('POST', f'attributes/add/{event_id}', data=attribute)
|
||||
new_attribute = self._check_response(new_attribute, expect_json=True)
|
||||
if ('errors' in new_attribute and new_attribute['errors'][0] == 403
|
||||
and new_attribute['errors'][1]['message'] == 'You do not have permission to do that.'):
|
||||
# At this point, we assume the user tried to add an attribute on an event they don't own
|
||||
# Re-try with a proposal
|
||||
return self.add_attribute_proposal(event_id, attribute, pythonify)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in new_attribute:
|
||||
return new_attribute
|
||||
a = MISPAttribute()
|
||||
a.from_dict(**new_attribute)
|
||||
return a
|
||||
if isinstance(attribute, list):
|
||||
# Multiple attributes were passed at once, the handling is totally different
|
||||
if self._old_misp((2, 4, 113), '2020-01-01', sys._getframe().f_code.co_name):
|
||||
return new_attribute
|
||||
if not (self.global_pythonify or pythonify):
|
||||
return new_attribute
|
||||
to_return = {'attributes': []}
|
||||
if 'errors' in new_attribute:
|
||||
to_return['errors'] = new_attribute['errors']
|
||||
|
||||
for attribute in new_attribute['Attribute']:
|
||||
a = MISPAttribute()
|
||||
a.from_dict(**attribute)
|
||||
to_return['attributes'].append(a)
|
||||
return to_return
|
||||
else:
|
||||
if ('errors' in new_attribute and new_attribute['errors'][0] == 403
|
||||
and new_attribute['errors'][1]['message'] == 'You do not have permission to do that.'):
|
||||
# At this point, we assume the user tried to add an attribute on an event they don't own
|
||||
# Re-try with a proposal
|
||||
return self.add_attribute_proposal(event_id, attribute, pythonify)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in new_attribute:
|
||||
return new_attribute
|
||||
a = MISPAttribute()
|
||||
a.from_dict(**new_attribute)
|
||||
return a
|
||||
|
||||
def update_attribute(self, attribute: MISPAttribute, attribute_id: int=None, pythonify: bool=False):
|
||||
'''Update an attribute on a MISP instance'''
|
||||
|
@ -1402,9 +1420,11 @@ class ExpandedPyMISP(PyMISP):
|
|||
|
||||
if return_format == 'csv' and (self.global_pythonify or pythonify) and not headerless:
|
||||
return self._csv_to_dict(normalized_response)
|
||||
elif 'errors' in normalized_response:
|
||||
|
||||
if 'errors' in normalized_response:
|
||||
return normalized_response
|
||||
elif return_format == 'json' and self.global_pythonify or pythonify:
|
||||
|
||||
if return_format == 'json' and self.global_pythonify or pythonify:
|
||||
# The response is in json, we can convert it to a list of pythonic MISP objects
|
||||
to_return = []
|
||||
if controller == 'events':
|
||||
|
@ -1443,8 +1463,8 @@ class ExpandedPyMISP(PyMISP):
|
|||
elif controller == 'objects':
|
||||
raise PyMISPNotImplementedYet('Not implemented yet')
|
||||
return to_return
|
||||
else:
|
||||
return normalized_response
|
||||
|
||||
return normalized_response
|
||||
|
||||
def search_index(self, published: Optional[bool]=None, eventid: Optional[SearchType]=None,
|
||||
tags: Optional[SearchParameterTypes]=None,
|
||||
|
@ -1559,7 +1579,8 @@ class ExpandedPyMISP(PyMISP):
|
|||
normalized_response = self._check_response(response, expect_json=True)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in normalized_response:
|
||||
return normalized_response
|
||||
elif self.global_pythonify or pythonify:
|
||||
|
||||
if self.global_pythonify or pythonify:
|
||||
to_return = []
|
||||
for s in normalized_response:
|
||||
entries = {}
|
||||
|
@ -1579,8 +1600,7 @@ class ExpandedPyMISP(PyMISP):
|
|||
entries['sighting'] = ms
|
||||
to_return.append(entries)
|
||||
return to_return
|
||||
else:
|
||||
return normalized_response
|
||||
return normalized_response
|
||||
|
||||
def search_logs(self, limit: Optional[int]=None, page: Optional[int]=None,
|
||||
log_id: Optional[int]=None, title: Optional[str]=None,
|
||||
|
@ -1743,12 +1763,14 @@ class ExpandedPyMISP(PyMISP):
|
|||
misp_entity.sharing_group_id = sharing_group_id # Set new sharing group id
|
||||
if isinstance(misp_entity, MISPEvent):
|
||||
return self.update_event(misp_entity, pythonify=pythonify)
|
||||
elif isinstance(misp_entity, MISPObject):
|
||||
|
||||
if isinstance(misp_entity, MISPObject):
|
||||
return self.update_object(misp_entity, pythonify=pythonify)
|
||||
elif isinstance(misp_entity, MISPAttribute):
|
||||
|
||||
if isinstance(misp_entity, MISPAttribute):
|
||||
return self.update_attribute(misp_entity, pythonify=pythonify)
|
||||
else:
|
||||
raise PyMISPError('The misp_entity must be MISPEvent, MISPObject or MISPAttribute')
|
||||
|
||||
raise PyMISPError('The misp_entity must be MISPEvent, MISPObject or MISPAttribute')
|
||||
|
||||
def tag(self, misp_entity: Union[AbstractMISP, str], tag: str):
|
||||
"""Tag an event or an attribute. misp_entity can be a UUID"""
|
||||
|
@ -1792,7 +1814,7 @@ class ExpandedPyMISP(PyMISP):
|
|||
return str(obj)
|
||||
if isinstance(obj, (int, str)):
|
||||
return obj
|
||||
elif 'id' in obj:
|
||||
if 'id' in obj:
|
||||
return obj['id']
|
||||
return obj['uuid']
|
||||
|
||||
|
@ -1806,27 +1828,28 @@ class ExpandedPyMISP(PyMISP):
|
|||
'''Catch-all method to normalize anything that can be converted to a timestamp'''
|
||||
if isinstance(value, datetime):
|
||||
return value.timestamp()
|
||||
elif isinstance(value, date):
|
||||
|
||||
if isinstance(value, date):
|
||||
return datetime.combine(value, datetime.max.time()).timestamp()
|
||||
elif isinstance(value, str):
|
||||
|
||||
if isinstance(value, str):
|
||||
if value.isdigit():
|
||||
return value
|
||||
else:
|
||||
try:
|
||||
float(value)
|
||||
return value
|
||||
except ValueError:
|
||||
# The value can also be '1d', '10h', ...
|
||||
return value
|
||||
else:
|
||||
return value
|
||||
try:
|
||||
float(value)
|
||||
return value
|
||||
except ValueError:
|
||||
# The value can also be '1d', '10h', ...
|
||||
return value
|
||||
return value
|
||||
|
||||
def _check_response(self, response, lenient_response_type=False, expect_json=False):
|
||||
"""Check if the response from the server is not an unexpected error"""
|
||||
if response.status_code >= 500:
|
||||
logger.critical(everything_broken.format(response.request.headers, response.request.body, response.text))
|
||||
raise MISPServerError(f'Error code 500:\n{response.text}')
|
||||
elif 400 <= response.status_code < 500:
|
||||
|
||||
if 400 <= response.status_code < 500:
|
||||
# The server returns a json message with the error details
|
||||
error_message = response.json()
|
||||
logger.error(f'Something went wrong ({response.status_code}): {error_message}')
|
||||
|
@ -1849,7 +1872,7 @@ class ExpandedPyMISP(PyMISP):
|
|||
raise PyMISPUnexpectedResponse(f'Unexpected response from server: {response.text}')
|
||||
if lenient_response_type and not response.headers.get('content-type').startswith('application/json'):
|
||||
return response.text
|
||||
if not len(response.content):
|
||||
if not response.content:
|
||||
# Empty response
|
||||
logger.error('Got an empty response.')
|
||||
return {'errors': 'The response is empty.'}
|
||||
|
|
|
@ -1327,6 +1327,37 @@ class TestComprehensive(unittest.TestCase):
|
|||
new_attribute.type = 'ip-dst'
|
||||
new_attribute = self.user_misp_connector.add_attribute(first.id, new_attribute)
|
||||
self.assertEqual(new_attribute.value, '1.2.3.4')
|
||||
# Test attribute already in event
|
||||
# new_attribute.uuid = str(uuid4())
|
||||
# new_attribute = self.user_misp_connector.add_attribute(first.id, new_attribute)
|
||||
new_similar = MISPAttribute()
|
||||
new_similar.value = '1.2.3.4'
|
||||
new_similar.type = 'ip-dst'
|
||||
similar_error = self.user_misp_connector.add_attribute(first.id, new_similar)
|
||||
self.assertEqual(similar_error['errors'][1]['errors']['value'][0], 'A similar attribute already exists for this event.')
|
||||
|
||||
# Test add multiple attributes at once
|
||||
attr1 = MISPAttribute()
|
||||
attr1.value = '1.2.3.4'
|
||||
attr1.type = 'ip-dst'
|
||||
attr2 = MISPAttribute()
|
||||
attr2.value = '1.2.3.5'
|
||||
attr2.type = 'ip-dst'
|
||||
attr3 = MISPAttribute()
|
||||
attr3.value = first.attributes[0].value
|
||||
attr3.type = first.attributes[0].type
|
||||
attr4 = MISPAttribute()
|
||||
attr4.value = '1.2.3.6'
|
||||
attr4.type = 'ip-dst'
|
||||
attr4.add_tag('tlp:amber___test')
|
||||
response = self.user_misp_connector.add_attribute(first.id, [attr1, attr2, attr3, attr4])
|
||||
# FIXME: https://github.com/MISP/MISP/issues/4959
|
||||
# self.assertEqual(response['attributes'][0].value, '1.2.3.5')
|
||||
# self.assertEqual(response['attributes'][1].value, '1.2.3.6')
|
||||
# self.assertEqual(response['attributes'][1].tags[0].name, 'tlp:amber___test')
|
||||
# self.assertEqual(response['errors']['attribute_0']['value'][0], 'A similar attribute already exists for this event.')
|
||||
# self.assertEqual(response['errors']['attribute_2']['value'][0], 'A similar attribute already exists for this event.')
|
||||
|
||||
# Add attribute as proposal
|
||||
new_proposal = MISPAttribute()
|
||||
new_proposal.value = '5.2.3.4'
|
||||
|
@ -1406,7 +1437,7 @@ class TestComprehensive(unittest.TestCase):
|
|||
|
||||
# Test attribute*S*
|
||||
attributes = self.admin_misp_connector.attributes()
|
||||
self.assertEqual(len(attributes), 5)
|
||||
self.assertEqual(len(attributes), 7)
|
||||
# attributes = self.user_misp_connector.attributes()
|
||||
# self.assertEqual(len(attributes), 5)
|
||||
# Test event*S*
|
||||
|
|
Loading…
Reference in New Issue