diff --git a/pymisp/aping.py b/pymisp/aping.py index 5dce4ca..4dd6489 100644 --- a/pymisp/aping.py +++ b/pymisp/aping.py @@ -347,20 +347,38 @@ class ExpandedPyMISP(PyMISP): return a def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False): - '''Add an attribute to an existing MISP event''' + '''Add an attribute to an existing MISP event + NOTE MISP 2.4.113+: you can pass a list of attributes. + In that case, the pythonified response is the following: {'attributes': [MISPAttribute], 'errors': {errors by attributes}}''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) new_attribute = self._prepare_request('POST', f'attributes/add/{event_id}', data=attribute) new_attribute = self._check_response(new_attribute, expect_json=True) - if ('errors' in new_attribute and new_attribute['errors'][0] == 403 - and new_attribute['errors'][1]['message'] == 'You do not have permission to do that.'): - # At this point, we assume the user tried to add an attribute on an event they don't own - # Re-try with a proposal - return self.add_attribute_proposal(event_id, attribute, pythonify) - if not (self.global_pythonify or pythonify) or 'errors' in new_attribute: - return new_attribute - a = MISPAttribute() - a.from_dict(**new_attribute) - return a + if isinstance(attribute, list): + # Multiple attributes were passed at once, the handling is totally different + if self._old_misp((2, 4, 113), '2020-01-01', sys._getframe().f_code.co_name): + return new_attribute + if not (self.global_pythonify or pythonify): + return new_attribute + to_return = {'attributes': []} + if 'errors' in new_attribute: + to_return['errors'] = new_attribute['errors'] + + for attribute in new_attribute['Attribute']: + a = MISPAttribute() + a.from_dict(**attribute) + to_return['attributes'].append(a) + return to_return + else: + if ('errors' in new_attribute and new_attribute['errors'][0] == 403 + and new_attribute['errors'][1]['message'] == 'You do not have permission to do that.'): + # At this point, we assume the user tried to add an attribute on an event they don't own + # Re-try with a proposal + return self.add_attribute_proposal(event_id, attribute, pythonify) + if not (self.global_pythonify or pythonify) or 'errors' in new_attribute: + return new_attribute + a = MISPAttribute() + a.from_dict(**new_attribute) + return a def update_attribute(self, attribute: MISPAttribute, attribute_id: int=None, pythonify: bool=False): '''Update an attribute on a MISP instance''' @@ -1402,9 +1420,11 @@ class ExpandedPyMISP(PyMISP): if return_format == 'csv' and (self.global_pythonify or pythonify) and not headerless: return self._csv_to_dict(normalized_response) - elif 'errors' in normalized_response: + + if 'errors' in normalized_response: return normalized_response - elif return_format == 'json' and self.global_pythonify or pythonify: + + if return_format == 'json' and self.global_pythonify or pythonify: # The response is in json, we can convert it to a list of pythonic MISP objects to_return = [] if controller == 'events': @@ -1443,8 +1463,8 @@ class ExpandedPyMISP(PyMISP): elif controller == 'objects': raise PyMISPNotImplementedYet('Not implemented yet') return to_return - else: - return normalized_response + + return normalized_response def search_index(self, published: Optional[bool]=None, eventid: Optional[SearchType]=None, tags: Optional[SearchParameterTypes]=None, @@ -1559,7 +1579,8 @@ class ExpandedPyMISP(PyMISP): normalized_response = self._check_response(response, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in normalized_response: return normalized_response - elif self.global_pythonify or pythonify: + + if self.global_pythonify or pythonify: to_return = [] for s in normalized_response: entries = {} @@ -1579,8 +1600,7 @@ class ExpandedPyMISP(PyMISP): entries['sighting'] = ms to_return.append(entries) return to_return - else: - return normalized_response + return normalized_response def search_logs(self, limit: Optional[int]=None, page: Optional[int]=None, log_id: Optional[int]=None, title: Optional[str]=None, @@ -1743,12 +1763,14 @@ class ExpandedPyMISP(PyMISP): misp_entity.sharing_group_id = sharing_group_id # Set new sharing group id if isinstance(misp_entity, MISPEvent): return self.update_event(misp_entity, pythonify=pythonify) - elif isinstance(misp_entity, MISPObject): + + if isinstance(misp_entity, MISPObject): return self.update_object(misp_entity, pythonify=pythonify) - elif isinstance(misp_entity, MISPAttribute): + + if isinstance(misp_entity, MISPAttribute): return self.update_attribute(misp_entity, pythonify=pythonify) - else: - raise PyMISPError('The misp_entity must be MISPEvent, MISPObject or MISPAttribute') + + raise PyMISPError('The misp_entity must be MISPEvent, MISPObject or MISPAttribute') def tag(self, misp_entity: Union[AbstractMISP, str], tag: str): """Tag an event or an attribute. misp_entity can be a UUID""" @@ -1792,7 +1814,7 @@ class ExpandedPyMISP(PyMISP): return str(obj) if isinstance(obj, (int, str)): return obj - elif 'id' in obj: + if 'id' in obj: return obj['id'] return obj['uuid'] @@ -1806,27 +1828,28 @@ class ExpandedPyMISP(PyMISP): '''Catch-all method to normalize anything that can be converted to a timestamp''' if isinstance(value, datetime): return value.timestamp() - elif isinstance(value, date): + + if isinstance(value, date): return datetime.combine(value, datetime.max.time()).timestamp() - elif isinstance(value, str): + + if isinstance(value, str): if value.isdigit(): return value - else: - try: - float(value) - return value - except ValueError: - # The value can also be '1d', '10h', ... - return value - else: - return value + try: + float(value) + return value + except ValueError: + # The value can also be '1d', '10h', ... + return value + return value def _check_response(self, response, lenient_response_type=False, expect_json=False): """Check if the response from the server is not an unexpected error""" if response.status_code >= 500: logger.critical(everything_broken.format(response.request.headers, response.request.body, response.text)) raise MISPServerError(f'Error code 500:\n{response.text}') - elif 400 <= response.status_code < 500: + + if 400 <= response.status_code < 500: # The server returns a json message with the error details error_message = response.json() logger.error(f'Something went wrong ({response.status_code}): {error_message}') @@ -1849,7 +1872,7 @@ class ExpandedPyMISP(PyMISP): raise PyMISPUnexpectedResponse(f'Unexpected response from server: {response.text}') if lenient_response_type and not response.headers.get('content-type').startswith('application/json'): return response.text - if not len(response.content): + if not response.content: # Empty response logger.error('Got an empty response.') return {'errors': 'The response is empty.'} diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 30a127c..d7f321f 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -1327,6 +1327,37 @@ class TestComprehensive(unittest.TestCase): new_attribute.type = 'ip-dst' new_attribute = self.user_misp_connector.add_attribute(first.id, new_attribute) self.assertEqual(new_attribute.value, '1.2.3.4') + # Test attribute already in event + # new_attribute.uuid = str(uuid4()) + # new_attribute = self.user_misp_connector.add_attribute(first.id, new_attribute) + new_similar = MISPAttribute() + new_similar.value = '1.2.3.4' + new_similar.type = 'ip-dst' + similar_error = self.user_misp_connector.add_attribute(first.id, new_similar) + self.assertEqual(similar_error['errors'][1]['errors']['value'][0], 'A similar attribute already exists for this event.') + + # Test add multiple attributes at once + attr1 = MISPAttribute() + attr1.value = '1.2.3.4' + attr1.type = 'ip-dst' + attr2 = MISPAttribute() + attr2.value = '1.2.3.5' + attr2.type = 'ip-dst' + attr3 = MISPAttribute() + attr3.value = first.attributes[0].value + attr3.type = first.attributes[0].type + attr4 = MISPAttribute() + attr4.value = '1.2.3.6' + attr4.type = 'ip-dst' + attr4.add_tag('tlp:amber___test') + response = self.user_misp_connector.add_attribute(first.id, [attr1, attr2, attr3, attr4]) + # FIXME: https://github.com/MISP/MISP/issues/4959 + # self.assertEqual(response['attributes'][0].value, '1.2.3.5') + # self.assertEqual(response['attributes'][1].value, '1.2.3.6') + # self.assertEqual(response['attributes'][1].tags[0].name, 'tlp:amber___test') + # self.assertEqual(response['errors']['attribute_0']['value'][0], 'A similar attribute already exists for this event.') + # self.assertEqual(response['errors']['attribute_2']['value'][0], 'A similar attribute already exists for this event.') + # Add attribute as proposal new_proposal = MISPAttribute() new_proposal.value = '5.2.3.4' @@ -1406,7 +1437,7 @@ class TestComprehensive(unittest.TestCase): # Test attribute*S* attributes = self.admin_misp_connector.attributes() - self.assertEqual(len(attributes), 5) + self.assertEqual(len(attributes), 7) # attributes = self.user_misp_connector.attributes() # self.assertEqual(len(attributes), 5) # Test event*S*