mirror of https://github.com/MISP/PyMISP
add: more test cases
parent
be6e4ccf7b
commit
2607111b19
|
@ -301,6 +301,12 @@ class ExpandedPyMISP(PyMISP):
|
||||||
:param pythonify: Returns a list of dictionaries instead of the plain CSV
|
:param pythonify: Returns a list of dictionaries instead of the plain CSV
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
# Deprecated stuff / synonyms
|
||||||
|
if includeContext is not None:
|
||||||
|
include_context = includeContext
|
||||||
|
if enforceWarninglist is not None:
|
||||||
|
enforce_warninglist = enforceWarninglist
|
||||||
|
|
||||||
# Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
|
# Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
|
||||||
# They are passed as-is.
|
# They are passed as-is.
|
||||||
query = kwargs
|
query = kwargs
|
||||||
|
@ -316,8 +322,6 @@ class ExpandedPyMISP(PyMISP):
|
||||||
query['type'] = type_attribute
|
query['type'] = type_attribute
|
||||||
if include_context is not None:
|
if include_context is not None:
|
||||||
query['includeContext'] = include_context
|
query['includeContext'] = include_context
|
||||||
if includeContext is not None:
|
|
||||||
query['includeContext'] = includeContext
|
|
||||||
if date_from is not None:
|
if date_from is not None:
|
||||||
query['from'] = self.make_timestamp(date_from)
|
query['from'] = self.make_timestamp(date_from)
|
||||||
if date_to is not None:
|
if date_to is not None:
|
||||||
|
@ -331,9 +335,6 @@ class ExpandedPyMISP(PyMISP):
|
||||||
query['headerless'] = headerless
|
query['headerless'] = headerless
|
||||||
if enforce_warninglist is not None:
|
if enforce_warninglist is not None:
|
||||||
query['enforceWarninglist'] = enforce_warninglist
|
query['enforceWarninglist'] = enforce_warninglist
|
||||||
if enforceWarninglist is not None:
|
|
||||||
# Alias for enforce_warninglist
|
|
||||||
query['enforceWarninglist'] = enforceWarninglist
|
|
||||||
|
|
||||||
url = urljoin(self.root_url, '/events/csv/download/')
|
url = urljoin(self.root_url, '/events/csv/download/')
|
||||||
response = self._prepare_request('POST', url, data=json.dumps(query))
|
response = self._prepare_request('POST', url, data=json.dumps(query))
|
||||||
|
|
|
@ -452,14 +452,16 @@ class TestComprehensive(unittest.TestCase):
|
||||||
* deleted
|
* deleted
|
||||||
* to_ids
|
* to_ids
|
||||||
* include_event_uuid
|
* include_event_uuid
|
||||||
missing: attachments, warning list
|
warning list
|
||||||
'''
|
'''
|
||||||
first = self.create_simple_event()
|
first = self.create_simple_event()
|
||||||
first.info = 'foo bar blah'
|
first.info = 'foo bar blah'
|
||||||
|
# First has one text attribute
|
||||||
second = self.create_simple_event()
|
second = self.create_simple_event()
|
||||||
second.info = 'foo blah'
|
second.info = 'foo blah'
|
||||||
second.set_date('2018-09-01')
|
second.set_date('2018-09-01')
|
||||||
second.add_attribute('ip-src', '8.8.8.8')
|
second.add_attribute('ip-src', '8.8.8.8')
|
||||||
|
# second has two attributes: text and ip-src
|
||||||
try:
|
try:
|
||||||
first = self.user_misp_connector.add_event(first)
|
first = self.user_misp_connector.add_event(first)
|
||||||
second = self.user_misp_connector.add_event(second)
|
second = self.user_misp_connector.add_event(second)
|
||||||
|
@ -517,6 +519,8 @@ class TestComprehensive(unittest.TestCase):
|
||||||
# FIXME: should return one event
|
# FIXME: should return one event
|
||||||
# self.assertEqual(len(events), 1)
|
# self.assertEqual(len(events), 1)
|
||||||
# self.assertEqual(events[0].id, second.id)
|
# self.assertEqual(events[0].id, second.id)
|
||||||
|
|
||||||
|
# date_from / date_to
|
||||||
events = self.user_misp_connector.search(timestamp=timeframe, date_from=date.today().isoformat(), pythonify=True)
|
events = self.user_misp_connector.search(timestamp=timeframe, date_from=date.today().isoformat(), pythonify=True)
|
||||||
self.assertEqual(len(events), 1)
|
self.assertEqual(len(events), 1)
|
||||||
self.assertEqual(events[0].id, first.id)
|
self.assertEqual(events[0].id, first.id)
|
||||||
|
@ -535,15 +539,15 @@ class TestComprehensive(unittest.TestCase):
|
||||||
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='0', pythonify=True)
|
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='0', pythonify=True)
|
||||||
self.assertEqual(len(events), 2)
|
self.assertEqual(len(events), 2)
|
||||||
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='1', pythonify=True)
|
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='1', pythonify=True)
|
||||||
# FIXME: should only return second
|
self.assertEqual(len(events), 2)
|
||||||
# self.assertEqual(len(events), 1)
|
self.assertEqual(len(events[0].attributes), 0)
|
||||||
# self.assertEqual(events[0].id, second.id)
|
self.assertEqual(events[1].id, second.id)
|
||||||
# self.assertEqual(len(events[0].attributes), 1)
|
self.assertEqual(len(events[1].attributes), 1)
|
||||||
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='exclude', pythonify=True)
|
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='exclude', pythonify=True)
|
||||||
self.assertEqual(len(events), 2)
|
self.assertEqual(len(events), 2)
|
||||||
# FIXME: Should have one attribute
|
# FIXME: exclude == 1
|
||||||
# self.assertEqual(len(events[0].attributes), 1)
|
# self.assertEqual(len(events[0].attributes), 1)
|
||||||
self.assertEqual(len(events[1].attributes), 1)
|
# self.assertEqual(len(events[1].attributes), 1)
|
||||||
|
|
||||||
# deleted
|
# deleted
|
||||||
second.attributes[1].delete()
|
second.attributes[1].delete()
|
||||||
|
@ -582,13 +586,10 @@ class TestComprehensive(unittest.TestCase):
|
||||||
first.add_attribute('malware-sample', value='testfile.py', data=BytesIO(f.read()))
|
first.add_attribute('malware-sample', value='testfile.py', data=BytesIO(f.read()))
|
||||||
|
|
||||||
first = self.user_misp_connector.update_event(first)
|
first = self.user_misp_connector.update_event(first)
|
||||||
# time.sleep(30)
|
|
||||||
events = self.user_misp_connector.search(timestamp=first.timestamp.timestamp(), with_attachments=True,
|
events = self.user_misp_connector.search(timestamp=first.timestamp.timestamp(), with_attachments=True,
|
||||||
pythonify=True)
|
pythonify=True)
|
||||||
self.assertEqual(len(events), 1)
|
self.assertEqual(len(events), 1)
|
||||||
# print(events[0].attributes[-1].to_json())
|
self.assertIs(type(events[0].attributes[-1].malware_binary), BytesIO)
|
||||||
# FIXME: the attachment isn't there.
|
|
||||||
# self.assertIs(type(events[0].attributes[-1].malware_binary), BytesIO)
|
|
||||||
events = self.user_misp_connector.search(timestamp=first.timestamp.timestamp(), with_attachments=False,
|
events = self.user_misp_connector.search(timestamp=first.timestamp.timestamp(), with_attachments=False,
|
||||||
pythonify=True)
|
pythonify=True)
|
||||||
self.assertEqual(len(events), 1)
|
self.assertEqual(len(events), 1)
|
||||||
|
@ -615,17 +616,27 @@ class TestComprehensive(unittest.TestCase):
|
||||||
|
|
||||||
def test_get_csv(self):
|
def test_get_csv(self):
|
||||||
first = self.create_simple_event()
|
first = self.create_simple_event()
|
||||||
|
second = self.create_simple_event()
|
||||||
|
second.info = 'foo blah'
|
||||||
|
second.set_date('2018-09-01')
|
||||||
|
second.add_attribute('ip-src', '8.8.8.8')
|
||||||
try:
|
try:
|
||||||
first.attributes[0].comment = 'This is the original comment'
|
first.attributes[0].comment = 'This is the original comment'
|
||||||
first = self.user_misp_connector.add_event(first)
|
first = self.user_misp_connector.add_event(first)
|
||||||
response = self.user_misp_connector.fast_publish(first.id, alert=False)
|
response = self.user_misp_connector.fast_publish(first.id, alert=False)
|
||||||
self.assertEqual(response['errors'][0][1]['message'], 'You do not have permission to use this functionality.')
|
self.assertEqual(response['errors'][0][1]['message'], 'You do not have permission to use this functionality.')
|
||||||
|
|
||||||
|
# default search, all attributes with to_ids == False
|
||||||
self.admin_misp_connector.fast_publish(first.id, alert=False)
|
self.admin_misp_connector.fast_publish(first.id, alert=False)
|
||||||
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, pythonify=True)
|
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, pythonify=True)
|
||||||
# FIXME: Should not return anything (to_ids is False)
|
# FIXME: Should not return anything (to_ids is False)
|
||||||
# self.assertEqual(len(csv), 0)
|
# self.assertEqual(len(csv), 0)
|
||||||
|
|
||||||
|
# Also export attributes with to_ids set to false
|
||||||
|
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, ignore=True, pythonify=True)
|
||||||
|
self.assertEqual(len(csv), 1)
|
||||||
|
|
||||||
|
# Default search, attribute with to_ids == True
|
||||||
first.attributes[0].to_ids = True
|
first.attributes[0].to_ids = True
|
||||||
first = self.user_misp_connector.update_event(first)
|
first = self.user_misp_connector.update_event(first)
|
||||||
self.admin_misp_connector.fast_publish(first.id, alert=False)
|
self.admin_misp_connector.fast_publish(first.id, alert=False)
|
||||||
|
@ -633,9 +644,48 @@ class TestComprehensive(unittest.TestCase):
|
||||||
self.assertEqual(len(csv), 1)
|
self.assertEqual(len(csv), 1)
|
||||||
self.assertEqual(csv[0]['value'], first.attributes[0].value)
|
self.assertEqual(csv[0]['value'], first.attributes[0].value)
|
||||||
|
|
||||||
|
# eventid
|
||||||
|
csv = self.user_misp_connector.get_csv(eventid=first.id, pythonify=True)
|
||||||
|
self.assertEqual(len(csv), 1)
|
||||||
|
self.assertEqual(csv[0]['value'], first.attributes[0].value)
|
||||||
|
|
||||||
|
# category
|
||||||
|
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), category='Other', pythonify=True)
|
||||||
|
self.assertEqual(len(csv), 1)
|
||||||
|
self.assertEqual(csv[0]['value'], first.attributes[0].value)
|
||||||
|
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), category='Person', pythonify=True)
|
||||||
|
self.assertEqual(len(csv), 0)
|
||||||
|
|
||||||
|
# type_attribute
|
||||||
|
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), type_attribute='text', pythonify=True)
|
||||||
|
self.assertEqual(len(csv), 1)
|
||||||
|
self.assertEqual(csv[0]['value'], first.attributes[0].value)
|
||||||
|
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), type_attribute='ip-src', pythonify=True)
|
||||||
|
self.assertEqual(len(csv), 0)
|
||||||
|
|
||||||
|
# context
|
||||||
|
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), include_context=True, pythonify=True)
|
||||||
|
self.assertEqual(len(csv), 1)
|
||||||
|
# print(csv[0])
|
||||||
|
# FIXME: there is no context.
|
||||||
|
|
||||||
|
# date_from date_to
|
||||||
|
second = self.user_misp_connector.add_event(second)
|
||||||
|
csv = self.user_misp_connector.get_csv(date_from=date.today().isoformat(), pythonify=True)
|
||||||
|
self.assertEqual(len(csv), 1)
|
||||||
|
self.assertEqual(csv[0]['value'], first.attributes[0].value)
|
||||||
|
csv = self.user_misp_connector.get_csv(date_from='2018-09-01', date_to='2018-09-02', pythonify=True)
|
||||||
|
self.assertEqual(len(csv), 2)
|
||||||
|
|
||||||
|
# headerless
|
||||||
|
csv = self.user_misp_connector.get_csv(date_from='2018-09-01', date_to='2018-09-02', headerless=True)
|
||||||
|
# FIXME: The header is here.
|
||||||
|
# print(csv)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# Delete event
|
# Delete event
|
||||||
self.admin_misp_connector.delete_event(first.id)
|
self.admin_misp_connector.delete_event(first.id)
|
||||||
|
self.admin_misp_connector.delete_event(second.id)
|
||||||
|
|
||||||
@unittest.skip("Currently failing")
|
@unittest.skip("Currently failing")
|
||||||
def test_search_type_event_csv(self):
|
def test_search_type_event_csv(self):
|
||||||
|
|
Loading…
Reference in New Issue