new: Allow to pass csv to return_format in search

pull/291/head
Raphaël Vinot 2018-10-21 22:58:07 -04:00
parent a8a7193059
commit 0a2a6b3d6b
2 changed files with 85 additions and 12 deletions

View File

@ -141,6 +141,7 @@ class ExpandedPyMISP(PyMISP):
sg_reference_only: Optional[bool]=None,
eventinfo: Optional[str]=None,
searchall: Optional[bool]=None,
include_context: Optional[bool]=None, includeContext: Optional[bool]=None,
pythonify: Optional[bool]=False,
**kwargs):
'''
@ -172,6 +173,7 @@ class ExpandedPyMISP(PyMISP):
:param sg_reference_only: If this flag is set, sharing group objects will not be included, instead only the sharing group ID is set.
:param eventinfo: Filter on the event's info field.
:param searchall: Search for a full or a substring (delimited by % for substrings) in the event info, event tags, attribute tags, attribute values or attribute comment fields.
:param include_context: [CSV Only] Include the event data with each attribute.
:param pythonify: Returns a list of PyMISP Objects the the plain json output. Warning: it might use a lot of RAM
Deprecated:
@ -180,6 +182,7 @@ class ExpandedPyMISP(PyMISP):
:param last: synonym for publish_timestamp
:param enforceWarninglist: synonym for enforce_warninglist
:param includeEventUuid: synonym for include_event_uuid
:param includeContext: synonym for include_context
'''
@ -197,6 +200,8 @@ class ExpandedPyMISP(PyMISP):
enforce_warninglist = enforceWarninglist
if includeEventUuid is not None:
include_event_uuid = includeEventUuid
if includeContext is not None:
include_context = includeContext
# Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
# They are passed as-is.
@ -266,11 +271,15 @@ class ExpandedPyMISP(PyMISP):
query['eventinfo'] = eventinfo
if searchall is not None:
query['searchall'] = searchall
if include_context is not None:
query['includeContext'] = include_context
url = urljoin(self.root_url, f'{controller}/restSearch')
response = self._prepare_request('POST', url, data=json.dumps(query))
normalized_response = self._check_response(response)
if isinstance(normalized_response, str) or (isinstance(normalized_response, dict) and
if return_format == 'csv' and pythonify:
return self._csv_to_dict(normalized_response)
elif isinstance(normalized_response, str) or (isinstance(normalized_response, dict) and
normalized_response.get('errors')):
return normalized_response
elif return_format == 'json' and pythonify:
@ -362,14 +371,7 @@ class ExpandedPyMISP(PyMISP):
normalized_response = self._check_response(response)
if isinstance(normalized_response, str):
if pythonify and not headerless:
# Make it a list of dict
fieldnames, lines = normalized_response.split('\n', 1)
fieldnames = fieldnames.split(',')
to_return = []
for line in csv.reader(lines.split('\n')):
if line:
to_return.append({fname: value for fname, value in zip(fieldnames, line)})
return to_return
return self._csv_to_dict(normalized_response)
return normalized_response
elif isinstance(normalized_response, dict):
@ -379,3 +381,13 @@ class ExpandedPyMISP(PyMISP):
else:
# Should not happen...
raise PyMISPUnexpectedResponse(f'The server should have returned a CSV file as text. instead it returned:\n{normalized_response}')
def _csv_to_dict(self, csv_content):
'''Makes a list of dict out of a csv file (requires headers)'''
fieldnames, lines = csv_content.split('\n', 1)
fieldnames = fieldnames.split(',')
to_return = []
for line in csv.reader(lines.split('\n')):
if line:
to_return.append({fname: value for fname, value in zip(fieldnames, line)})
return to_return

View File

@ -604,11 +604,9 @@ class TestComprehensive(unittest.TestCase):
# Page / limit
attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=1, limit=3, pythonify=True)
print(attributes)
self.assertEqual(len(attributes), 3)
attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=2, limit=3, pythonify=True)
print(attributes)
self.assertEqual(len(attributes), 1)
time.sleep(1) # make sure the next attribute is added one at least one second later
@ -719,6 +717,69 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
def test_search_csv(self):
first = self.create_simple_event()
first.attributes[0].comment = 'This is the original comment'
second = self.create_simple_event()
second.info = 'foo blah'
second.set_date('2018-09-01')
second.add_attribute('ip-src', '8.8.8.8')
try:
second = self.user_misp_connector.add_event(second)
first = self.user_misp_connector.add_event(first)
response = self.user_misp_connector.fast_publish(first.id, alert=False)
self.assertEqual(response['errors'][0][1]['message'], 'You do not have permission to use this functionality.')
# Default search, attribute with to_ids == True
first.attributes[0].to_ids = True
first = self.user_misp_connector.update_event(first)
self.admin_misp_connector.fast_publish(first.id, alert=False)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
# eventid
csv = self.user_misp_connector.search(return_format='csv', eventid=first.id, pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
# category
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Other', pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Person', pythonify=True)
self.assertEqual(len(csv), 0)
# type_attribute
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='text', pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='ip-src', pythonify=True)
self.assertEqual(len(csv), 0)
# context
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), include_context=True, pythonify=True)
self.assertEqual(len(csv), 1)
self.assertTrue('event_info' in csv[0])
# date_from date_to
csv = self.user_misp_connector.search(return_format='csv', date_from=date.today().isoformat(), pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', pythonify=True)
self.assertEqual(len(csv), 2)
# headerless
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', headerless=True)
# FIXME: The header is here.
# print(csv)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
def test_upload_sample(self):
first = self.create_simple_event()
second = self.create_simple_event()