chg: Add more test cases

pull/265/head
Raphaël Vinot 2018-08-19 14:35:32 +02:00
parent 2c03fb96c2
commit 303079af3b
2 changed files with 108 additions and 28 deletions

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from .exceptions import MISPServerError from .exceptions import MISPServerError
from .api import PyMISP, everything_broken from .api import PyMISP, everything_broken, MISPEvent, MISPAttribute
from typing import TypeVar, Optional, Tuple, List, Dict from typing import TypeVar, Optional, Tuple, List, Dict
from datetime import date, datetime from datetime import date, datetime
import json import json
@ -64,16 +64,18 @@ class ExpandedPyMISP(PyMISP):
return {'errors': [(response.status_code, error_message)]} return {'errors': [(response.status_code, error_message)]}
# At this point, we had no error. # At this point, we had no error.
if logger.isEnabledFor(logging.DEBUG):
logger.debug(response)
try: try:
response = response.json() response = response.json()
if logger.isEnabledFor(logging.DEBUG):
logger.debug(response)
if response.get('response') is not None: if response.get('response') is not None:
# Cleanup. # Cleanup.
return response.get('response') return response.get('response')
return response return response
except Exception: except Exception:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(response.text)
return response.text return response.text
# TODO: Make that thing async & test it. # TODO: Make that thing async & test it.
@ -151,4 +153,25 @@ class ExpandedPyMISP(PyMISP):
url = urljoin(self.root_url, f'{controller}/restSearch') url = urljoin(self.root_url, f'{controller}/restSearch')
response = self._prepare_request('POST', url, data=json.dumps(query)) response = self._prepare_request('POST', url, data=json.dumps(query))
return self._check_response(response) normalized_response = self._check_response(response)
if isinstance(normalized_response, str) or (isinstance(normalized_response, dict) and
normalized_response.get('errors')):
return normalized_response
# The response is in json, we can confert it to a list of pythonic MISP objects
to_return = []
if controller == 'events':
for e in normalized_response:
me = MISPEvent()
me.load(e)
to_return.append(me)
elif controller == 'attributes':
print(normalized_response)
# FIXME: if the query doesn't match, the request returns an empty list, and not a dictionary;
if normalized_response:
for a in normalized_response.get('Attribute'):
ma = MISPAttribute()
ma.from_dict(**a)
to_return.append(ma)
elif controller == 'objects':
raise Exception('Not implemented yet')
return to_return

View File

@ -83,16 +83,85 @@ class TestComprehensive(unittest.TestCase):
# Delete event # Delete event
self.admin_misp_connector.delete_event(c_me.id) self.admin_misp_connector.delete_event(c_me.id)
def test_search_value_attribute(self): def test_search_event_type(self):
me = self.create_event_org_only() me = self.create_event_org_only()
me.add_attribute('ip-src', '8.8.8.8')
second = self.create_event_org_only()
second.add_attribute('ip-dst', '9.9.9.9')
third = self.create_event_org_only()
try: try:
# Create event # Create event
created_event = self.admin_misp_connector.add_event(me) created_event = self.admin_misp_connector.add_event(me)
c_me = MISPEvent() c_me = MISPEvent()
c_me.load(created_event) c_me.load(created_event)
created_event = self.admin_misp_connector.add_event(second)
second_me = MISPEvent()
second_me.load(created_event)
created_event = self.admin_misp_connector.add_event(third)
third_me = MISPEvent()
third_me.load(created_event)
# Search as admin
response = self.admin_misp_connector.search(timestamp=c_me.timestamp.timestamp())
self.assertEqual(len(response), 3)
attrubutes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst'])
response = self.admin_misp_connector.search(controller='events', timestamp=c_me.timestamp.timestamp(),
type_attribute=attrubutes_types_search)
# print(response)
self.assertEqual(len(response), 2)
finally:
# Delete event
self.admin_misp_connector.delete_event(c_me.id)
self.admin_misp_connector.delete_event(second_me.id)
self.admin_misp_connector.delete_event(third_me.id)
def test_search_attribute_type(self):
me = self.create_event_org_only()
me.add_attribute('ip-src', '8.8.8.8')
second = self.create_event_org_only()
second.add_attribute('ip-dst', '9.9.9.9')
third = self.create_event_org_only()
try:
# Create event
created_event = self.admin_misp_connector.add_event(me)
c_me = MISPEvent()
c_me.load(created_event)
created_event = self.admin_misp_connector.add_event(second)
second_me = MISPEvent()
second_me.load(created_event)
created_event = self.admin_misp_connector.add_event(third)
third_me = MISPEvent()
third_me.load(created_event)
# Search as admin
response = self.admin_misp_connector.search(controller='attributes', timestamp=c_me.timestamp.timestamp())
self.assertEqual(len(response), 5)
attrubutes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst'])
response = self.admin_misp_connector.search(controller='attributes', timestamp=c_me.timestamp.timestamp(),
type_attribute=attrubutes_types_search)
# print(response)
self.assertEqual(len(response), 2)
finally:
# Delete event
self.admin_misp_connector.delete_event(c_me.id)
self.admin_misp_connector.delete_event(second_me.id)
self.admin_misp_connector.delete_event(third_me.id)
def test_search_value_attribute(self):
me = self.create_event_org_only()
me.add_attribute('text', str(uuid4()))
second = self.create_event_org_only()
second.add_attribute('text', me.attributes[0].value)
try:
# Create event
created_event = self.admin_misp_connector.add_event(me)
c_me = MISPEvent()
c_me.load(created_event)
created_event = self.admin_misp_connector.add_event(second)
second_me = MISPEvent()
second_me.load(created_event)
# Search as admin # Search as admin
response = self.admin_misp_connector.search(controller='attributes', value=me.attributes[0].value) response = self.admin_misp_connector.search(controller='attributes', value=me.attributes[0].value)
self.assertEqual(len(response), 1) self.assertEqual(len(response), 1)
# Connect as user # Connect as user
user_misp_connector = ExpandedPyMISP(url, self.test_usr.authkey) user_misp_connector = ExpandedPyMISP(url, self.test_usr.authkey)
# Search as user # Search as user
@ -101,6 +170,7 @@ class TestComprehensive(unittest.TestCase):
finally: finally:
# Delete event # Delete event
self.admin_misp_connector.delete_event(c_me.id) self.admin_misp_connector.delete_event(c_me.id)
self.admin_misp_connector.delete_event(second_me.id)
def test_search_tag_event(self): def test_search_tag_event(self):
me = self.create_event_with_tags() me = self.create_event_with_tags()
@ -132,12 +202,10 @@ class TestComprehensive(unittest.TestCase):
to_delete.load(created_event) to_delete.load(created_event)
complex_query = user_misp_connector.build_complex_query(or_parameters=['tlp:white___test'], not_parameters=['tlp:amber___test']) complex_query = user_misp_connector.build_complex_query(or_parameters=['tlp:white___test'], not_parameters=['tlp:amber___test'])
# Search as user # Search as user
response = user_misp_connector.search(tags=complex_query) events = user_misp_connector.search(tags=complex_query)
for e in response: for e in events:
to_validate = MISPEvent()
to_validate.load(e)
# FIXME Expected event without the tlp:amber attribute, broken for now # FIXME Expected event without the tlp:amber attribute, broken for now
for a in to_validate.attributes: for a in e.attributes:
print([t for t in a.tags if t.name == 'tlp:amber___test']) print([t for t in a.tags if t.name == 'tlp:amber___test'])
# self.assertEqual([t for t in a.tags if t.name == 'tlp:amber___test'], []) # self.assertEqual([t for t in a.tags if t.name == 'tlp:amber___test'], [])
# Delete event # Delete event
@ -165,22 +233,19 @@ class TestComprehensive(unittest.TestCase):
# # Test - last 4 min # # Test - last 4 min
response = user_misp_connector.search(timestamp='4m') response = user_misp_connector.search(timestamp='4m')
self.assertEqual(len(response), 1) self.assertEqual(len(response), 1)
received_event = MISPEvent() received_event = response[0]
received_event.load(response[0])
self.assertEqual(received_event.timestamp.timestamp(), int(event_creation_timestamp_second.timestamp())) self.assertEqual(received_event.timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test 5 sec before timestamp of 2nd event # # Test timestamp of 2nd event
response = user_misp_connector.search(timestamp=(event_creation_timestamp_second.timestamp())) response = user_misp_connector.search(timestamp=event_creation_timestamp_second.timestamp())
self.assertEqual(len(response), 1) self.assertEqual(len(response), 1)
received_event = MISPEvent() received_event = response[0]
received_event.load(response[0])
self.assertEqual(received_event.timestamp.timestamp(), int(event_creation_timestamp_second.timestamp())) self.assertEqual(received_event.timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test interval -6 min -> -4 min # # Test interval -6 min -> -4 min
response = user_misp_connector.search(timestamp=['6m', '4m']) response = user_misp_connector.search(timestamp=['6m', '4m'])
self.assertEqual(len(response), 1) self.assertEqual(len(response), 1)
received_event = MISPEvent() received_event = response[0]
received_event.load(response[0])
self.assertEqual(received_event.timestamp.timestamp(), int(event_creation_timestamp_first.timestamp())) self.assertEqual(received_event.timestamp.timestamp(), int(event_creation_timestamp_first.timestamp()))
finally: finally:
# Delete event # Delete event
@ -236,20 +301,14 @@ class TestComprehensive(unittest.TestCase):
# # Test - last 4 min # # Test - last 4 min
response = pub_misp_connector.search(publish_timestamp='5s') response = pub_misp_connector.search(publish_timestamp='5s')
self.assertEqual(len(response), 1) self.assertEqual(len(response), 1)
received_event = MISPEvent()
received_event.load(response[0])
# # Test 5 sec before timestamp of 2nd event # # Test 5 sec before timestamp of 2nd event
response = pub_misp_connector.search(publish_timestamp=(second_to_delete.publish_timestamp.timestamp())) response = pub_misp_connector.search(publish_timestamp=(second_to_delete.publish_timestamp.timestamp()))
self.assertEqual(len(response), 1) self.assertEqual(len(response), 1)
received_event = MISPEvent()
received_event.load(response[0])
# # Test interval -6 min -> -4 min # # Test interval -6 min -> -4 min
response = pub_misp_connector.search(publish_timestamp=[first_to_delete.publish_timestamp.timestamp() - 5, second_to_delete.publish_timestamp.timestamp() - 5]) response = pub_misp_connector.search(publish_timestamp=[first_to_delete.publish_timestamp.timestamp() - 5, second_to_delete.publish_timestamp.timestamp() - 5])
self.assertEqual(len(response), 1) self.assertEqual(len(response), 1)
received_event = MISPEvent()
received_event.load(response[0])
finally: finally:
# Delete event # Delete event
self.admin_misp_connector.delete_event(first_to_delete.id) self.admin_misp_connector.delete_event(first_to_delete.id)
@ -276,13 +335,11 @@ class TestComprehensive(unittest.TestCase):
# Test return content # Test return content
response = user_misp_connector.search(timestamp=timeframe, metadata=False) response = user_misp_connector.search(timestamp=timeframe, metadata=False)
self.assertEqual(len(response), 1) self.assertEqual(len(response), 1)
t = MISPEvent() t = response[0]
t.load(response[0])
self.assertEqual(len(t.attributes), 1) self.assertEqual(len(t.attributes), 1)
response = user_misp_connector.search(timestamp=timeframe, metadata=True) response = user_misp_connector.search(timestamp=timeframe, metadata=True)
self.assertEqual(len(response), 1) self.assertEqual(len(response), 1)
t = MISPEvent() t = response[0]
t.load(response[0])
self.assertEqual(len(t.attributes), 0) self.assertEqual(len(t.attributes), 0)
# other things # other things
response = user_misp_connector.search(timestamp=timeframe, published=True) response = user_misp_connector.search(timestamp=timeframe, published=True)