chg: More testing improvments

pull/265/head
Raphaël Vinot 2018-08-21 11:16:51 +02:00
parent 99271bac24
commit 3ee90602ea
1 changed files with 167 additions and 114 deletions

View File

@ -3,7 +3,7 @@
import unittest import unittest
from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPAttribute from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis
from datetime import datetime, timedelta from datetime import datetime, timedelta
import time import time
@ -66,6 +66,8 @@ class TestComprehensive(unittest.TestCase):
first_event.analysis = Analysis.completed first_event.analysis = Analysis.completed
first_event.set_date("2017-12-31") first_event.set_date("2017-12-31")
first_event.add_attribute('text', str(uuid4())) first_event.add_attribute('text', str(uuid4()))
first_event.attributes[0].add_tag('admin_only')
first_event.attributes[0].add_tag('tlp:white___test')
second_event = MISPEvent() second_event = MISPEvent()
second_event.info = 'Second event - org only - medium - ongoing' second_event.info = 'Second event - org only - medium - ongoing'
@ -109,14 +111,18 @@ class TestComprehensive(unittest.TestCase):
try: try:
first, second, third = self.environment() first, second, third = self.environment()
# Search as admin # Search as admin
response = self.admin_misp_connector.search(value=first.attributes[0].value) events = self.admin_misp_connector.search(value=first.attributes[0].value)
self.assertEqual(len(response), 2) self.assertEqual(len(events), 2)
for e in events:
self.assertIn(e.id, [first.id, second.id])
# Search as user # Search as user
response = self.user_misp_connector.search(value=first.attributes[0].value) events = self.user_misp_connector.search(value=first.attributes[0].value)
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [second.id])
# Non-existing value # Non-existing value
response = self.user_misp_connector.search(value=str(uuid4())) events = self.user_misp_connector.search(value=str(uuid4()))
self.assertEqual(response, []) self.assertEqual(events, [])
finally: finally:
# Delete events # Delete events
self.admin_misp_connector.delete_event(first.id) self.admin_misp_connector.delete_event(first.id)
@ -127,31 +133,39 @@ class TestComprehensive(unittest.TestCase):
try: try:
first, second, third = self.environment() first, second, third = self.environment()
# Search as admin # Search as admin
response = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value) attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value)
self.assertEqual(len(response), 2) self.assertEqual(len(attributes), 2)
for a in attributes:
self.assertIn(a.event_id, [first.id, second.id])
# Search as user # Search as user
response = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value) attributes = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value)
self.assertEqual(len(response), 1) self.assertEqual(len(attributes), 1)
for a in attributes:
self.assertIn(a.event_id, [second.id])
# Non-existing value # Non-existing value
response = self.user_misp_connector.search(controller='attributes', value=str(uuid4())) attributes = self.user_misp_connector.search(controller='attributes', value=str(uuid4()))
self.assertEqual(response, []) self.assertEqual(attributes, [])
finally: finally:
# Delete event # Delete event
self.admin_misp_connector.delete_event(first.id) self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id) self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id) self.admin_misp_connector.delete_event(third.id)
@unittest.skip("Currently failing") # @unittest.skip("Currently failing")
def test_search_type_event(self): def test_search_type_event(self):
try: try:
first, second, third = self.environment() first, second, third = self.environment()
# Search as admin # Search as admin
response = self.admin_misp_connector.search(timestamp=first.timestamp.timestamp()) events = self.admin_misp_connector.search(timestamp=first.timestamp.timestamp())
self.assertEqual(len(response), 3) self.assertEqual(len(events), 3)
attrubutes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst']) for e in events:
response = self.admin_misp_connector.search(controller='events', timestamp=first.timestamp.timestamp(), self.assertIn(e.id, [first.id, second.id, third.id])
type_attribute=attrubutes_types_search) attributes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst'])
self.assertEqual(len(response), 2) events = self.admin_misp_connector.search(timestamp=first.timestamp.timestamp(),
type_attribute=attributes_types_search)
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [third.id])
finally: finally:
# Delete event # Delete event
self.admin_misp_connector.delete_event(first.id) self.admin_misp_connector.delete_event(first.id)
@ -162,12 +176,19 @@ class TestComprehensive(unittest.TestCase):
try: try:
first, second, third = self.environment() first, second, third = self.environment()
# Search as admin # Search as admin
response = self.admin_misp_connector.search(controller='attributes', timestamp=first.timestamp.timestamp()) attributes = self.admin_misp_connector.search(controller='attributes',
self.assertEqual(len(response), 7) timestamp=first.timestamp.timestamp())
attrubutes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst']) self.assertEqual(len(attributes), 7)
response = self.admin_misp_connector.search(controller='attributes', timestamp=first.timestamp.timestamp(), for a in attributes:
type_attribute=attrubutes_types_search) self.assertIn(a.event_id, [first.id, second.id, third.id])
self.assertEqual(len(response), 3) # Search as user
attributes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst'])
attributes = self.admin_misp_connector.search(controller='attributes',
timestamp=first.timestamp.timestamp(),
type_attribute=attributes_types_search)
self.assertEqual(len(attributes), 3)
for a in attributes:
self.assertIn(a.event_id, [second.id, third.id])
finally: finally:
# Delete event # Delete event
self.admin_misp_connector.delete_event(first.id) self.admin_misp_connector.delete_event(first.id)
@ -178,15 +199,29 @@ class TestComprehensive(unittest.TestCase):
try: try:
first, second, third = self.environment() first, second, third = self.environment()
# Search as admin # Search as admin
response = self.admin_misp_connector.search(tags='tlp:white___test') events = self.admin_misp_connector.search(tags='tlp:white___test')
self.assertEqual(len(response), 2) self.assertEqual(len(events), 3)
response = self.admin_misp_connector.search(tags='tlp:amber___test') for e in events:
self.assertEqual(len(response), 1) self.assertIn(e.id, [first.id, second.id, third.id])
events = self.admin_misp_connector.search(tags='tlp:amber___test')
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [third.id])
events = self.admin_misp_connector.search(tags='admin_only')
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [first.id])
# Search as user # Search as user
response = self.user_misp_connector.search(tags='tlp:white___test') events = self.user_misp_connector.search(tags='tlp:white___test')
self.assertEqual(len(response), 1) self.assertEqual(len(events), 2)
response = self.user_misp_connector.search(tags='tlp:amber___test') for e in events:
self.assertEqual(len(response), 0) self.assertIn(e.id, [second.id, third.id])
events = self.user_misp_connector.search(tags='tlp:amber___test')
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [third.id])
events = self.user_misp_connector.search(tags='admin_only')
self.assertEqual(events, [])
finally: finally:
# Delete event # Delete event
self.admin_misp_connector.delete_event(first.id) self.admin_misp_connector.delete_event(first.id)
@ -197,15 +232,19 @@ class TestComprehensive(unittest.TestCase):
try: try:
first, second, third = self.environment() first, second, third = self.environment()
# Search as admin # Search as admin
response = self.admin_misp_connector.search(controller='attributes', tags='tlp:white___test') attributes = self.admin_misp_connector.search(controller='attributes', tags='tlp:white___test')
self.assertEqual(len(response), 4) self.assertEqual(len(attributes), 5)
response = self.admin_misp_connector.search(controller='attributes', tags='tlp:amber___test') attributes = self.admin_misp_connector.search(controller='attributes', tags='tlp:amber___test')
self.assertEqual(len(response), 1) self.assertEqual(len(attributes), 2)
attributes = self.admin_misp_connector.search(tags='admin_only')
self.assertEqual(len(attributes), 1)
# Search as user # Search as user
response = self.user_misp_connector.search(controller='attributes', tags='tlp:white___test') attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:white___test')
self.assertEqual(len(response), 1) self.assertEqual(len(attributes), 4)
response = self.user_misp_connector.search(controller='attributes', tags='tlp:amber___test') attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:amber___test')
self.assertEqual(len(response), 0) self.assertEqual(len(attributes), 2)
attributes = self.user_misp_connector.search(tags='admin_only')
self.assertEqual(attributes, [])
finally: finally:
# Delete event # Delete event
self.admin_misp_connector.delete_event(first.id) self.admin_misp_connector.delete_event(first.id)
@ -219,11 +258,21 @@ class TestComprehensive(unittest.TestCase):
not_parameters=['tlp:amber___test', not_parameters=['tlp:amber___test',
'foo_double___test']) 'foo_double___test'])
events = self.admin_misp_connector.search(tags=complex_query) events = self.admin_misp_connector.search(tags=complex_query)
self.assertEqual(len(events), 3)
for e in events: for e in events:
self.assertIn(e.id, [first.id, second.id, third.id])
for a in e.attributes: for a in e.attributes:
self.assertEqual([t for t in a.tags if t.name == 'tlp:amber___test'], []) self.assertEqual([t for t in a.tags if t.name == 'tlp:amber___test'], [])
for a in e.attributes: for a in e.attributes:
self.assertEqual([t for t in a.tags if t.name == 'foo_double___test'], []) self.assertEqual([t for t in a.tags if t.name == 'foo_double___test'], [])
complex_query = self.admin_misp_connector.build_complex_query(not_parameters=['tlp:white___test'])
events = self.admin_misp_connector.search(tags=complex_query)
self.assertEqual(len(events), 2)
for e in events:
self.assertIn(e.id, [first.id, second.id])
for a in e.attributes:
self.assertEqual([t for t in a.tags if t.name == 'tlp:white___test'], [])
finally: finally:
# Delete event # Delete event
self.admin_misp_connector.delete_event(first.id) self.admin_misp_connector.delete_event(first.id)
@ -237,6 +286,7 @@ class TestComprehensive(unittest.TestCase):
not_parameters=['tlp:amber___test', not_parameters=['tlp:amber___test',
'foo_double___test']) 'foo_double___test'])
attributes = self.admin_misp_connector.search(controller='attributes', tags=complex_query) attributes = self.admin_misp_connector.search(controller='attributes', tags=complex_query)
self.assertEqual(len(attributes), 3)
for a in attributes: for a in attributes:
self.assertEqual([t for t in a.tags if t.name == 'tlp:amber___test'], []) self.assertEqual([t for t in a.tags if t.name == 'tlp:amber___test'], [])
for a in attributes: for a in attributes:
@ -261,28 +311,28 @@ class TestComprehensive(unittest.TestCase):
second = self.user_misp_connector.add_event(second) second = self.user_misp_connector.add_event(second)
# Search as user # Search as user
# # Test - last 4 min # # Test - last 4 min
response = self.user_misp_connector.search(timestamp='4m') events = self.user_misp_connector.search(timestamp='4m')
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
received_event = response[0] self.assertEqual(events[0].id, second.id)
self.assertEqual(received_event.timestamp.timestamp(), int(event_creation_timestamp_second.timestamp())) self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test timestamp of 2nd event # # Test timestamp of 2nd event
response = self.user_misp_connector.search(timestamp=event_creation_timestamp_second.timestamp()) events = self.user_misp_connector.search(timestamp=event_creation_timestamp_second.timestamp())
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
received_event = response[0] self.assertEqual(events[0].id, second.id)
self.assertEqual(received_event.timestamp.timestamp(), int(event_creation_timestamp_second.timestamp())) self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test interval -6 min -> -4 min # # Test interval -6 min -> -4 min
response = self.user_misp_connector.search(timestamp=['6m', '4m']) events = self.user_misp_connector.search(timestamp=['6m', '4m'])
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
received_event = response[0] self.assertEqual(events[0].id, first.id)
self.assertEqual(received_event.timestamp.timestamp(), int(event_creation_timestamp_first.timestamp())) self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_first.timestamp()))
finally: finally:
# Delete event # Delete event
self.admin_misp_connector.delete_event(first.id) self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id) self.admin_misp_connector.delete_event(second.id)
def test_search_timestamp_atttibute(self): def test_search_timestamp_attribute(self):
# Creating event 1 - timestamp 5 min ago # Creating event 1 - timestamp 5 min ago
first = self.create_simple_event(force_timestamps=True) first = self.create_simple_event(force_timestamps=True)
event_creation_timestamp_first = datetime.now() - timedelta(minutes=5) event_creation_timestamp_first = datetime.now() - timedelta(minutes=5)
@ -298,22 +348,22 @@ class TestComprehensive(unittest.TestCase):
second = self.user_misp_connector.add_event(second) second = self.user_misp_connector.add_event(second)
# Search as user # Search as user
# # Test - last 4 min # # Test - last 4 min
response = self.user_misp_connector.search(controller='attributes', timestamp='4m') attributes = self.user_misp_connector.search(controller='attributes', timestamp='4m')
self.assertEqual(len(response), 1) self.assertEqual(len(attributes), 1)
received_event = response[0] self.assertEqual(attributes[0].event_id, second.id)
self.assertEqual(received_event.timestamp.timestamp(), int(event_creation_timestamp_second.timestamp())) self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test timestamp of 2nd event # # Test timestamp of 2nd event
response = self.user_misp_connector.search(controller='attributes', timestamp=event_creation_timestamp_second.timestamp()) attributes = self.user_misp_connector.search(controller='attributes', timestamp=event_creation_timestamp_second.timestamp())
self.assertEqual(len(response), 1) self.assertEqual(len(attributes), 1)
received_event = response[0] self.assertEqual(attributes[0].event_id, second.id)
self.assertEqual(received_event.timestamp.timestamp(), int(event_creation_timestamp_second.timestamp())) self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test interval -6 min -> -4 min # # Test interval -6 min -> -4 min
response = self.user_misp_connector.search(controller='attributes', timestamp=['6m', '4m']) attributes = self.user_misp_connector.search(controller='attributes', timestamp=['6m', '4m'])
self.assertEqual(len(response), 1) self.assertEqual(len(attributes), 1)
received_event = response[0] self.assertEqual(attributes[0].event_id, first.id)
self.assertEqual(received_event.timestamp.timestamp(), int(event_creation_timestamp_first.timestamp())) self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_first.timestamp()))
finally: finally:
# Delete event # Delete event
self.admin_misp_connector.delete_event(first.id) self.admin_misp_connector.delete_event(first.id)
@ -321,7 +371,7 @@ class TestComprehensive(unittest.TestCase):
def test_user_perms(self): def test_user_perms(self):
try: try:
first = self.create_simple_event(force_timestamps=True) first = self.create_simple_event()
first.publish() first.publish()
# Add event as user, no publish rights # Add event as user, no publish rights
first = self.user_misp_connector.add_event(first) first = self.user_misp_connector.add_event(first)
@ -347,24 +397,27 @@ class TestComprehensive(unittest.TestCase):
time.sleep(10) time.sleep(10)
second = self.pub_misp_connector.add_event(second) second = self.pub_misp_connector.add_event(second)
# Test invalid query # Test invalid query
response = self.pub_misp_connector.search(publish_timestamp='5x') events = self.pub_misp_connector.search(publish_timestamp='5x')
self.assertEqual(len(response), 0) self.assertEqual(events, [])
response = self.pub_misp_connector.search(publish_timestamp='ad') events = self.pub_misp_connector.search(publish_timestamp='ad')
self.assertEqual(len(response), 0) self.assertEqual(events, [])
response = self.pub_misp_connector.search(publish_timestamp='aaad') events = self.pub_misp_connector.search(publish_timestamp='aaad')
self.assertEqual(len(response), 0) self.assertEqual(events, [])
# Test - last 4 min # Test - last 4 min
response = self.pub_misp_connector.search(publish_timestamp='5s') events = self.pub_misp_connector.search(publish_timestamp='5s')
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
# Test 5 sec before timestamp of 2nd event # Test 5 sec before timestamp of 2nd event
response = self.pub_misp_connector.search(publish_timestamp=(second.publish_timestamp.timestamp())) events = self.pub_misp_connector.search(publish_timestamp=(second.publish_timestamp.timestamp()))
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
# Test interval -6 min -> -4 min # Test interval -6 min -> -4 min
response = self.pub_misp_connector.search(publish_timestamp=[first.publish_timestamp.timestamp() - 5, events = self.pub_misp_connector.search(publish_timestamp=[first.publish_timestamp.timestamp() - 5,
second.publish_timestamp.timestamp() - 5]) second.publish_timestamp.timestamp() - 5])
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
finally: finally:
# Delete event # Delete event
self.admin_misp_connector.delete_event(first.id) self.admin_misp_connector.delete_event(first.id)
@ -376,42 +429,42 @@ class TestComprehensive(unittest.TestCase):
try: try:
first = self.user_misp_connector.add_event(first) first = self.user_misp_connector.add_event(first)
timeframe = [first.timestamp.timestamp() - 5, first.timestamp.timestamp() + 5] timeframe = [first.timestamp.timestamp() - 5, first.timestamp.timestamp() + 5]
# Search event we just created in multiple ways. Make sure it doesn't catchi it when it shouldn't # Search event we just created in multiple ways. Make sure it doesn't catch it when it shouldn't
response = self.user_misp_connector.search(timestamp=timeframe) events = self.user_misp_connector.search(timestamp=timeframe)
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
response = self.user_misp_connector.search(timestamp=timeframe, value='nothere') self.assertEqual(events[0].id, first.id)
self.assertEqual(len(response), 0) events = self.user_misp_connector.search(timestamp=timeframe, value='nothere')
response = self.user_misp_connector.search(timestamp=timeframe, value=first.attributes[0].value) self.assertEqual(events, [])
self.assertEqual(len(response), 1) events = self.user_misp_connector.search(timestamp=timeframe, value=first.attributes[0].value)
response = self.user_misp_connector.search(timestamp=[first.timestamp.timestamp() - 50, self.assertEqual(len(events), 1)
first.timestamp.timestamp() - 10], self.assertEqual(events[0].id, first.id)
value=first.attributes[0].value) events = self.user_misp_connector.search(timestamp=[first.timestamp.timestamp() - 50,
self.assertEqual(len(response), 0) first.timestamp.timestamp() - 10],
value=first.attributes[0].value)
self.assertEqual(events, [])
# Test return content # Test return content
response = self.user_misp_connector.search(timestamp=timeframe, metadata=False) events = self.user_misp_connector.search(timestamp=timeframe, metadata=False)
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
t = response[0] self.assertEqual(len(events[0].attributes), 1)
self.assertEqual(len(t.attributes), 1) events = self.user_misp_connector.search(timestamp=timeframe, metadata=True)
response = self.user_misp_connector.search(timestamp=timeframe, metadata=True) self.assertEqual(len(events), 1)
self.assertEqual(len(response), 1) self.assertEqual(len(events[0].attributes), 0)
t = response[0]
self.assertEqual(len(t.attributes), 0)
# other things # other things
response = self.user_misp_connector.search(timestamp=timeframe, published=True) events = self.user_misp_connector.search(timestamp=timeframe, published=True)
self.assertEqual(len(response), 0) self.assertEqual(events, [])
response = self.user_misp_connector.search(timestamp=timeframe, published=False) events = self.user_misp_connector.search(timestamp=timeframe, published=False)
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
response = self.user_misp_connector.search(eventid=first.id) events = self.user_misp_connector.search(eventid=first.id)
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
response = self.user_misp_connector.search(uuid=first.uuid) events = self.user_misp_connector.search(uuid=first.uuid)
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
response = self.user_misp_connector.search(org=first.orgc_id) events = self.user_misp_connector.search(org=first.orgc_id)
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
# test like search # test like search
response = self.user_misp_connector.search(timestamp=timeframe, value='%{}%'.format(first.attributes[0].value.split('-')[2])) events = self.user_misp_connector.search(timestamp=timeframe, value='%{}%'.format(first.attributes[0].value.split('-')[2]))
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
response = self.user_misp_connector.search(timestamp=timeframe, eventinfo='%bar blah%') events = self.user_misp_connector.search(timestamp=timeframe, eventinfo='%bar blah%')
self.assertEqual(len(response), 1) self.assertEqual(len(events), 1)
finally: finally:
# Delete event # Delete event