From 0a2a6b3d6b0ddf2fd25b0b9a8538a33c7b7fcec5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Sun, 21 Oct 2018 22:58:07 -0400 Subject: [PATCH] new: Allow to pass csv to return_format in search --- pymisp/aping.py | 32 +++++++++++----- tests/testlive_comprehensive.py | 65 ++++++++++++++++++++++++++++++++- 2 files changed, 85 insertions(+), 12 deletions(-) diff --git a/pymisp/aping.py b/pymisp/aping.py index b4f556f..954099e 100644 --- a/pymisp/aping.py +++ b/pymisp/aping.py @@ -141,6 +141,7 @@ class ExpandedPyMISP(PyMISP): sg_reference_only: Optional[bool]=None, eventinfo: Optional[str]=None, searchall: Optional[bool]=None, + include_context: Optional[bool]=None, includeContext: Optional[bool]=None, pythonify: Optional[bool]=False, **kwargs): ''' @@ -172,6 +173,7 @@ class ExpandedPyMISP(PyMISP): :param sg_reference_only: If this flag is set, sharing group objects will not be included, instead only the sharing group ID is set. :param eventinfo: Filter on the event's info field. :param searchall: Search for a full or a substring (delimited by % for substrings) in the event info, event tags, attribute tags, attribute values or attribute comment fields. + :param include_context: [CSV Only] Include the event data with each attribute. :param pythonify: Returns a list of PyMISP Objects the the plain json output. Warning: it might use a lot of RAM Deprecated: @@ -180,6 +182,7 @@ class ExpandedPyMISP(PyMISP): :param last: synonym for publish_timestamp :param enforceWarninglist: synonym for enforce_warninglist :param includeEventUuid: synonym for include_event_uuid + :param includeContext: synonym for include_context ''' @@ -197,6 +200,8 @@ class ExpandedPyMISP(PyMISP): enforce_warninglist = enforceWarninglist if includeEventUuid is not None: include_event_uuid = includeEventUuid + if includeContext is not None: + include_context = includeContext # Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized. # They are passed as-is. @@ -266,12 +271,16 @@ class ExpandedPyMISP(PyMISP): query['eventinfo'] = eventinfo if searchall is not None: query['searchall'] = searchall + if include_context is not None: + query['includeContext'] = include_context url = urljoin(self.root_url, f'{controller}/restSearch') response = self._prepare_request('POST', url, data=json.dumps(query)) normalized_response = self._check_response(response) - if isinstance(normalized_response, str) or (isinstance(normalized_response, dict) and - normalized_response.get('errors')): + if return_format == 'csv' and pythonify: + return self._csv_to_dict(normalized_response) + elif isinstance(normalized_response, str) or (isinstance(normalized_response, dict) and + normalized_response.get('errors')): return normalized_response elif return_format == 'json' and pythonify: # The response is in json, we can convert it to a list of pythonic MISP objects @@ -362,14 +371,7 @@ class ExpandedPyMISP(PyMISP): normalized_response = self._check_response(response) if isinstance(normalized_response, str): if pythonify and not headerless: - # Make it a list of dict - fieldnames, lines = normalized_response.split('\n', 1) - fieldnames = fieldnames.split(',') - to_return = [] - for line in csv.reader(lines.split('\n')): - if line: - to_return.append({fname: value for fname, value in zip(fieldnames, line)}) - return to_return + return self._csv_to_dict(normalized_response) return normalized_response elif isinstance(normalized_response, dict): @@ -379,3 +381,13 @@ class ExpandedPyMISP(PyMISP): else: # Should not happen... raise PyMISPUnexpectedResponse(f'The server should have returned a CSV file as text. instead it returned:\n{normalized_response}') + + def _csv_to_dict(self, csv_content): + '''Makes a list of dict out of a csv file (requires headers)''' + fieldnames, lines = csv_content.split('\n', 1) + fieldnames = fieldnames.split(',') + to_return = [] + for line in csv.reader(lines.split('\n')): + if line: + to_return.append({fname: value for fname, value in zip(fieldnames, line)}) + return to_return diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 1d6ea0e..0a48361 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -604,11 +604,9 @@ class TestComprehensive(unittest.TestCase): # Page / limit attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=1, limit=3, pythonify=True) - print(attributes) self.assertEqual(len(attributes), 3) attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=2, limit=3, pythonify=True) - print(attributes) self.assertEqual(len(attributes), 1) time.sleep(1) # make sure the next attribute is added one at least one second later @@ -719,6 +717,69 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first.id) self.admin_misp_connector.delete_event(second.id) + def test_search_csv(self): + first = self.create_simple_event() + first.attributes[0].comment = 'This is the original comment' + second = self.create_simple_event() + second.info = 'foo blah' + second.set_date('2018-09-01') + second.add_attribute('ip-src', '8.8.8.8') + try: + second = self.user_misp_connector.add_event(second) + first = self.user_misp_connector.add_event(first) + + response = self.user_misp_connector.fast_publish(first.id, alert=False) + self.assertEqual(response['errors'][0][1]['message'], 'You do not have permission to use this functionality.') + + # Default search, attribute with to_ids == True + first.attributes[0].to_ids = True + first = self.user_misp_connector.update_event(first) + self.admin_misp_connector.fast_publish(first.id, alert=False) + csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), pythonify=True) + self.assertEqual(len(csv), 1) + self.assertEqual(csv[0]['value'], first.attributes[0].value) + + # eventid + csv = self.user_misp_connector.search(return_format='csv', eventid=first.id, pythonify=True) + self.assertEqual(len(csv), 1) + self.assertEqual(csv[0]['value'], first.attributes[0].value) + + # category + csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Other', pythonify=True) + self.assertEqual(len(csv), 1) + self.assertEqual(csv[0]['value'], first.attributes[0].value) + csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Person', pythonify=True) + self.assertEqual(len(csv), 0) + + # type_attribute + csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='text', pythonify=True) + self.assertEqual(len(csv), 1) + self.assertEqual(csv[0]['value'], first.attributes[0].value) + csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='ip-src', pythonify=True) + self.assertEqual(len(csv), 0) + + # context + csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), include_context=True, pythonify=True) + self.assertEqual(len(csv), 1) + self.assertTrue('event_info' in csv[0]) + + # date_from date_to + csv = self.user_misp_connector.search(return_format='csv', date_from=date.today().isoformat(), pythonify=True) + self.assertEqual(len(csv), 1) + self.assertEqual(csv[0]['value'], first.attributes[0].value) + csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', pythonify=True) + self.assertEqual(len(csv), 2) + + # headerless + csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', headerless=True) + # FIXME: The header is here. + # print(csv) + + finally: + # Delete event + self.admin_misp_connector.delete_event(first.id) + self.admin_misp_connector.delete_event(second.id) + def test_upload_sample(self): first = self.create_simple_event() second = self.create_simple_event()