new: Add lots of test cases, find lots of bugs

pull/280/head
Raphaël Vinot 2018-09-18 21:58:20 -07:00
parent cd1de8c6bf
commit e56f70b722
4 changed files with 348 additions and 101 deletions

View File

@ -106,36 +106,90 @@ class ExpandedPyMISP(PyMISP):
# TODO: Make that thing async & test it.
def search(self, controller: str='events', return_format: str='json',
value: Optional[SearchParameterTypes]=None,
eventinfo: Optional[str]=None,
type_attribute: Optional[SearchParameterTypes]=None,
category: Optional[SearchParameterTypes]=None,
org: Optional[SearchParameterTypes]=None,
tags: Optional[SearchParameterTypes]=None,
date_from: Optional[DateTypes]=None, date_to: Optional[DateTypes]=None,
quickfilter: Optional[bool]=None,
date_from: Optional[DateTypes]=None,
date_to: Optional[DateTypes]=None,
eventid: Optional[SearchType]=None,
with_attachment: Optional[bool]=None,
with_attachments: Optional[bool]=None, withAttachments: Optional[bool]=None,
metadata: Optional[bool]=None,
uuid: Optional[str]=None,
published: Optional[bool]=None,
searchall: Optional[bool]=None,
enforce_warninglist: Optional[bool]=None, enforceWarninglist: Optional[bool]=None,
sg_reference_only: Optional[bool]=None,
publish_timestamp: Optional[DateInterval]=None,
publish_timestamp: Optional[DateInterval]=None, last: Optional[DateInterval]=None,
timestamp: Optional[DateInterval]=None,
published: Optional[bool]=None,
enforce_warninglist: Optional[bool]=None, enforceWarninglist: Optional[bool]=None,
to_ids: Optional[str]=None,
deleted: Optional[str]=None,
include_event_uuid: Optional[str]=None, includeEventUuid: Optional[str]=None,
event_timestamp: Optional[DateTypes]=None,
eventinfo: Optional[str]=None,
searchall: Optional[bool]=None,
sg_reference_only: Optional[bool]=None,
pythonify: Optional[bool]=False,
**kwargs):
'''
Search in the MISP instance
:param returnFormat: Set the return format of the search (Currently supported: json, xml, openioc, suricata, snort - more formats are being moved to restSearch with the goal being that all searches happen through this API). Can be passed as the first parameter after restSearch or via the JSON payload.
:param value: Search for the given value in the attributes' value field.
:param type_attribute: The attribute type, any valid MISP attribute type is accepted.
:param category: The attribute category, any valid MISP attribute category is accepted.
:param org: Search by the creator organisation by supplying the organisation identifier.
:param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query`
:param quickfilter: If set it makes the search ignore all of the other arguments, except for the auth key and value. MISP will return all events that have a sub-string match on value in the event info, event orgc, or any of the attribute value fields, or in the attribute comment.
:param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event.
:param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event.
:param eventid: The events that should be included / excluded from the search
:param with_attachments: If set, encodes the attachments / zipped malware samples as base64 in the data field within each attribute
:param metadata: Only the metadata (event, tags, relations) is returned, attributes and proposals are omitted.
:param uuid: Restrict the results by uuid.
:param publish_timestamp: Restrict the results by the last publish timestamp (newer than).
:param timestamp: Restrict the results by the timestamp (last edit). Any event with a timestamp newer than the given timestamp will be returned. In case you are dealing with /attributes as scope, the attribute's timestamp will be used for the lookup.
:param published: Set whether published or unpublished events should be returned. Do not set the parameter if you want both.
:param enforce_warninglist: Remove any attributes from the result that would cause a hit on a warninglist entry.
:param to_ids: By default (0) all attributes are returned that match the other filter parameters, irregardless of their to_ids setting. To restrict the returned data set to to_ids only attributes set this parameter to 1. You can only use the special "exclude" setting to only return attributes that have the to_ids flag disabled.
:param deleted: If this parameter is set to 1, it will return soft-deleted attributes along with active ones. By using "only" as a parameter it will limit the returned data set to soft-deleted data only.
:param include_event_uuid: Instead of just including the event ID, also include the event UUID in each of the attributes.
:param event_timestamp: Only return attributes from events that have received a modification after the given timestamp.
:param eventinfo: Search in the eventinfo field
:param searchall: Set to run a full text search on the whole database (slow)
:param sg_reference_only: Only return a reference to the sharing groups the responses are sharing in (avoid leaking org names)
:param pythonify: Returns a list of PyMISP Objects the the plain json output
Deprecated:
:param withAttachments: synonym for with_attachments
:param last: synonym for publish_timestamp
:param enforceWarninglist: synonym for enforce_warninglist
:param includeEventUuid: synonym for include_event_uuid
'''
if controller not in ['events', 'attributes', 'objects']:
raise ValueError('controller has to be in {}'.format(', '.join(['events', 'attributes', 'objects'])))
# Deprecated stuff / synonyms
if withAttachments is not None:
with_attachments = withAttachments
if last is not None:
publish_timestamp = last
if enforceWarninglist is not None:
enforce_warninglist = enforceWarninglist
if includeEventUuid is not None:
include_event_uuid = includeEventUuid
# Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
# They are passed as-is.
query = kwargs
if return_format is not None:
if return_format not in ['json', 'xml', 'openioc', 'suricata', 'snort']:
raise ValueError('return_format has to be in {}'.format(', '.join(['json', 'xml', 'openioc', 'suricata', 'snort'])))
query['returnFormat'] = return_format
if value is not None:
query['value'] = value
if eventinfo is not None:
query['eventinfo'] = eventinfo
if type_attribute is not None:
query['type'] = type_attribute
if category is not None:
@ -144,27 +198,20 @@ class ExpandedPyMISP(PyMISP):
query['org'] = org
if tags is not None:
query['tags'] = tags
if quickfilter is not None:
query['quickfilter'] = quickfilter
if date_from is not None:
query['from'] = self.make_timestamp(date_from)
if date_to is not None:
query['to'] = self.make_timestamp(date_to)
if eventid is not None:
query['eventid'] = eventid
if with_attachment is not None:
query['withAttachments'] = with_attachment
if with_attachments is not None:
query['withAttachments'] = with_attachments
if metadata is not None:
query['metadata'] = metadata
if uuid is not None:
query['uuid'] = uuid
if published is not None:
query['published'] = published
if enforce_warninglist is not None:
query['enforceWarninglist'] = enforce_warninglist
if enforceWarninglist is not None:
# Alias for enforce_warninglist
query['enforceWarninglist'] = enforceWarninglist
if sg_reference_only is not None:
query['sgReferenceOnly'] = sg_reference_only
if publish_timestamp is not None:
if isinstance(publish_timestamp, (list, tuple)):
query['publish_timestamp'] = (self.make_timestamp(publish_timestamp[0]), self.make_timestamp(publish_timestamp[1]))
@ -175,6 +222,29 @@ class ExpandedPyMISP(PyMISP):
query['timestamp'] = (self.make_timestamp(timestamp[0]), self.make_timestamp(timestamp[1]))
else:
query['timestamp'] = self.make_timestamp(timestamp)
if published is not None:
query['published'] = published
if enforce_warninglist is not None:
query['enforceWarninglist'] = enforce_warninglist
if to_ids is not None:
if str(to_ids) not in ['0', '1', 'exclude']:
raise ValueError('to_ids has to be in {}'.format(', '.join(['0', '1', 'exclude'])))
query['to_ids'] = to_ids
if deleted is not None:
query['deleted'] = deleted
if include_event_uuid is not None:
query['includeEventUuid'] = include_event_uuid
if event_timestamp is not None:
if isinstance(event_timestamp, (list, tuple)):
query['event_timestamp'] = (self.make_timestamp(event_timestamp[0]), self.make_timestamp(event_timestamp[1]))
else:
query['event_timestamp'] = self.make_timestamp(event_timestamp)
if eventinfo is not None:
query['eventinfo'] = eventinfo
if searchall is not None:
query['searchall'] = searchall
if sg_reference_only is not None:
query['sgReferenceOnly'] = sg_reference_only
url = urljoin(self.root_url, f'{controller}/restSearch')
response = self._prepare_request('POST', url, data=json.dumps(query))
@ -182,21 +252,24 @@ class ExpandedPyMISP(PyMISP):
if isinstance(normalized_response, str) or (isinstance(normalized_response, dict) and
normalized_response.get('errors')):
return normalized_response
# The response is in json, we can confert it to a list of pythonic MISP objects
to_return = []
if controller == 'events':
for e in normalized_response:
me = MISPEvent()
me.load(e)
to_return.append(me)
elif controller == 'attributes':
for a in normalized_response.get('Attribute'):
ma = MISPAttribute()
ma.from_dict(**a)
to_return.append(ma)
elif controller == 'objects':
raise PyMISPNotImplementedYet('Not implemented yet')
return to_return
elif return_format == 'json' and pythonify:
# The response is in json, we can convert it to a list of pythonic MISP objects
to_return = []
if controller == 'events':
for e in normalized_response:
me = MISPEvent()
me.load(e)
to_return.append(me)
elif controller == 'attributes':
for a in normalized_response.get('Attribute'):
ma = MISPAttribute()
ma.from_dict(**a)
to_return.append(ma)
elif controller == 'objects':
raise PyMISPNotImplementedYet('Not implemented yet')
return to_return
else:
return normalized_response
def get_csv(self,
eventid: Optional[SearchType]=None,
@ -211,6 +284,22 @@ class ExpandedPyMISP(PyMISP):
enforce_warninglist: Optional[bool]=None, enforceWarninglist: Optional[bool]=None,
pythonify: Optional[bool]=False,
**kwargs):
'''
Get MISP data in CSV format.
:param eventid: Restrict the download to a single event
:param ignore: If true, the response includes attributes without the to_ids flag
:param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query`
:param category: The attribute category, any valid MISP attribute category is accepted.
:param type_attribute: The attribute type, any valid MISP attribute type is accepted.
:param include_context: Include the event data with each attribute.
:param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event.
:param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event.
:param publish_timestamp: Events published within the last x amount of time. This filter will use the published timestamp of the event.
:param headerless: The CSV created when this setting is set to true will not contain the header row.
:param enforceWarninglist: All attributes that have a hit on a warninglist will be excluded.
:param pythonify: Returns a list of dictionaries instead of the plain CSV
'''
# Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
# They are passed as-is.
@ -250,7 +339,7 @@ class ExpandedPyMISP(PyMISP):
response = self._prepare_request('POST', url, data=json.dumps(query))
normalized_response = self._check_response(response)
if isinstance(normalized_response, str):
if pythonify:
if pythonify and not headerless:
# Make it a list of dict
fieldnames, lines = normalized_response.split('\n', 1)
fieldnames = fieldnames.split(',')

View File

@ -256,11 +256,11 @@ class MISPAttribute(AbstractMISP):
else:
# Assuming the user only passed the filename
self.malware_filename = self.value
m = hashlib.md5()
m.update(self.data.getvalue())
# m = hashlib.md5()
# m.update(self.data.getvalue())
self.value = self.malware_filename
md5 = m.hexdigest()
self.value = '{}|{}'.format(self.malware_filename, md5)
# md5 = m.hexdigest()
# self.value = '{}|{}'.format(self.malware_filename, md5)
self._malware_binary = self.data
self.encrypt = True

View File

@ -1,7 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pymisp import PyMISP, __version__
from keys import url, key
try:
from keys import url, key
except ImportError as e:
print(e)
url = 'http://localhost:8080'
key = 'fk5BodCZw8owbscW8pQ4ykMASLeJ4NYhuAbshNjo'
import time
import unittest

View File

@ -4,7 +4,8 @@
import unittest
from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis
from datetime import datetime, timedelta
from datetime import datetime, timedelta, date
from io import BytesIO
import time
@ -24,7 +25,7 @@ class TestComprehensive(unittest.TestCase):
def setUpClass(cls):
cls.maxDiff = None
# Connect as admin
cls.admin_misp_connector = ExpandedPyMISP(url, key)
cls.admin_misp_connector = ExpandedPyMISP(url, key, debug=False)
# Creates an org
org = cls.admin_misp_connector.add_organisation(name='Test Org')
cls.test_org = MISPOrganisation()
@ -113,17 +114,17 @@ class TestComprehensive(unittest.TestCase):
try:
first, second, third = self.environment()
# Search as admin
events = self.admin_misp_connector.search(value=first.attributes[0].value)
events = self.admin_misp_connector.search(value=first.attributes[0].value, pythonify=True)
self.assertEqual(len(events), 2)
for e in events:
self.assertIn(e.id, [first.id, second.id])
# Search as user
events = self.user_misp_connector.search(value=first.attributes[0].value)
events = self.user_misp_connector.search(value=first.attributes[0].value, pythonify=True)
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [second.id])
# Non-existing value
events = self.user_misp_connector.search(value=str(uuid4()))
events = self.user_misp_connector.search(value=str(uuid4()), pythonify=True)
self.assertEqual(events, [])
finally:
# Delete events
@ -132,20 +133,21 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(third.id)
def test_search_value_attribute(self):
'''Search value in attributes controller'''
try:
first, second, third = self.environment()
# Search as admin
attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value)
attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value, pythonify=True)
self.assertEqual(len(attributes), 2)
for a in attributes:
self.assertIn(a.event_id, [first.id, second.id])
# Search as user
attributes = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value)
attributes = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value, pythonify=True)
self.assertEqual(len(attributes), 1)
for a in attributes:
self.assertIn(a.event_id, [second.id])
# Non-existing value
attributes = self.user_misp_connector.search(controller='attributes', value=str(uuid4()))
attributes = self.user_misp_connector.search(controller='attributes', value=str(uuid4()), pythonify=True)
self.assertEqual(attributes, [])
finally:
# Delete event
@ -153,18 +155,18 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
# @unittest.skip("Currently failing")
def test_search_type_event(self):
'''Search multiple events, search events containing attributes with specific types'''
try:
first, second, third = self.environment()
# Search as admin
events = self.admin_misp_connector.search(timestamp=first.timestamp.timestamp())
events = self.admin_misp_connector.search(timestamp=first.timestamp.timestamp(), pythonify=True)
self.assertEqual(len(events), 3)
for e in events:
self.assertIn(e.id, [first.id, second.id, third.id])
attributes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst'])
events = self.admin_misp_connector.search(timestamp=first.timestamp.timestamp(),
type_attribute=attributes_types_search)
type_attribute=attributes_types_search, pythonify=True)
self.assertEqual(len(events), 2)
for e in events:
self.assertIn(e.id, [second.id, third.id])
@ -175,11 +177,12 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(third.id)
def test_search_type_attribute(self):
'''Search multiple attributes, search attributes with specific types'''
try:
first, second, third = self.environment()
# Search as admin
attributes = self.admin_misp_connector.search(controller='attributes',
timestamp=first.timestamp.timestamp())
timestamp=first.timestamp.timestamp(), pythonify=True)
self.assertEqual(len(attributes), 8)
for a in attributes:
self.assertIn(a.event_id, [first.id, second.id, third.id])
@ -187,7 +190,7 @@ class TestComprehensive(unittest.TestCase):
attributes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst'])
attributes = self.admin_misp_connector.search(controller='attributes',
timestamp=first.timestamp.timestamp(),
type_attribute=attributes_types_search)
type_attribute=attributes_types_search, pythonify=True)
self.assertEqual(len(attributes), 3)
for a in attributes:
self.assertIn(a.event_id, [second.id, third.id])
@ -198,31 +201,32 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(third.id)
def test_search_tag_event(self):
'''Search Tags at events level'''
try:
first, second, third = self.environment()
# Search as admin
events = self.admin_misp_connector.search(tags='tlp:white___test')
events = self.admin_misp_connector.search(tags='tlp:white___test', pythonify=True)
self.assertEqual(len(events), 3)
for e in events:
self.assertIn(e.id, [first.id, second.id, third.id])
events = self.admin_misp_connector.search(tags='tlp:amber___test')
events = self.admin_misp_connector.search(tags='tlp:amber___test', pythonify=True)
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [third.id])
events = self.admin_misp_connector.search(tags='admin_only')
events = self.admin_misp_connector.search(tags='admin_only', pythonify=True)
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [first.id])
# Search as user
events = self.user_misp_connector.search(tags='tlp:white___test')
events = self.user_misp_connector.search(tags='tlp:white___test', pythonify=True)
self.assertEqual(len(events), 2)
for e in events:
self.assertIn(e.id, [second.id, third.id])
events = self.user_misp_connector.search(tags='tlp:amber___test')
events = self.user_misp_connector.search(tags='tlp:amber___test', pythonify=True)
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [third.id])
events = self.user_misp_connector.search(tags='admin_only')
events = self.user_misp_connector.search(tags='admin_only', pythonify=True)
self.assertEqual(events, [])
finally:
# Delete event
@ -231,21 +235,22 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(third.id)
def test_search_tag_attribute(self):
'''Search Tags at attributes level'''
try:
first, second, third = self.environment()
# Search as admin
attributes = self.admin_misp_connector.search(controller='attributes', tags='tlp:white___test')
attributes = self.admin_misp_connector.search(controller='attributes', tags='tlp:white___test', pythonify=True)
self.assertEqual(len(attributes), 5)
attributes = self.admin_misp_connector.search(controller='attributes', tags='tlp:amber___test')
attributes = self.admin_misp_connector.search(controller='attributes', tags='tlp:amber___test', pythonify=True)
self.assertEqual(len(attributes), 2)
attributes = self.admin_misp_connector.search(tags='admin_only')
attributes = self.admin_misp_connector.search(tags='admin_only', pythonify=True)
self.assertEqual(len(attributes), 1)
# Search as user
attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:white___test')
attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:white___test', pythonify=True)
self.assertEqual(len(attributes), 4)
attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:amber___test')
attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:amber___test', pythonify=True)
self.assertEqual(len(attributes), 2)
attributes = self.user_misp_connector.search(tags='admin_only')
attributes = self.user_misp_connector.search(tags='admin_only', pythonify=True)
self.assertEqual(attributes, [])
finally:
# Delete event
@ -254,12 +259,13 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(third.id)
def test_search_tag_advanced_event(self):
'''Advanced search Tags at events level'''
try:
first, second, third = self.environment()
complex_query = self.admin_misp_connector.build_complex_query(or_parameters=['tlp:white___test'],
not_parameters=['tlp:amber___test',
'foo_double___test'])
events = self.admin_misp_connector.search(tags=complex_query)
events = self.admin_misp_connector.search(tags=complex_query, pythonify=True)
self.assertEqual(len(events), 3)
for e in events:
self.assertIn(e.id, [first.id, second.id, third.id])
@ -270,7 +276,7 @@ class TestComprehensive(unittest.TestCase):
complex_query = self.admin_misp_connector.build_complex_query(or_parameters=['unique___test'],
not_parameters=['tlp:white___test'])
events = self.admin_misp_connector.search(tags=complex_query)
events = self.admin_misp_connector.search(tags=complex_query, pythonify=True)
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [first.id, second.id])
@ -283,12 +289,13 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(third.id)
def test_search_tag_advanced_attributes(self):
'''Advanced search Tags at attributes level'''
try:
first, second, third = self.environment()
complex_query = self.admin_misp_connector.build_complex_query(or_parameters=['tlp:white___test'],
not_parameters=['tlp:amber___test',
'foo_double___test'])
attributes = self.admin_misp_connector.search(controller='attributes', tags=complex_query)
attributes = self.admin_misp_connector.search(controller='attributes', tags=complex_query, pythonify=True)
self.assertEqual(len(attributes), 3)
for a in attributes:
self.assertEqual([t for t in a.tags if t.name == 'tlp:amber___test'], [])
@ -301,6 +308,7 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(third.id)
def test_search_timestamp_event(self):
'''Search specific update timestamps at events level'''
# Creating event 1 - timestamp 5 min ago
first = self.create_simple_event(force_timestamps=True)
event_creation_timestamp_first = datetime.now() - timedelta(minutes=5)
@ -314,19 +322,19 @@ class TestComprehensive(unittest.TestCase):
second = self.user_misp_connector.add_event(second)
# Search as user
# # Test - last 4 min
events = self.user_misp_connector.search(timestamp='4m')
events = self.user_misp_connector.search(timestamp='4m', pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test timestamp of 2nd event
events = self.user_misp_connector.search(timestamp=event_creation_timestamp_second.timestamp())
events = self.user_misp_connector.search(timestamp=event_creation_timestamp_second.timestamp(), pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test interval -6 min -> -4 min
events = self.user_misp_connector.search(timestamp=['6m', '4m'])
events = self.user_misp_connector.search(timestamp=['6m', '4m'], pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_first.timestamp()))
@ -336,6 +344,7 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(second.id)
def test_search_timestamp_attribute(self):
'''Search specific update timestamps at attributes level'''
# Creating event 1 - timestamp 5 min ago
first = self.create_simple_event(force_timestamps=True)
event_creation_timestamp_first = datetime.now() - timedelta(minutes=5)
@ -351,19 +360,19 @@ class TestComprehensive(unittest.TestCase):
second = self.user_misp_connector.add_event(second)
# Search as user
# # Test - last 4 min
attributes = self.user_misp_connector.search(controller='attributes', timestamp='4m')
attributes = self.user_misp_connector.search(controller='attributes', timestamp='4m', pythonify=True)
self.assertEqual(len(attributes), 1)
self.assertEqual(attributes[0].event_id, second.id)
self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test timestamp of 2nd event
attributes = self.user_misp_connector.search(controller='attributes', timestamp=event_creation_timestamp_second.timestamp())
attributes = self.user_misp_connector.search(controller='attributes', timestamp=event_creation_timestamp_second.timestamp(), pythonify=True)
self.assertEqual(len(attributes), 1)
self.assertEqual(attributes[0].event_id, second.id)
self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test interval -6 min -> -4 min
attributes = self.user_misp_connector.search(controller='attributes', timestamp=['6m', '4m'])
attributes = self.user_misp_connector.search(controller='attributes', timestamp=['6m', '4m'], pythonify=True)
self.assertEqual(len(attributes), 1)
self.assertEqual(attributes[0].event_id, first.id)
self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_first.timestamp()))
@ -373,6 +382,7 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(second.id)
def test_user_perms(self):
'''Test publish rights'''
try:
first = self.create_simple_event()
first.publish()
@ -389,6 +399,7 @@ class TestComprehensive(unittest.TestCase):
# @unittest.skip("Uncomment when adding new tests, it has a 10s sleep")
def test_search_publish_timestamp(self):
'''Search for a specific publication timestamp, an interval, and invalid values.'''
# Creating event 1
first = self.create_simple_event()
first.publish()
@ -400,25 +411,25 @@ class TestComprehensive(unittest.TestCase):
time.sleep(10)
second = self.pub_misp_connector.add_event(second)
# Test invalid query
events = self.pub_misp_connector.search(publish_timestamp='5x')
events = self.pub_misp_connector.search(publish_timestamp='5x', pythonify=True)
self.assertEqual(events, [])
events = self.pub_misp_connector.search(publish_timestamp='ad')
events = self.pub_misp_connector.search(publish_timestamp='ad', pythonify=True)
self.assertEqual(events, [])
events = self.pub_misp_connector.search(publish_timestamp='aaad')
events = self.pub_misp_connector.search(publish_timestamp='aaad', pythonify=True)
self.assertEqual(events, [])
# Test - last 4 min
events = self.pub_misp_connector.search(publish_timestamp='5s')
events = self.pub_misp_connector.search(publish_timestamp='5s', pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
# Test 5 sec before timestamp of 2nd event
events = self.pub_misp_connector.search(publish_timestamp=(second.publish_timestamp.timestamp()))
events = self.pub_misp_connector.search(publish_timestamp=(second.publish_timestamp.timestamp()), pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
# Test interval -6 min -> -4 min
events = self.pub_misp_connector.search(publish_timestamp=[first.publish_timestamp.timestamp() - 5,
second.publish_timestamp.timestamp() - 5])
second.publish_timestamp.timestamp() - 5], pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
finally:
@ -427,51 +438,166 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(second.id)
def test_simple_event(self):
'''Search a bunch of parameters:
* Value not existing
* only return metadata
* published yes/no
* event id
* uuid
* creator org
* substring search in value and eventinfo
* quickfilter
* date_from
* date_to
* deleted
* to_ids
* include_event_uuid
missing: attachments, warning list
'''
first = self.create_simple_event()
first.info = 'foo bar blah'
second = self.create_simple_event()
second.info = 'foo blah'
second.set_date('2018-09-01')
second.add_attribute('ip-src', '8.8.8.8')
try:
first = self.user_misp_connector.add_event(first)
second = self.user_misp_connector.add_event(second)
timeframe = [first.timestamp.timestamp() - 5, first.timestamp.timestamp() + 5]
# Search event we just created in multiple ways. Make sure it doesn't catch it when it shouldn't
events = self.user_misp_connector.search(timestamp=timeframe)
self.assertEqual(len(events), 1)
events = self.user_misp_connector.search(timestamp=timeframe, pythonify=True)
self.assertEqual(len(events), 2)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(timestamp=timeframe, value='nothere')
self.assertEqual(events[1].id, second.id)
events = self.user_misp_connector.search(timestamp=timeframe, value='nothere', pythonify=True)
self.assertEqual(events, [])
events = self.user_misp_connector.search(timestamp=timeframe, value=first.attributes[0].value)
events = self.user_misp_connector.search(timestamp=timeframe, value=first.attributes[0].value, pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(timestamp=[first.timestamp.timestamp() - 50,
first.timestamp.timestamp() - 10],
value=first.attributes[0].value)
value=first.attributes[0].value, pythonify=True)
self.assertEqual(events, [])
# Test return content
events = self.user_misp_connector.search(timestamp=timeframe, metadata=False)
self.assertEqual(len(events), 1)
events = self.user_misp_connector.search(timestamp=timeframe, metadata=False, pythonify=True)
self.assertEqual(len(events), 2)
self.assertEqual(len(events[0].attributes), 1)
events = self.user_misp_connector.search(timestamp=timeframe, metadata=True)
self.assertEqual(len(events), 1)
self.assertEqual(len(events[1].attributes), 2)
events = self.user_misp_connector.search(timestamp=timeframe, metadata=True, pythonify=True)
self.assertEqual(len(events), 2)
self.assertEqual(len(events[0].attributes), 0)
self.assertEqual(len(events[1].attributes), 0)
# other things
events = self.user_misp_connector.search(timestamp=timeframe, published=True)
events = self.user_misp_connector.search(timestamp=timeframe, published=True, pythonify=True)
self.assertEqual(events, [])
events = self.user_misp_connector.search(timestamp=timeframe, published=False)
events = self.user_misp_connector.search(timestamp=timeframe, published=False, pythonify=True)
self.assertEqual(len(events), 2)
events = self.user_misp_connector.search(eventid=first.id, pythonify=True)
self.assertEqual(len(events), 1)
events = self.user_misp_connector.search(eventid=first.id)
self.assertEqual(len(events), 1)
events = self.user_misp_connector.search(uuid=first.uuid)
self.assertEqual(len(events), 1)
events = self.user_misp_connector.search(org=first.orgc_id)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(uuid=first.uuid, pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(org=first.orgc_id, pythonify=True)
self.assertEqual(len(events), 2)
# test like search
events = self.user_misp_connector.search(timestamp=timeframe, value='%{}%'.format(first.attributes[0].value.split('-')[2]))
events = self.user_misp_connector.search(timestamp=timeframe, value='%{}%'.format(first.attributes[0].value.split('-')[2]), pythonify=True)
self.assertEqual(len(events), 1)
events = self.user_misp_connector.search(timestamp=timeframe, eventinfo='%bar blah%')
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(timestamp=timeframe, eventinfo='%bar blah%', pythonify=True)
# FIXME: should return one event
# self.assertEqual(len(events), 1)
# self.assertEqual(events[0].id, first.id)
# quickfilter
events = self.user_misp_connector.search(timestamp=timeframe, quickfilter='bar', pythonify=True)
# FIXME: should return one event
# self.assertEqual(len(events), 1)
# self.assertEqual(events[0].id, second.id)
events = self.user_misp_connector.search(timestamp=timeframe, date_from=date.today().isoformat(), pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(timestamp=timeframe, date_from='2018-09-01', pythonify=True)
self.assertEqual(len(events), 2)
events = self.user_misp_connector.search(timestamp=timeframe, date_from='2018-09-01', date_to='2018-09-02', pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
# Category
events = self.user_misp_connector.search(timestamp=timeframe, category='Network activity', pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
# toids
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='0', pythonify=True)
self.assertEqual(len(events), 2)
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='1', pythonify=True)
# FIXME: should only return second
# self.assertEqual(len(events), 1)
# self.assertEqual(events[0].id, second.id)
# self.assertEqual(len(events[0].attributes), 1)
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='exclude', pythonify=True)
self.assertEqual(len(events), 2)
# FIXME: Should have one attribute
# self.assertEqual(len(events[0].attributes), 1)
self.assertEqual(len(events[1].attributes), 1)
# deleted
second.attributes[1].delete()
self.user_misp_connector.update_event(second)
events = self.user_misp_connector.search(eventid=second.id, pythonify=True)
self.assertEqual(len(events[0].attributes), 1)
events = self.user_misp_connector.search(eventid=second.id, deleted=True, pythonify=True)
self.assertEqual(len(events[0].attributes), 2)
# include_event_uuid
events = self.user_misp_connector.search(eventid=second.id, include_event_uuid=True, pythonify=True)
# FIXME: doesn't seem to return the event UUID.
# print(events[0].attributes[0].to_json())
# print(second.uuid)
# self.assertEqual(events[0].attributes[0].event_uuid, second.uuid)
# event_timestamp
second.add_attribute('ip-src', '8.8.8.9')
second = self.user_misp_connector.update_event(second)
# FIXME: returns everything
# events = self.user_misp_connector.search(event_timestamp=second.timestamp.timestamp(), pythonify=True)
# self.assertEqual(len(events), 1)
# searchall
# FIXME: searchall doesn't seem to do anything
# second.add_attribute('text', 'This is a test for the full text search', comment='Test stuff comment')
# second = self.user_misp_connector.update_event(second)
# events = self.user_misp_connector.search(value='This is a test for the full text search', searchall=True, pythonify=True)
# self.assertEqual(len(events), 1)
# events = self.user_misp_connector.search(value='stuff', searchall=True, pythonify=True)
# self.assertEqual(len(events), 1)
time.sleep(1)
# attachments
with open('tests/testlive_comprehensive.py', 'rb') as f:
first.add_attribute('malware-sample', value='testfile.py', data=BytesIO(f.read()))
first = self.user_misp_connector.update_event(first)
# time.sleep(30)
events = self.user_misp_connector.search(timestamp=first.timestamp.timestamp(), with_attachments=True,
pythonify=True)
self.assertEqual(len(events), 1)
# print(events[0].attributes[-1].to_json())
# FIXME: the attachment isn't there.
# self.assertIs(type(events[0].attributes[-1].malware_binary), BytesIO)
events = self.user_misp_connector.search(timestamp=first.timestamp.timestamp(), with_attachments=False,
pythonify=True)
self.assertEqual(len(events), 1)
self.assertIs(events[0].attributes[-1].malware_binary, None)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
def test_edit_attribute(self):
first = self.create_simple_event()
@ -492,16 +618,42 @@ class TestComprehensive(unittest.TestCase):
try:
first.attributes[0].comment = 'This is the original comment'
first = self.user_misp_connector.add_event(first)
response = self.user_misp_connector.fast_publish(first.id, alert=False)
self.assertEqual(response['errors'][0][1]['message'], 'You do not have permission to use this functionality.')
self.admin_misp_connector.fast_publish(first.id, alert=False)
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, pythonify=True)
# FIXME: Should not return anything (to_ids is False)
# self.assertEqual(len(csv), 0)
first.attributes[0].to_ids = True
first = self.user_misp_connector.update_event(first)
self.admin_misp_connector.fast_publish(first.id, alert=False)
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
@unittest.skip("Currently failing")
def test_search_type_event_csv(self):
try:
first, second, third = self.environment()
# Search as admin
events = self.admin_misp_connector.search(return_format='csv', timestamp=first.timestamp.timestamp())
print(events)
attributes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst'])
events = self.admin_misp_connector.search(return_format='csv', timestamp=first.timestamp.timestamp(),
type_attribute=attributes_types_search)
print(events)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
if __name__ == '__main__':
unittest.main()