From 96576af02b9aa938f225734d666e5e9fc9be12e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Fri, 9 Aug 2019 17:58:55 +0200 Subject: [PATCH] new: Helpers & testcases for syncing --- pymisp/aping.py | 27 ++- pymisp/mispevent.py | 2 +- tests/testlive_comprehensive.py | 18 ++ tests/testlive_sync.py | 317 +++++++++++++++++++++++++++----- 4 files changed, 320 insertions(+), 44 deletions(-) diff --git a/pymisp/aping.py b/pymisp/aping.py index 378737d..29c096c 100644 --- a/pymisp/aping.py +++ b/pymisp/aping.py @@ -164,6 +164,10 @@ class ExpandedPyMISP(PyMISP): response = self._prepare_request('GET', f'/servers/serverSettings') return self._check_response(response, expect_json=True) + def restart_workers(self): + response = self._prepare_request('POST', f'/servers/restartWorkers') + return self._check_response(response, expect_json=True) + def toggle_global_pythonify(self): self.global_pythonify = not self.global_pythonify @@ -220,10 +224,11 @@ class ExpandedPyMISP(PyMISP): response = self._prepare_request('DELETE', f'events/delete/{event_id}') return self._check_response(response, expect_json=True) - def publish(self, event_id: int, alert: bool=False): + def publish(self, event: Union[MISPEvent, int, str, UUID], alert: bool=False): """Publish the event with one single HTTP POST. The default is to not send a mail as it is assumed this method is called on update. """ + event_id = self.__get_uuid_or_id_from_abstract_misp(event) if alert: response = self._prepare_request('POST', f'events/alert/{event_id}') else: @@ -980,6 +985,26 @@ class ExpandedPyMISP(PyMISP): to_return.append(s) return to_return + def get_sync_config(self, pythonify: bool=False): + '''WARNING: This method only works if the current user is a sync user''' + server = self._prepare_request('GET', 'servers/createSync') + server = self._check_response(server, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in server: + return server + s = MISPServer() + s.from_dict(**server) + return s + + def import_server(self, server: MISPServer, pythonify: bool=False): + """Import a sync server config""" + server = self._prepare_request('POST', f'servers/import', data=server) + server = self._check_response(server, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in server: + return server + s = MISPServer() + s.from_dict(**server) + return s + def add_server(self, server: MISPServer, pythonify: bool=False): """Add a server to synchronise with""" server = self._prepare_request('POST', f'servers/add', data=server) diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 0290796..4472d38 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -864,7 +864,7 @@ class MISPUser(AbstractMISP): def __repr__(self): if hasattr(self, 'email'): - return '<{self.__class__.__name__}(object_uuid={self.email})'.format(self=self) + return '<{self.__class__.__name__}(email={self.email})'.format(self=self) return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 6cd0f36..7b75a01 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -494,6 +494,24 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first.id) + def test_delete_by_uuid(self): + try: + first = self.create_simple_event() + obj = MISPObject('file') + obj.add_attribute('filename', 'foo') + first.add_object(obj) + first = self.user_misp_connector.add_event(first) + r = self.user_misp_connector.delete_attribute(first.attributes[0].uuid) + self.assertEqual(r['message'], 'Attribute deleted.') + # FIXME https://github.com/MISP/MISP/issues/4974 + # r = self.user_misp_connector.delete_object(first.objects[0].uuid) + # self.assertEqual(r['message'], 'Object deleted.') + # r = self.user_misp_connector.delete_event(first.uuid) + # self.assertEqual(r['message'], 'Event deleted.') + finally: + # Delete event + self.admin_misp_connector.delete_event(first.id) + def test_search_publish_timestamp(self): '''Search for a specific publication timestamp, an interval, and invalid values.''' # Creating event 1 diff --git a/tests/testlive_sync.py b/tests/testlive_sync.py index 87d3f1a..f546808 100644 --- a/tests/testlive_sync.py +++ b/tests/testlive_sync.py @@ -11,7 +11,7 @@ import logging logging.disable(logging.CRITICAL) try: - from pymisp import ExpandedPyMISP, MISPOrganisation, MISPUser, MISPServer + from pymisp import ExpandedPyMISP, MISPOrganisation, MISPUser, MISPEvent, MISPObject, MISPSharingGroup, Distribution except ImportError: if sys.version_info < (3, 6): print('This test suite requires Python 3.6+, breaking.') @@ -42,7 +42,8 @@ misp_instances = [ 'external_baseurl': 'https://192.168.1.1', 'key': key, 'orgname': 'First org', - 'email_admin': 'first@admin.local', + 'email_site_admin': 'first@site-admin.local', + 'email_admin': 'first@org-admin.local', 'email_user': 'first@user.local' }, { @@ -50,7 +51,8 @@ misp_instances = [ 'external_baseurl': 'https://192.168.1.2', 'key': key, 'orgname': 'Second org', - 'email_admin': 'second@admin.local', + 'email_site_admin': 'second@site-admin.local', + 'email_admin': 'second@org-admin.local', 'email_user': 'second@user.local' }, { @@ -58,7 +60,8 @@ misp_instances = [ 'external_baseurl': 'https://192.168.1.3', 'key': key, 'orgname': 'Third org', - 'email_admin': 'third@admin.local', + 'email_site_admin': 'third@site-admin.local', + 'email_admin': 'third@org-admin.local', 'email_user': 'third@user.local' }, ] @@ -70,47 +73,63 @@ fast_mode = True class MISPInstance(): def __init__(self, params): - self.site_admin_connector = ExpandedPyMISP(params['url'], params['key'], ssl=False, debug=False) + self.initial_user_connector = ExpandedPyMISP(params['url'], params['key'], ssl=False, debug=False) + # Git pull + self.initial_user_connector.update_misp() # Set the default role (id 3 on the VM is normal user) - self.site_admin_connector.set_default_role(3) + self.initial_user_connector.set_default_role(3) + # Restart workers + self.initial_user_connector.restart_workers() if not fast_mode: - # Git pull - self.site_admin_connector.update_misp() # Load submodules - self.site_admin_connector.update_object_templates() - self.site_admin_connector.update_galaxies() - self.site_admin_connector.update_noticelists() - self.site_admin_connector.update_warninglists() - self.site_admin_connector.update_taxonomies() + self.initial_user_connector.update_object_templates() + self.initial_user_connector.update_galaxies() + self.initial_user_connector.update_noticelists() + self.initial_user_connector.update_warninglists() + self.initial_user_connector.update_taxonomies() - self.site_admin_connector.toggle_global_pythonify() + self.initial_user_connector.toggle_global_pythonify() # Create organisation organisation = MISPOrganisation() organisation.name = params['orgname'] - self.test_org = self.site_admin_connector.add_organisation(organisation) + self.test_org = self.initial_user_connector.add_organisation(organisation) print(self.test_org.name, self.test_org.uuid) + # Create Site admin in new org + user = MISPUser() + user.email = params['email_site_admin'] + user.org_id = self.test_org.id + user.role_id = 1 # Site admin + self.test_site_admin = self.initial_user_connector.add_user(user) + self.site_admin_connector = ExpandedPyMISP(params['url'], self.test_site_admin.authkey, ssl=False, debug=False) + self.site_admin_connector.toggle_global_pythonify() # Create org admin user = MISPUser() user.email = params['email_admin'] user.org_id = self.test_org.id user.role_id = 2 # Org admin - self.test_admin = self.site_admin_connector.add_user(user) - self.org_admin_connector = ExpandedPyMISP(params['url'], self.test_admin.authkey, ssl=False, debug=False) + self.test_org_admin = self.site_admin_connector.add_user(user) + self.org_admin_connector = ExpandedPyMISP(params['url'], self.test_org_admin.authkey, ssl=False, debug=False) self.org_admin_connector.toggle_global_pythonify() # Create user user = MISPUser() user.email = params['email_user'] user.org_id = self.test_org.id self.test_usr = self.org_admin_connector.add_user(user) - self.usr_connector = ExpandedPyMISP(params['url'], self.test_admin.authkey, ssl=False, debug=False) - self.usr_connector.toggle_global_pythonify() + self.user_connector = ExpandedPyMISP(params['url'], self.test_usr.authkey, ssl=False, debug=False) + self.user_connector.toggle_global_pythonify() # Setup external_baseurl self.site_admin_connector.set_server_setting('MISP.external_baseurl', params['external_baseurl'], force=True) + # Setup baseurl + self.site_admin_connector.set_server_setting('MISP.baseurl', params['url'], force=True) self.external_base_url = params['external_baseurl'] self.sync = [] + self.sync_servers = [] + + def __repr__(self): + return f'<{self.__class__.__name__}(external={self.external_base_url})' def create_sync_user(self, organisation): sync_org = self.site_admin_connector.add_organisation(organisation) @@ -120,21 +139,22 @@ class MISPInstance(): user.org_id = sync_org.id user.role_id = 5 # Org admin sync_user = self.site_admin_connector.add_user(user) - self.sync.append((sync_org, sync_user, self.external_base_url)) + sync_user_connector = ExpandedPyMISP(self.site_admin_connector.root_url, sync_user.authkey, ssl=False, debug=False) + sync_server_config = sync_user_connector.get_sync_config(pythonify=True) + self.sync.append((sync_org, sync_user, sync_server_config)) - def create_sync_server(self, name, remote_url, authkey, organisation): - server = MISPServer() - server.name = name + def create_sync_server(self, name, server): + server = self.site_admin_connector.import_server(server) server.self_signed = True - server.url = remote_url - server.authkey = authkey - server.remote_org_id = organisation.id - server = self.site_admin_connector.add_server(server) + server.pull = True # Not automatic, but allows to do a pull + server = self.site_admin_connector.update_server(server) r = self.site_admin_connector.test_server(server) - print(r) + if r['status'] != 1: + raise Exception(f'Sync test failed: {r}') + self.sync_servers.append(server) def cleanup(self): - for org, user, remote_url in self.sync: + for org, user, _ in self.sync: self.site_admin_connector.delete_user(user) # Delete user from other org self.site_admin_connector.delete_organisation(org) @@ -144,9 +164,10 @@ class MISPInstance(): # Delete users self.org_admin_connector.delete_user(self.test_usr.id) - self.site_admin_connector.delete_user(self.test_admin.id) + self.site_admin_connector.delete_user(self.test_org_admin.id) + self.initial_user_connector.delete_user(self.test_site_admin.id) # Delete org - self.site_admin_connector.delete_organisation(self.test_org.id) + self.initial_user_connector.delete_organisation(self.test_org.id) class TestSync(unittest.TestCase): @@ -177,13 +198,22 @@ class TestSync(unittest.TestCase): sync_identifiers = [i.sync for i in cls.instances] for instance in cls.instances: for sync_identifier in sync_identifiers: - for org, user, remote_url in sync_identifier: + for org, user, sync_server_config in sync_identifier: if org.name != instance.test_org.name: continue - instance.create_sync_server(name=f'Sync with {remote_url}', - remote_url=remote_url, - authkey=user.authkey, - organisation=instance.test_org) + instance.create_sync_server(name=f'Sync with {sync_server_config.url}', + server=sync_server_config) + + ready = False + while not ready: + ready = True + for i in cls.instances: + settings = i.site_admin_connector.server_settings() + if (not settings['workers']['default']['ok'] + or not settings['workers']['prio']['ok']): + print(f'Not ready: {i}') + ready = False + time.sleep(1) @classmethod def tearDownClass(cls): @@ -194,11 +224,214 @@ class TestSync(unittest.TestCase): subprocess.Popen(['VBoxManage', 'controlvm', 'Test Sync 2', 'poweroff']) subprocess.Popen(['VBoxManage', 'controlvm', 'Test Sync 3', 'poweroff']) time.sleep(20) - subprocess.Popen(['VBoxManage', 'snapshot', 'Test Sync 1', 'restore', 'Snapshot 1']) - subprocess.Popen(['VBoxManage', 'snapshot', 'Test Sync 2', 'restore', 'Snapshot 1']) - subprocess.Popen(['VBoxManage', 'snapshot', 'Test Sync 3', 'restore', 'Snapshot 1']) + subprocess.Popen(['VBoxManage', 'snapshot', 'Test Sync 1', 'restore', 'WithRefresh']) + subprocess.Popen(['VBoxManage', 'snapshot', 'Test Sync 2', 'restore', 'WithRefresh']) + subprocess.Popen(['VBoxManage', 'snapshot', 'Test Sync 3', 'restore', 'WithRefresh']) def test_simple_sync(self): - server = MISPServer() - server.name = 'Second Instance' - server.url = misp_instances[1]['external_baseurl'] + '''Test simple event, push to one server''' + event = MISPEvent() + event.info = 'Event created on first instance' + event.distribution = Distribution.all_communities + event.add_attribute('ip-src', '1.1.1.1') + try: + source = self.instances[0] + dest = self.instances[1] + event = source.org_admin_connector.add_event(event) + source.org_admin_connector.publish(event) + source.site_admin_connector.server_push(source.sync_servers[0], event) + time.sleep(10) + dest_event = dest.org_admin_connector.get_event(event.uuid) + self.assertEqual(event.attributes[0].value, dest_event.attributes[0].value) + + finally: + source.org_admin_connector.delete_event(event) + dest.site_admin_connector.delete_event(dest_event) + + def test_sync_community(self): + '''Simple event, this community only, pull from member of the community''' + event = MISPEvent() + event.info = 'Event created on first instance' + event.distribution = Distribution.this_community_only + event.add_attribute('ip-src', '1.1.1.1') + try: + source = self.instances[0] + dest = self.instances[1] + event = source.org_admin_connector.add_event(event) + source.org_admin_connector.publish(event) + dest.site_admin_connector.server_pull(dest.sync_servers[0]) + time.sleep(10) + dest_event = dest.org_admin_connector.get_event(event.uuid) + self.assertEqual(dest_event.distribution, 0) + finally: + source.org_admin_connector.delete_event(event) + dest.site_admin_connector.delete_event(dest_event) + + def test_sync_all_communities(self): + '''Simple event, all communities, enable automatic push on two sub-instances''' + event = MISPEvent() + event.info = 'Event created on first instance' + event.distribution = Distribution.all_communities + event.add_attribute('ip-src', '1.1.1.1') + try: + source = self.instances[0] + server = source.site_admin_connector.update_server({'push': True}, source.sync_servers[0].id) + self.assertTrue(server.push) + middle = self.instances[1] + middle.site_admin_connector.update_server({'push': True}, middle.sync_servers[1].id) # Enable automatic push to 3rd instance + last = self.instances[2] + event = source.user_connector.add_event(event) + source.org_admin_connector.publish(event) + source.site_admin_connector.server_push(source.sync_servers[0]) + time.sleep(30) + middle_event = middle.user_connector.get_event(event.uuid) + self.assertEqual(event.attributes[0].value, middle_event.attributes[0].value) + last_event = last.user_connector.get_event(event.uuid) + self.assertEqual(event.attributes[0].value, last_event.attributes[0].value) + finally: + source.org_admin_connector.delete_event(event) + middle.site_admin_connector.delete_event(middle_event) + last.site_admin_connector.delete_event(last_event) + + def create_complex_event(self): + event = MISPEvent() + event.info = 'Complex Event' + event.distribution = Distribution.all_communities + event.add_tag('tlp:white') + + event.add_attribute('ip-src', '8.8.8.8') + event.add_attribute('ip-dst', '8.8.8.9') + event.add_attribute('domain', 'google.com') + event.add_attribute('md5', '3c656da41f4645f77e3ec3281b63dd43') + + event.attributes[0].distribution = Distribution.your_organisation_only + event.attributes[1].distribution = Distribution.this_community_only + event.attributes[2].distribution = Distribution.connected_communities + + event.attributes[0].add_tag('tlp:red') + event.attributes[1].add_tag('tlp:amber') + event.attributes[2].add_tag('tlp:green') + + obj = MISPObject('file') + + obj.distribution = Distribution.connected_communities + obj.add_attribute('filename', 'testfile') + obj.add_attribute('md5', '3c656da41f4645f77e3ec3281b63dd44') + obj.attributes[0].distribution = Distribution.your_organisation_only + + event.add_object(obj) + + return event + + def test_complex_event_push_pull(self): + '''Test automatic push''' + event = self.create_complex_event() + try: + source = self.instances[0] + source.site_admin_connector.update_server({'push': True}, source.sync_servers[0].id) + middle = self.instances[1] + middle.site_admin_connector.update_server({'push': True}, middle.sync_servers[1].id) # Enable automatic push to 3rd instance + last = self.instances[2] + + event = source.org_admin_connector.add_event(event) + source.org_admin_connector.publish(event) + time.sleep(15) + event_middle = middle.user_connector.get_event(event.uuid) + event_last = last.user_connector.get_event(event.uuid) + self.assertEqual(len(event_middle.attributes), 2) # attribute 3 and 4 + self.assertEqual(len(event_middle.objects[0].attributes), 1) # attribute 2 + self.assertEqual(len(event_last.attributes), 1) # attribute 4 + self.assertFalse(event_last.objects) + # Test if event is properly sanitized + event_middle_as_site_admin = middle.site_admin_connector.get_event(event.uuid) + self.assertEqual(len(event_middle_as_site_admin.attributes), 2) # attribute 3 and 4 + self.assertEqual(len(event_middle_as_site_admin.objects[0].attributes), 1) # attribute 2 + # FIXME https://github.com/MISP/MISP/issues/4975 + # Force pull from the last one + # last.site_admin_connector.server_pull(last.sync_servers[0]) + # time.sleep(6) + # event_last = last.user_connector.get_event(event.uuid) + # self.assertEqual(len(event_last.objects[0].attributes), 1) # attribute 2 + # self.assertEqual(len(event_last.attributes), 2) # attribute 3 and 4 + # Force pull from the middle one + # middle.site_admin_connector.server_pull(last.sync_servers[0]) + # time.sleep(6) + # event_middle = middle.user_connector.get_event(event.uuid) + # self.assertEqual(len(event_middle.attributes), 3) # attribute 2, 3 and 4 + # Force pull from the last one + # last.site_admin_connector.server_pull(last.sync_servers[0]) + # time.sleep(6) + # event_last = last.user_connector.get_event(event.uuid) + # self.assertEqual(len(event_last.attributes), 2) # attribute 3 and 4 + finally: + source.org_admin_connector.delete_event(event) + middle.site_admin_connector.delete_event(event_middle) + last.site_admin_connector.delete_event(event_last) + + def test_complex_event_pull(self): + '''Test pull''' + event = self.create_complex_event() + try: + source = self.instances[0] + middle = self.instances[1] + last = self.instances[2] + + event = source.org_admin_connector.add_event(event) + source.org_admin_connector.publish(event) + middle.site_admin_connector.server_pull(middle.sync_servers[0]) + time.sleep(6) + last.site_admin_connector.server_pull(last.sync_servers[1]) + time.sleep(6) + event_middle = middle.user_connector.get_event(event.uuid) + event_last = last.user_connector.get_event(event.uuid) + self.assertEqual(len(event_middle.attributes), 3) # attribute 2, 3 and 4 + self.assertEqual(len(event_middle.objects[0].attributes), 1) # attribute 2 + self.assertEqual(len(event_last.attributes), 2) # attribute 3, 4 + self.assertEqual(len(event_last.objects[0].attributes), 1) + # Test if event is properly sanitized + event_middle_as_site_admin = middle.site_admin_connector.get_event(event.uuid) + self.assertEqual(len(event_middle_as_site_admin.attributes), 3) # attribute 2, 3 and 4 + self.assertEqual(len(event_middle_as_site_admin.objects[0].attributes), 1) # attribute 2 + finally: + source.org_admin_connector.delete_event(event) + middle.site_admin_connector.delete_event(event_middle) + last.site_admin_connector.delete_event(event_last) + + def test_sharing_group(self): + '''Test Sharing Group''' + event = self.create_complex_event() + try: + source = self.instances[0] + source.site_admin_connector.update_server({'push': True}, source.sync_servers[0].id) + middle = self.instances[1] + middle.site_admin_connector.update_server({'push': True}, middle.sync_servers[1].id) # Enable automatic push to 3rd instance + last = self.instances[2] + + sg = MISPSharingGroup() + sg.name = 'Testcases SG' + sg.releasability = 'Testing' + sharing_group = source.site_admin_connector.add_sharing_group(sg) + a = source.site_admin_connector.add_org_to_sharing_group(sharing_group, middle.test_org.uuid) + + a = event.add_attribute('text', 'SG only attr') + a.distribution = Distribution.sharing_group + a.sharing_group_id = sharing_group.id + + event = source.org_admin_connector.add_event(event) + source.org_admin_connector.publish(event) + time.sleep(60) + + event_middle = middle.user_connector.get_event(event.uuid) + event_last = last.user_connector.get_event(event.uuid) + self.assertEqual(len(event_middle.attributes), 3) + self.assertEqual(len(event_last.attributes), 1) + # Test if event is properly sanitized + event_middle_as_site_admin = middle.site_admin_connector.get_event(event.uuid) + self.assertEqual(len(event_middle_as_site_admin.attributes), 3) + event_last_as_site_admin = last.site_admin_connector.get_event(event.uuid) + self.assertEqual(len(event_last_as_site_admin.attributes), 2) # FIXME: should be 1, I think. + finally: + source.org_admin_connector.delete_event(event) + middle.site_admin_connector.delete_event(event_middle) + last.site_admin_connector.delete_event(event_last) + source.site_admin_connector.delete_sharing_group(sharing_group.id)