chg: Update and improve live testing

pull/471/head
Raphaël Vinot 2019-08-16 10:48:06 +02:00
parent 2d37c68bd7
commit 2e84dd69fc
3 changed files with 209 additions and 178 deletions

View File

@ -160,6 +160,10 @@ class ExpandedPyMISP(PyMISP):
response = self._prepare_request('POST', f'/servers/serverSettingsEdit/{setting}', data=data)
return self._check_response(response, expect_json=True)
def get_server_setting(self, setting: str):
response = self._prepare_request('GET', f'/servers/getSetting/{setting}')
return self._check_response(response, expect_json=True)
def server_settings(self):
response = self._prepare_request('GET', f'/servers/serverSettings')
return self._check_response(response, expect_json=True)
@ -210,6 +214,8 @@ class ExpandedPyMISP(PyMISP):
'''Update an event on a MISP instance'''
if event_id is None:
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
else:
event_id = self.__get_uuid_or_id_from_abstract_misp(event_id)
updated_event = self._prepare_request('POST', f'events/{event_id}', data=event)
updated_event = self._check_response(updated_event, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in updated_event:
@ -265,6 +271,8 @@ class ExpandedPyMISP(PyMISP):
'''Update an object on a MISP instance'''
if object_id is None:
object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object)
else:
object_id = self.__get_uuid_or_id_from_abstract_misp(object_id)
updated_object = self._prepare_request('POST', f'objects/edit/{object_id}', data=misp_object)
updated_object = self._check_response(updated_object, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in updated_object:
@ -391,6 +399,8 @@ class ExpandedPyMISP(PyMISP):
'''Update an attribute on a MISP instance'''
if attribute_id is None:
attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute)
else:
attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute_id)
updated_attribute = self._prepare_request('POST', f'attributes/edit/{attribute_id}', data=attribute)
updated_attribute = self._check_response(updated_attribute, expect_json=True)
if ('errors' in updated_attribute and updated_attribute['errors'][0] == 403
@ -614,6 +624,8 @@ class ExpandedPyMISP(PyMISP):
"""Edit only the provided parameters of a tag."""
if tag_id is None:
tag_id = self.__get_uuid_or_id_from_abstract_misp(tag)
else:
tag_id = self.__get_uuid_or_id_from_abstract_misp(tag_id)
# FIXME: inconsistency in MISP: https://github.com/MISP/MISP/issues/4852
tag = {'Tag': tag}
updated_tag = self._prepare_request('POST', f'tags/edit/{tag_id}', data=tag)
@ -925,6 +937,8 @@ class ExpandedPyMISP(PyMISP):
'''Update a feed on a MISP instance'''
if feed_id is None:
feed_id = self.__get_uuid_or_id_from_abstract_misp(feed)
else:
feed_id = self.__get_uuid_or_id_from_abstract_misp(feed_id)
# FIXME: https://github.com/MISP/MISP/issues/4834
feed = {'Feed': feed}
updated_feed = self._prepare_request('POST', f'feeds/edit/{feed_id}', data=feed)
@ -991,7 +1005,7 @@ class ExpandedPyMISP(PyMISP):
return to_return
def get_sync_config(self, pythonify: bool=False):
'''WARNING: This method only works if the current user is a sync user'''
'''WARNING: This method only works if the user calling it is a sync user'''
server = self._prepare_request('GET', 'servers/createSync')
server = self._check_response(server, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in server:
@ -1001,7 +1015,7 @@ class ExpandedPyMISP(PyMISP):
return s
def import_server(self, server: MISPServer, pythonify: bool=False):
"""Import a sync server config"""
"""Import a sync server config received from get_sync_config"""
server = self._prepare_request('POST', f'servers/import', data=server)
server = self._check_response(server, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in server:
@ -1011,7 +1025,8 @@ class ExpandedPyMISP(PyMISP):
return s
def add_server(self, server: MISPServer, pythonify: bool=False):
"""Add a server to synchronise with"""
"""Add a server to synchronise with.
Note: You probably fant to use ExpandedPyMISP.get_sync_config and ExpandedPyMISP.import_server instead"""
server = self._prepare_request('POST', f'servers/add', data=server)
server = self._check_response(server, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in server:
@ -1024,6 +1039,8 @@ class ExpandedPyMISP(PyMISP):
'''Update a server to synchronise with'''
if server_id is None:
server_id = self.__get_uuid_or_id_from_abstract_misp(server)
else:
server_id = self.__get_uuid_or_id_from_abstract_misp(server_id)
updated_server = self._prepare_request('POST', f'servers/edit/{server_id}', data=server)
updated_server = self._check_response(updated_server, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in updated_server:
@ -1196,6 +1213,8 @@ class ExpandedPyMISP(PyMISP):
'''Update an organisation'''
if organisation_id is None:
organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation)
else:
organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation_id)
updated_organisation = self._prepare_request('POST', f'admin/organisations/edit/{organisation_id}', data=organisation)
updated_organisation = self._check_response(updated_organisation, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in updated_organisation:
@ -1253,6 +1272,8 @@ class ExpandedPyMISP(PyMISP):
'''Update an event on a MISP instance'''
if user_id is None:
user_id = self.__get_uuid_or_id_from_abstract_misp(user)
else:
user_id = self.__get_uuid_or_id_from_abstract_misp(user_id)
updated_user = self._prepare_request('POST', f'admin/users/edit/{user_id}', data=user)
updated_user = self._check_response(updated_user, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in updated_user:
@ -1601,6 +1622,8 @@ class ExpandedPyMISP(PyMISP):
url_path = f'sightings/restSearch/{context}'
else:
url_path = 'sightings/restSearch'
if isinstance(context_id, (MISPEvent, MISPAttribute)):
context_id = self.__get_uuid_or_id_from_abstract_misp(context_id)
query['id'] = context_id
query['type'] = type_sighting
query['from'] = date_from
@ -1862,9 +1885,15 @@ class ExpandedPyMISP(PyMISP):
return str(obj)
if isinstance(obj, (int, str)):
return obj
if 'id' in obj:
if self._old_misp((2, 4, 113), '2020-01-01', sys._getframe().f_code.co_name, message='MISP now accepts UUIDs to access entiries, usinf it is a lot safer across instances. Just update your MISP instance, plz.'):
if 'id' in obj:
return obj['id']
if isinstance(obj, MISPShadowAttribute):
# A ShadowAttribute has the same UUID as the related Attribute, we *need* to use the ID
return obj['id']
return obj['uuid']
if 'uuid' in obj:
return obj['uuid']
return obj['id']
def _make_misp_bool(self, parameter: Union[bool, str, None]):
'''MISP wants 0 or 1 for bool, so we avoid True/False '0', '1' '''

View File

@ -82,11 +82,11 @@ class TestComprehensive(unittest.TestCase):
@classmethod
def tearDownClass(cls):
# Delete publisher
cls.admin_misp_connector.delete_user(cls.test_pub.id)
cls.admin_misp_connector.delete_user(cls.test_pub)
# Delete user
cls.admin_misp_connector.delete_user(cls.test_usr.id)
cls.admin_misp_connector.delete_user(cls.test_usr)
# Delete org
cls.admin_misp_connector.delete_organisation(cls.test_org.id)
cls.admin_misp_connector.delete_organisation(cls.test_org)
def create_simple_event(self, force_timestamps=False):
mispevent = MISPEvent(force_timestamps=force_timestamps)
@ -152,25 +152,18 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(final_setting['value'], 5000)
break
self.admin_misp_connector.set_server_setting('MISP.max_correlations_per_event', 10)
settings = self.admin_misp_connector.server_settings()
for final_setting in settings['finalSettings']:
if final_setting['setting'] == 'MISP.max_correlations_per_event':
self.assertEqual(final_setting['value'], 10)
break
setting = self.admin_misp_connector.get_server_setting('MISP.max_correlations_per_event')
self.assertEqual(setting['value'], 10)
self.admin_misp_connector.set_server_setting('MISP.max_correlations_per_event', 5000)
settings = self.admin_misp_connector.server_settings()
for final_setting in settings['finalSettings']:
if final_setting['setting'] == 'MISP.live':
self.assertTrue(final_setting['value'])
break
setting = self.admin_misp_connector.get_server_setting('MISP.live')
self.assertTrue(setting['value'])
self.admin_misp_connector.set_server_setting('MISP.live', False, force=True)
settings = self.admin_misp_connector.server_settings()
for final_setting in settings['finalSettings']:
if final_setting['setting'] == 'MISP.live':
self.assertFalse(final_setting['value'])
break
setting = self.admin_misp_connector.get_server_setting('MISP.live')
self.assertFalse(setting['value'])
self.admin_misp_connector.set_server_setting('MISP.live', True, force=True)
setting = self.admin_misp_connector.get_server_setting('MISP.live')
self.assertTrue(setting['value'])
def test_search_value_event(self):
'''Search a value on the event controller
@ -194,9 +187,9 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(events, [])
finally:
# Delete events
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
self.admin_misp_connector.delete_event(third)
def test_search_value_attribute(self):
'''Search value in attributes controller'''
@ -245,9 +238,9 @@ class TestComprehensive(unittest.TestCase):
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
self.admin_misp_connector.delete_event(third)
def test_search_type_event(self):
'''Search multiple events, search events containing attributes with specific types'''
@ -266,9 +259,9 @@ class TestComprehensive(unittest.TestCase):
self.assertIn(e.id, [second.id, third.id])
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
self.admin_misp_connector.delete_event(third)
def test_search_type_attribute(self):
'''Search multiple attributes, search attributes with specific types'''
@ -290,9 +283,9 @@ class TestComprehensive(unittest.TestCase):
self.assertIn(a.event_id, [second.id, third.id])
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
self.admin_misp_connector.delete_event(third)
def test_search_tag_event(self):
'''Search Tags at events level'''
@ -324,9 +317,9 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(events, [])
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
self.admin_misp_connector.delete_event(third)
def test_search_tag_attribute(self):
'''Search Tags at attributes level'''
@ -351,9 +344,9 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(len(attributes), 1)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
self.admin_misp_connector.delete_event(third)
def test_search_tag_advanced_event(self):
'''Advanced search Tags at events level'''
@ -381,9 +374,9 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual([t for t in a.tags if t.name == 'tlp:white___test'], [])
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
self.admin_misp_connector.delete_event(third)
def test_search_tag_advanced_attributes(self):
'''Advanced search Tags at attributes level'''
@ -400,9 +393,9 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual([t for t in a.tags if t.name == 'foo_double___test'], [])
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
self.admin_misp_connector.delete_event(third)
def test_search_timestamp_event(self):
'''Search specific update timestamps at events level'''
@ -437,8 +430,8 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_first.timestamp()))
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
def test_search_timestamp_attribute(self):
'''Search specific update timestamps at attributes level'''
@ -475,8 +468,8 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_first.timestamp()))
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
def test_user_perms(self):
'''Test publish rights'''
@ -492,7 +485,7 @@ class TestComprehensive(unittest.TestCase):
self.assertTrue(first.published)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_delete_by_uuid(self):
try:
@ -503,14 +496,13 @@ class TestComprehensive(unittest.TestCase):
first = self.user_misp_connector.add_event(first)
r = self.user_misp_connector.delete_attribute(first.attributes[0].uuid)
self.assertEqual(r['message'], 'Attribute deleted.')
# FIXME https://github.com/MISP/MISP/issues/4974
# r = self.user_misp_connector.delete_object(first.objects[0].uuid)
# self.assertEqual(r['message'], 'Object deleted.')
# r = self.user_misp_connector.delete_event(first.uuid)
# self.assertEqual(r['message'], 'Event deleted.')
r = self.user_misp_connector.delete_object(first.objects[0].uuid)
self.assertEqual(r['message'], 'Object deleted')
r = self.user_misp_connector.delete_event(first.uuid)
self.assertEqual(r['message'], 'Event deleted.')
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_search_publish_timestamp(self):
'''Search for a specific publication timestamp, an interval, and invalid values.'''
@ -548,8 +540,8 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(events[0].id, first.id)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
def test_default_distribution(self):
'''The default distributions on the VM are This community only for the events and Inherit from event for attr/obj)'''
@ -573,13 +565,13 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(first.objects[1].distribution, Distribution.inherit.value)
self.assertEqual(first.objects[1].attributes[0].distribution, Distribution.inherit.value)
# Attribute create
attribute = self.user_misp_connector.add_attribute(first.id, {'type': 'comment', 'value': 'bar'})
attribute = self.user_misp_connector.add_attribute(first, {'type': 'comment', 'value': 'bar'})
self.assertEqual(attribute.value, 'bar', attribute.to_json())
self.assertEqual(attribute.distribution, Distribution.inherit.value, attribute.to_json())
# Object - add
o = MISPObject('file')
o.add_attribute('filename', value='blah.exe')
new_obj = self.user_misp_connector.add_object(first.id, o)
new_obj = self.user_misp_connector.add_object(first, o)
self.assertEqual(new_obj.distribution, int(Distribution.inherit.value))
self.assertEqual(new_obj.attributes[0].distribution, int(Distribution.inherit.value))
# Object - edit
@ -591,7 +583,7 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(a.distribution, int(Distribution.inherit.value))
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_simple_event(self):
'''Search a bunch of parameters:
@ -781,8 +773,8 @@ class TestComprehensive(unittest.TestCase):
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
def test_edit_attribute(self):
first = self.create_simple_event()
@ -791,16 +783,18 @@ class TestComprehensive(unittest.TestCase):
first = self.user_misp_connector.add_event(first)
first.attributes[0].comment = 'This is the modified comment'
attribute = self.user_misp_connector.update_attribute(first.attributes[0])
self.assertTrue(isinstance(attribute, MISPAttribute), attribute)
self.assertEqual(attribute.comment, 'This is the modified comment')
attribute = self.user_misp_connector.update_attribute({'comment': 'This is the modified comment, again'}, attribute.id)
self.assertEqual(attribute.comment, 'This is the modified comment, again')
attribute = self.user_misp_connector.update_attribute({'disable_correlation': True}, attribute.id)
self.assertTrue(attribute.disable_correlation)
attribute = self.user_misp_connector.update_attribute({'disable_correlation': False}, attribute.id)
self.assertFalse(attribute.disable_correlation)
attribute = self.user_misp_connector.update_attribute({'comment': 'This is the modified comment, again'}, attribute)
self.assertTrue(isinstance(attribute, MISPAttribute), attribute)
self.assertEqual(attribute.comment, 'This is the modified comment, again', attribute)
attribute = self.user_misp_connector.update_attribute({'disable_correlation': True}, attribute)
self.assertTrue(attribute.disable_correlation, attribute)
attribute = self.user_misp_connector.update_attribute({'disable_correlation': False}, attribute)
self.assertFalse(attribute.disable_correlation, attribute)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_sightings(self):
first = self.create_simple_event()
@ -820,8 +814,8 @@ class TestComprehensive(unittest.TestCase):
s.source = 'Testcases'
s.type = '1'
# NOTE: no pythonify available yet
# r = self.user_misp_connector.add_sighting(s, second.attributes[0].id)
r = self.user_misp_connector.add_sighting(s, second.attributes[0].id)
# r = self.user_misp_connector.add_sighting(s, second.attributes[0])
r = self.user_misp_connector.add_sighting(s, second.attributes[0])
self.assertEqual(r['message'], 'Sighting added')
s = self.user_misp_connector.search_sightings(publish_timestamp=current_ts, include_attribute=True,
@ -867,11 +861,11 @@ class TestComprehensive(unittest.TestCase):
# NOTE: no pythonify available yet
# r = self.admin_misp_connector.add_sighting(s, second.attributes[0].id, pythonify=True)
r = self.admin_misp_connector.add_sighting(s, second.attributes[0].id)
r = self.admin_misp_connector.add_sighting(s, second.attributes[0])
self.assertEqual(r['message'], 'Sighting added')
s = self.user_misp_connector.sightings(second.attributes[0])
self.assertEqual(len(s), 2)
s = self.user_misp_connector.sightings(second.attributes[0], self.test_org.id)
s = self.user_misp_connector.sightings(second.attributes[0], self.test_org)
self.assertEqual(len(s), 1)
self.assertEqual(s[0].org_id, self.test_org.id)
# Delete sighting
@ -880,8 +874,8 @@ class TestComprehensive(unittest.TestCase):
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
def test_search_csv(self):
first = self.create_simple_event()
@ -894,13 +888,13 @@ class TestComprehensive(unittest.TestCase):
first = self.user_misp_connector.add_event(first)
second = self.user_misp_connector.add_event(second)
response = self.user_misp_connector.publish(first.id, alert=False)
response = self.user_misp_connector.publish(first, alert=False)
self.assertEqual(response['errors'][1]['message'], 'You do not have permission to use this functionality.')
# Default search, attribute with to_ids == True
first.attributes[0].to_ids = True
first = self.user_misp_connector.update_event(first)
self.admin_misp_connector.publish(first.id, alert=False)
self.admin_misp_connector.publish(first, alert=False)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp())
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
@ -959,8 +953,8 @@ class TestComprehensive(unittest.TestCase):
# Mostly solved -> https://github.com/MISP/MISP/issues/4886
time.sleep(5)
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
def test_search_stix(self):
first = self.create_simple_event()
@ -975,7 +969,7 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(stix2['objects'][-1]['pattern'], "[network-traffic:src_ref.type = 'ipv4-addr' AND network-traffic:src_ref.value = '8.8.8.8']")
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_update_object(self):
first = self.create_simple_event()
@ -1028,16 +1022,16 @@ class TestComprehensive(unittest.TestCase):
tags = self.admin_misp_connector.tags(pythonify=True)
for t in tags:
if t.name == 'generic_tag_test':
response = self.admin_misp_connector.delete_tag(t.id)
response = self.admin_misp_connector.delete_tag(t)
self.assertEqual(response['message'], 'Tag deleted.')
# Test delete object
r = self.user_misp_connector.delete_object(second.objects[0].id)
r = self.user_misp_connector.delete_object(second.objects[0])
self.assertEqual(r['message'], 'Object deleted')
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
def test_custom_template(self):
first = self.create_simple_event()
@ -1054,7 +1048,7 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(obj.get_attributes_by_relation('test_overwrite')[0].value, 'blah')
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_unknown_template(self):
first = self.create_simple_event()
@ -1075,7 +1069,7 @@ class TestComprehensive(unittest.TestCase):
self.assertTrue(first.objects[1].attributes[0].disable_correlation)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_domain_ip_object(self):
first = self.create_simple_event()
@ -1089,7 +1083,7 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(len(first.objects[0].attributes), 5)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_asn_object(self):
first = self.create_simple_event()
@ -1102,7 +1096,7 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(len(first.objects[0].attributes), 3)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_object_template(self):
r = self.admin_misp_connector.update_object_templates()
@ -1124,7 +1118,7 @@ class TestComprehensive(unittest.TestCase):
for tag in tags:
if not tag.hide_tag:
break
tag = self.admin_misp_connector.get_tag(tag.id, pythonify=True)
tag = self.admin_misp_connector.get_tag(tag, pythonify=True)
self.assertTrue('name' in tag)
# Enable by MISPTag
tag = self.admin_misp_connector.disable_tag(tag, pythonify=True)
@ -1152,12 +1146,12 @@ class TestComprehensive(unittest.TestCase):
self.assertFalse(first.attributes[0].tags)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
# Delete tag
response = self.admin_misp_connector.delete_tag(new_tag.id)
response = self.admin_misp_connector.delete_tag(new_tag)
self.assertEqual(response['message'], 'Tag deleted.')
response = self.admin_misp_connector.delete_tag(non_exportable_tag.id)
response = self.admin_misp_connector.delete_tag(non_exportable_tag)
self.assertEqual(response['message'], 'Tag deleted.')
def test_add_event_with_attachment_object_controller(self):
@ -1166,29 +1160,27 @@ class TestComprehensive(unittest.TestCase):
first = self.user_misp_connector.add_event(first)
fo, peo, seos = make_binary_objects('tests/viper-test-files/test_files/whoami.exe')
for s in seos:
r = self.user_misp_connector.add_object(first.id, s)
r = self.user_misp_connector.add_object(first, s)
self.assertEqual(r.name, 'pe-section', r)
r = self.user_misp_connector.add_object(first.id, peo)
r = self.user_misp_connector.add_object(first, peo)
self.assertEqual(r.name, 'pe', r)
for ref in peo.ObjectReference:
r = self.user_misp_connector.add_object_reference(ref)
# FIXME: https://github.com/MISP/MISP/issues/4866
self.assertEqual(r.object_uuid, peo.uuid, r.to_json())
r = self.user_misp_connector.add_object(first.id, fo)
r = self.user_misp_connector.add_object(first, fo)
obj_attrs = r.get_attributes_by_relation('ssdeep')
self.assertEqual(len(obj_attrs), 1, obj_attrs)
self.assertEqual(r.name, 'file', r)
r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0])
# FIXME: https://github.com/MISP/MISP/issues/4866
self.assertEqual(r.object_uuid, fo.uuid, r.to_json())
self.assertEqual(r.referenced_uuid, peo.uuid, r.to_json())
r = self.user_misp_connector.delete_object_reference(r.id)
r = self.user_misp_connector.delete_object_reference(r)
self.assertEqual(r['message'], 'ObjectReference deleted')
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_add_event_with_attachment(self):
first = self.create_simple_event()
@ -1206,7 +1198,7 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(first.objects[0].references[0].relationship_type, 'includes')
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_taxonomies(self):
# Make sure we're up-to-date
@ -1219,14 +1211,14 @@ class TestComprehensive(unittest.TestCase):
for tax in taxonomies:
if tax.namespace == list_name_test:
break
r = self.admin_misp_connector.get_taxonomy(tax.id, pythonify=True)
r = self.admin_misp_connector.get_taxonomy(tax, pythonify=True)
self.assertEqual(r.namespace, list_name_test)
self.assertTrue('enabled' in r)
r = self.admin_misp_connector.enable_taxonomy(tax.id)
r = self.admin_misp_connector.enable_taxonomy(tax)
self.assertEqual(r['message'], 'Taxonomy enabled')
r = self.admin_misp_connector.enable_taxonomy_tags(tax.id)
r = self.admin_misp_connector.enable_taxonomy_tags(tax)
self.assertEqual(r['name'], 'The tag(s) has been saved.')
r = self.admin_misp_connector.disable_taxonomy(tax.id)
r = self.admin_misp_connector.disable_taxonomy(tax)
self.assertEqual(r['message'], 'Taxonomy disabled')
def test_warninglists(self):
@ -1245,17 +1237,17 @@ class TestComprehensive(unittest.TestCase):
if wl.name == list_name_test:
break
testwl = wl
r = self.admin_misp_connector.get_warninglist(testwl.id, pythonify=True)
r = self.admin_misp_connector.get_warninglist(testwl, pythonify=True)
self.assertEqual(r.name, list_name_test)
self.assertTrue('WarninglistEntry' in r)
r = self.admin_misp_connector.enable_warninglist(testwl.id)
r = self.admin_misp_connector.enable_warninglist(testwl)
self.assertEqual(r['success'], '1 warninglist(s) enabled')
# Check if a value is in a warning list
md5_empty_file = 'd41d8cd98f00b204e9800998ecf8427e'
r = self.user_misp_connector.values_in_warninglist([md5_empty_file])
self.assertEqual(r[md5_empty_file][0]['name'], list_name_test)
r = self.admin_misp_connector.disable_warninglist(testwl.id)
r = self.admin_misp_connector.disable_warninglist(testwl)
self.assertEqual(r['success'], '1 warninglist(s) disabled')
def test_noticelists(self):
@ -1270,13 +1262,13 @@ class TestComprehensive(unittest.TestCase):
if nl.name == list_name_test:
break
testnl = nl
r = self.admin_misp_connector.get_noticelist(testnl.id, pythonify=True)
r = self.admin_misp_connector.get_noticelist(testnl, pythonify=True)
self.assertEqual(r.name, list_name_test)
# FIXME: https://github.com/MISP/MISP/issues/4856
self.assertTrue('NoticelistEntry' in r)
r = self.admin_misp_connector.enable_noticelist(testnl.id)
r = self.admin_misp_connector.enable_noticelist(testnl)
self.assertTrue(r['Noticelist']['enabled'], r)
r = self.admin_misp_connector.disable_noticelist(testnl.id)
r = self.admin_misp_connector.disable_noticelist(testnl)
self.assertFalse(r['Noticelist']['enabled'], r)
def test_galaxies(self):
@ -1290,7 +1282,7 @@ class TestComprehensive(unittest.TestCase):
for galaxy in galaxies:
if galaxy.name == list_name_test:
break
r = self.admin_misp_connector.get_galaxy(galaxy.id, pythonify=True)
r = self.admin_misp_connector.get_galaxy(galaxy, pythonify=True)
self.assertEqual(r.name, list_name_test)
# FIXME: Fails due to https://github.com/MISP/MISP/issues/4855
# self.assertTrue('GalaxyCluster' in r)
@ -1299,11 +1291,11 @@ class TestComprehensive(unittest.TestCase):
first = self.create_simple_event()
try:
first = self.user_misp_connector.add_event(first)
r = self.admin_misp_connector.push_event_to_ZMQ(first.id)
r = self.admin_misp_connector.push_event_to_ZMQ(first)
self.assertEqual(r['message'], 'Event published to ZMQ')
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_csv_loader(self):
csv1 = CSVLoader(template_name='file', csv_path=Path('tests/csv_testfiles/valid_fieldnames.csv'))
@ -1317,11 +1309,11 @@ class TestComprehensive(unittest.TestCase):
try:
first = self.user_misp_connector.add_event(event)
for o in csv2.load():
new_object = self.user_misp_connector.add_object(first.id, o)
new_object = self.user_misp_connector.add_object(first, o)
self.assertEqual(len(new_object.attributes), 3)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_user(self):
# Get list
@ -1366,21 +1358,22 @@ class TestComprehensive(unittest.TestCase):
first = self.user_misp_connector.add_event(first)
second = self.admin_misp_connector.add_event(second, pythonify=True)
# Get attribute
attribute = self.user_misp_connector.get_attribute(first.attributes[0].id)
attribute = self.user_misp_connector.get_attribute(first.attributes[0])
self.assertEqual(first.attributes[0].uuid, attribute.uuid)
# Add attribute
new_attribute = MISPAttribute()
new_attribute.value = '1.2.3.4'
new_attribute.type = 'ip-dst'
new_attribute = self.user_misp_connector.add_attribute(first.id, new_attribute)
self.assertEqual(new_attribute.value, '1.2.3.4')
new_attribute = self.user_misp_connector.add_attribute(first, new_attribute)
self.assertTrue(isinstance(new_attribute, MISPAttribute), new_attribute)
self.assertEqual(new_attribute.value, '1.2.3.4', new_attribute)
# Test attribute already in event
# new_attribute.uuid = str(uuid4())
# new_attribute = self.user_misp_connector.add_attribute(first.id, new_attribute)
# new_attribute = self.user_misp_connector.add_attribute(first, new_attribute)
new_similar = MISPAttribute()
new_similar.value = '1.2.3.4'
new_similar.type = 'ip-dst'
similar_error = self.user_misp_connector.add_attribute(first.id, new_similar)
similar_error = self.user_misp_connector.add_attribute(first, new_similar)
self.assertEqual(similar_error['errors'][1]['errors']['value'][0], 'A similar attribute already exists for this event.')
# Test add multiple attributes at once
@ -1397,7 +1390,7 @@ class TestComprehensive(unittest.TestCase):
attr4.value = '1.2.3.6'
attr4.type = 'ip-dst'
attr4.add_tag('tlp:amber___test')
response = self.user_misp_connector.add_attribute(first.id, [attr1, attr2, attr3, attr4])
response = self.user_misp_connector.add_attribute(first, [attr1, attr2, attr3, attr4])
if 'attributes' in response:
# FIXME: this if statement can be removed as soon as 2.4.113 is released: the format changed between 112 and 113, we test 113+
self.assertEqual(response['attributes'][0].value, '1.2.3.5')
@ -1421,10 +1414,10 @@ class TestComprehensive(unittest.TestCase):
new_proposal_update = self.user_misp_connector.update_attribute_proposal(new_attribute.id, {'to_ids': False})
self.assertEqual(new_proposal_update.to_ids, False)
# Delete attribute as proposal
proposal_delete = self.user_misp_connector.delete_attribute_proposal(new_attribute.id)
proposal_delete = self.user_misp_connector.delete_attribute_proposal(new_attribute)
self.assertTrue(proposal_delete['saved'])
# Get attribute proposal
temp_new_proposal = self.user_misp_connector.get_attribute_proposal(new_proposal.id)
temp_new_proposal = self.user_misp_connector.get_attribute_proposal(new_proposal)
self.assertEqual(temp_new_proposal.uuid, new_proposal.uuid)
# Get attribute proposal*S*
proposals = self.user_misp_connector.attribute_proposals()
@ -1441,19 +1434,19 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(len(proposals), 1)
self.assertEqual(proposals[0].value, '123.123.123.1')
# Accept attribute proposal - New attribute
self.user_misp_connector.accept_attribute_proposal(new_proposal.id)
first = self.user_misp_connector.get_event(first.id)
self.user_misp_connector.accept_attribute_proposal(new_proposal)
first = self.user_misp_connector.get_event(first)
self.assertEqual(first.attributes[-1].value, '5.2.3.4')
# Accept attribute proposal - Attribute update
response = self.user_misp_connector.accept_attribute_proposal(new_proposal_update.id)
response = self.user_misp_connector.accept_attribute_proposal(new_proposal_update)
self.assertEqual(response['message'], 'Proposed change accepted.')
attribute = self.user_misp_connector.get_attribute(new_attribute.id)
attribute = self.user_misp_connector.get_attribute(new_attribute)
self.assertEqual(attribute.to_ids, False)
# Discard attribute proposal
new_proposal_update = self.user_misp_connector.update_attribute_proposal(new_attribute.id, {'to_ids': True})
response = self.user_misp_connector.discard_attribute_proposal(new_proposal_update.id)
response = self.user_misp_connector.discard_attribute_proposal(new_proposal_update)
self.assertEqual(response['message'], 'Proposal discarded.')
attribute = self.user_misp_connector.get_attribute(new_attribute.id)
attribute = self.user_misp_connector.get_attribute(new_attribute)
self.assertEqual(attribute.to_ids, False)
# Test fallback to proposal if the user doesn't own the event
@ -1461,26 +1454,26 @@ class TestComprehensive(unittest.TestCase):
prop_attr.from_dict(**{'type': 'ip-dst', 'value': '123.43.32.21'})
# Add attribute on event owned by someone else
attribute = self.user_misp_connector.add_attribute(second.id, prop_attr)
self.assertTrue(isinstance(attribute, MISPShadowAttribute))
self.assertTrue(isinstance(attribute, MISPShadowAttribute), attribute)
# Test if add proposal without category works - https://github.com/MISP/MISP/issues/4868
attribute = self.user_misp_connector.add_attribute(second.id, {'type': 'ip-dst', 'value': '123.43.32.22'})
self.assertTrue(isinstance(attribute, MISPShadowAttribute))
# Add attribute with the same value as an existing proposal
prop_attr.uuid = str(uuid4())
attribute = self.admin_misp_connector.add_attribute(second.id, prop_attr, pythonify=True)
attribute = self.admin_misp_connector.add_attribute(second, prop_attr, pythonify=True)
prop_attr.uuid = str(uuid4())
# Add a duplicate attribute (same value)
attribute = self.admin_misp_connector.add_attribute(second.id, prop_attr, pythonify=True)
attribute = self.admin_misp_connector.add_attribute(second, prop_attr, pythonify=True)
self.assertTrue('errors' in attribute)
# Update attribute owned by someone else
attribute = self.user_misp_connector.update_attribute({'comment': 'blah'}, second.attributes[0].id)
self.assertTrue(isinstance(attribute, MISPShadowAttribute))
self.assertTrue(isinstance(attribute, MISPShadowAttribute), attribute)
self.assertEqual(attribute.value, second.attributes[0].value)
# Delete attribute owned by someone else
response = self.user_misp_connector.delete_attribute(second.attributes[1].id)
response = self.user_misp_connector.delete_attribute(second.attributes[1])
self.assertTrue(response['success'])
# Delete attribute owned by user
response = self.admin_misp_connector.delete_attribute(second.attributes[1].id)
response = self.admin_misp_connector.delete_attribute(second.attributes[1])
self.assertEqual(response['message'], 'Attribute deleted.')
# Test attribute*S*
@ -1495,8 +1488,8 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(len(events), 2)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
def test_search_type_event_csv(self):
try:
@ -1512,9 +1505,9 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(len(events), 6)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
self.admin_misp_connector.delete_event(third)
def test_search_logs(self):
# FIXME: https://github.com/MISP/MISP/issues/4872
@ -1587,7 +1580,7 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(list(users_stats.keys()), ['flatData', 'treemap'])
users_stats = self.admin_misp_connector.users_statistics(context='attributehistogram')
self.assertTrue(isinstance(users_stats, dict))
self.assertTrue(isinstance(users_stats, dict), users_stats)
self.user_misp_connector.add_sighting({'value': first.attributes[0].value})
users_stats = self.user_misp_connector.users_statistics(context='sightings')
@ -1598,9 +1591,9 @@ class TestComprehensive(unittest.TestCase):
# self.assertTrue('matrix' in users_stats)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
self.admin_misp_connector.delete_event(third)
def test_direct(self):
try:
@ -1612,7 +1605,7 @@ class TestComprehensive(unittest.TestCase):
event_get.from_dict(**r)
self.assertDictEqual(event.to_dict(), event_get.to_dict())
finally:
self.admin_misp_connector.delete_event(event.id)
self.admin_misp_connector.delete_event(event)
def test_freetext(self):
first = self.create_simple_event()
@ -1620,27 +1613,27 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.toggle_warninglist(warninglist_name='%dns resolv%', force_enable=True)
first = self.user_misp_connector.add_event(first)
# disable_background_processing => returns the parsed data, before insertion
r = self.user_misp_connector.freetext(first.id, '1.1.1.1 foo@bar.de', adhereToWarninglists=False,
r = self.user_misp_connector.freetext(first, '1.1.1.1 foo@bar.de', adhereToWarninglists=False,
distribution=2, returnMetaAttributes=False, pythonify=True,
kw_params={'disable_background_processing': 1})
self.assertTrue(isinstance(r, list))
self.assertEqual(r[0].value, '1.1.1.1')
r = self.user_misp_connector.freetext(first.id, '9.9.9.9 foo@bar.com', adhereToWarninglists='soft',
r = self.user_misp_connector.freetext(first, '9.9.9.9 foo@bar.com', adhereToWarninglists='soft',
distribution=2, returnMetaAttributes=False, pythonify=True,
kw_params={'disable_background_processing': 1})
self.assertTrue(isinstance(r, list))
self.assertEqual(r[0].value, '9.9.9.9')
event = self.user_misp_connector.get_event(first.id, pythonify=True)
event = self.user_misp_connector.get_event(first, pythonify=True)
self.assertEqual(event.attributes[3].value, '9.9.9.9')
self.assertFalse(event.attributes[3].to_ids)
r_wl = self.user_misp_connector.freetext(first.id, '8.8.8.8 foo@bar.de', adhereToWarninglists=True,
r_wl = self.user_misp_connector.freetext(first, '8.8.8.8 foo@bar.de', adhereToWarninglists=True,
distribution=2, returnMetaAttributes=False,
kw_params={'disable_background_processing': 0})
self.assertEqual(r_wl[0].value, '8.8.8.8')
event = self.user_misp_connector.get_event(first.id, pythonify=True)
event = self.user_misp_connector.get_event(first, pythonify=True)
for attribute in event.attributes:
self.assertFalse(attribute.value == '8.8.8.8')
r = self.user_misp_connector.freetext(first.id, '1.1.1.1 foo@bar.de', adhereToWarninglists=True,
r = self.user_misp_connector.freetext(first, '1.1.1.1 foo@bar.de', adhereToWarninglists=True,
distribution=2, returnMetaAttributes=True)
self.assertTrue(isinstance(r, list))
self.assertTrue(isinstance(r[0]['types'], dict))
@ -1648,7 +1641,7 @@ class TestComprehensive(unittest.TestCase):
# Mostly solved https://github.com/MISP/MISP/issues/4886
time.sleep(10)
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_sharing_groups(self):
# add
@ -1659,13 +1652,13 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(sharing_group.name, 'Testcases SG')
self.assertEqual(sharing_group.releasability, 'Testing')
# add org
r = self.admin_misp_connector.add_org_to_sharing_group(sharing_group.id,
self.test_org.id, extend=True)
r = self.admin_misp_connector.add_org_to_sharing_group(sharing_group,
self.test_org, extend=True)
self.assertEqual(r['name'], 'Organisation added to the sharing group.')
# delete org
r = self.admin_misp_connector.remove_org_from_sharing_group(sharing_group.id,
self.test_org.id)
r = self.admin_misp_connector.remove_org_from_sharing_group(sharing_group,
self.test_org)
self.assertEqual(r['name'], 'Organisation removed from the sharing group.', r)
# Get list
sharing_groups = self.admin_misp_connector.sharing_groups(pythonify=True)
@ -1689,7 +1682,7 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(first_attribute.sharing_group_id, int(sharing_group.id))
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
# Delete sharing group
r = self.admin_misp_connector.delete_sharing_group(sharing_group.id)
self.assertEqual(r['message'], 'SharingGroup deleted')
@ -1708,7 +1701,7 @@ class TestComprehensive(unittest.TestCase):
feed = self.admin_misp_connector.update_feed(feed, pythonify=True)
self.assertEqual(feed.name, 'TestFeed - Update')
# Delete
r = self.admin_misp_connector.delete_feed(feed.id)
r = self.admin_misp_connector.delete_feed(feed)
self.assertEqual(r['message'], 'Feed deleted.')
# List
feeds = self.admin_misp_connector.feeds(pythonify=True)
@ -1717,7 +1710,7 @@ class TestComprehensive(unittest.TestCase):
if feed.name == 'The Botvrij.eu Data':
break
# Get
botvrij = self.admin_misp_connector.get_feed(feed.id, pythonify=True)
botvrij = self.admin_misp_connector.get_feed(feed, pythonify=True)
self.assertEqual(botvrij.url, "http://www.botvrij.eu/data/feed-osint")
# Enable
# MISP OSINT
@ -1731,11 +1724,11 @@ class TestComprehensive(unittest.TestCase):
feed = self.admin_misp_connector.enable_feed_cache(botvrij.id, pythonify=True)
self.assertTrue(feed.caching_enabled)
# Cache
r = self.admin_misp_connector.cache_feed(botvrij.id)
r = self.admin_misp_connector.cache_feed(botvrij)
self.assertEqual(r['message'], 'Feed caching job initiated.')
# Fetch
# Cannot test that, it fetches all the events.
# r = self.admin_misp_connector.fetch_feed(botvrij.id)
# r = self.admin_misp_connector.fetch_feed(botvrij)
# FIXME https://github.com/MISP/MISP/issues/4834#issuecomment-511889274
# self.assertEqual(r['message'], 'Feed caching job initiated.')
@ -1774,7 +1767,7 @@ class TestComprehensive(unittest.TestCase):
servers = self.admin_misp_connector.servers(pythonify=True)
self.assertEqual(servers[0].name, 'Updated name')
# Delete
r = self.admin_misp_connector.delete_server(server.id)
r = self.admin_misp_connector.delete_server(server)
self.assertEqual(r['name'], 'Server deleted')
@unittest.skipIf(sys.version_info < (3, 6), 'Not supported on python < 3.6')
@ -1788,7 +1781,7 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(len(first.objects), 7)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
def test_upload_stix(self):
# FIXME https://github.com/MISP/MISP/issues/4892
@ -1806,7 +1799,7 @@ class TestComprehensive(unittest.TestCase):
self.assertTrue(isinstance(second, dict))
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second['Event']['id'])

View File

@ -435,7 +435,9 @@ class TestSync(unittest.TestCase):
sg.name = 'Testcases SG'
sg.releasability = 'Testing'
sharing_group = source.site_admin_connector.add_sharing_group(sg)
a = source.site_admin_connector.add_org_to_sharing_group(sharing_group, middle.test_org.uuid)
source.site_admin_connector.add_org_to_sharing_group(sharing_group, middle.test_org.uuid)
source.site_admin_connector.add_server_to_sharing_group(sharing_group, 0) # Add local server
# NOTE: the data on that sharing group *won't be synced anywhere*
a = event.add_attribute('text', 'SG only attr')
a.distribution = Distribution.sharing_group
@ -443,26 +445,33 @@ class TestSync(unittest.TestCase):
event = source.org_admin_connector.add_event(event)
source.org_admin_connector.publish(event)
time.sleep(15)
time.sleep(60)
event_middle = middle.user_connector.get_event(event.uuid)
event_last = last.user_connector.get_event(event.uuid)
self.assertEqual(len(event_middle.attributes), 3)
event_middle = middle.user_connector.get_event(event)
self.assertTrue(isinstance(event_middle, MISPEvent), event_middle)
self.assertEqual(len(event_middle.attributes), 2, event_middle)
self.assertEqual(len(event_middle.objects), 1, event_middle)
self.assertEqual(len(event_middle.objects[0].attributes), 1, event_middle)
event_last = last.user_connector.get_event(event)
self.assertTrue(isinstance(event_last, MISPEvent), event_last)
self.assertEqual(len(event_last.attributes), 1)
# Test if event is properly sanitized
event_middle_as_site_admin = middle.site_admin_connector.get_event(event.uuid)
self.assertEqual(len(event_middle_as_site_admin.attributes), 3)
self.assertEqual(len(event_middle_as_site_admin.attributes), 2)
event_last_as_site_admin = last.site_admin_connector.get_event(event.uuid)
self.assertEqual(len(event_last_as_site_admin.attributes), 1)
# Get sharing group from middle instance
sgs = middle.site_admin_connector.sharing_groups()
self.assertEqual(len(sgs), 1)
self.assertEqual(sgs[0].name, 'Testcases SG')
middle.site_admin_connector.delete_sharing_group(sgs[0])
self.assertEqual(len(sgs), 0)
# TODO: Update sharing group so the attribute is pushed
# self.assertEqual(sgs[0].name, 'Testcases SG')
# middle.site_admin_connector.delete_sharing_group(sgs[0])
finally:
source.org_admin_connector.delete_event(event)
middle.site_admin_connector.delete_event(event_middle)
last.site_admin_connector.delete_event(event_last)
middle.site_admin_connector.delete_event(event)
last.site_admin_connector.delete_event(event)
source.site_admin_connector.delete_sharing_group(sharing_group.id)
middle.site_admin_connector.delete_sharing_group(sharing_group.id)
source.site_admin_connector.update_server({'push': False}, source.sync_servers[0].id)