mirror of https://github.com/MISP/PyMISP
new: Helpers & testcases for syncing
parent
783b84899d
commit
96576af02b
|
@ -164,6 +164,10 @@ class ExpandedPyMISP(PyMISP):
|
||||||
response = self._prepare_request('GET', f'/servers/serverSettings')
|
response = self._prepare_request('GET', f'/servers/serverSettings')
|
||||||
return self._check_response(response, expect_json=True)
|
return self._check_response(response, expect_json=True)
|
||||||
|
|
||||||
|
def restart_workers(self):
|
||||||
|
response = self._prepare_request('POST', f'/servers/restartWorkers')
|
||||||
|
return self._check_response(response, expect_json=True)
|
||||||
|
|
||||||
def toggle_global_pythonify(self):
|
def toggle_global_pythonify(self):
|
||||||
self.global_pythonify = not self.global_pythonify
|
self.global_pythonify = not self.global_pythonify
|
||||||
|
|
||||||
|
@ -220,10 +224,11 @@ class ExpandedPyMISP(PyMISP):
|
||||||
response = self._prepare_request('DELETE', f'events/delete/{event_id}')
|
response = self._prepare_request('DELETE', f'events/delete/{event_id}')
|
||||||
return self._check_response(response, expect_json=True)
|
return self._check_response(response, expect_json=True)
|
||||||
|
|
||||||
def publish(self, event_id: int, alert: bool=False):
|
def publish(self, event: Union[MISPEvent, int, str, UUID], alert: bool=False):
|
||||||
"""Publish the event with one single HTTP POST.
|
"""Publish the event with one single HTTP POST.
|
||||||
The default is to not send a mail as it is assumed this method is called on update.
|
The default is to not send a mail as it is assumed this method is called on update.
|
||||||
"""
|
"""
|
||||||
|
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
|
||||||
if alert:
|
if alert:
|
||||||
response = self._prepare_request('POST', f'events/alert/{event_id}')
|
response = self._prepare_request('POST', f'events/alert/{event_id}')
|
||||||
else:
|
else:
|
||||||
|
@ -980,6 +985,26 @@ class ExpandedPyMISP(PyMISP):
|
||||||
to_return.append(s)
|
to_return.append(s)
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
|
def get_sync_config(self, pythonify: bool=False):
|
||||||
|
'''WARNING: This method only works if the current user is a sync user'''
|
||||||
|
server = self._prepare_request('GET', 'servers/createSync')
|
||||||
|
server = self._check_response(server, expect_json=True)
|
||||||
|
if not (self.global_pythonify or pythonify) or 'errors' in server:
|
||||||
|
return server
|
||||||
|
s = MISPServer()
|
||||||
|
s.from_dict(**server)
|
||||||
|
return s
|
||||||
|
|
||||||
|
def import_server(self, server: MISPServer, pythonify: bool=False):
|
||||||
|
"""Import a sync server config"""
|
||||||
|
server = self._prepare_request('POST', f'servers/import', data=server)
|
||||||
|
server = self._check_response(server, expect_json=True)
|
||||||
|
if not (self.global_pythonify or pythonify) or 'errors' in server:
|
||||||
|
return server
|
||||||
|
s = MISPServer()
|
||||||
|
s.from_dict(**server)
|
||||||
|
return s
|
||||||
|
|
||||||
def add_server(self, server: MISPServer, pythonify: bool=False):
|
def add_server(self, server: MISPServer, pythonify: bool=False):
|
||||||
"""Add a server to synchronise with"""
|
"""Add a server to synchronise with"""
|
||||||
server = self._prepare_request('POST', f'servers/add', data=server)
|
server = self._prepare_request('POST', f'servers/add', data=server)
|
||||||
|
|
|
@ -864,7 +864,7 @@ class MISPUser(AbstractMISP):
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
if hasattr(self, 'email'):
|
if hasattr(self, 'email'):
|
||||||
return '<{self.__class__.__name__}(object_uuid={self.email})'.format(self=self)
|
return '<{self.__class__.__name__}(email={self.email})'.format(self=self)
|
||||||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -494,6 +494,24 @@ class TestComprehensive(unittest.TestCase):
|
||||||
# Delete event
|
# Delete event
|
||||||
self.admin_misp_connector.delete_event(first.id)
|
self.admin_misp_connector.delete_event(first.id)
|
||||||
|
|
||||||
|
def test_delete_by_uuid(self):
|
||||||
|
try:
|
||||||
|
first = self.create_simple_event()
|
||||||
|
obj = MISPObject('file')
|
||||||
|
obj.add_attribute('filename', 'foo')
|
||||||
|
first.add_object(obj)
|
||||||
|
first = self.user_misp_connector.add_event(first)
|
||||||
|
r = self.user_misp_connector.delete_attribute(first.attributes[0].uuid)
|
||||||
|
self.assertEqual(r['message'], 'Attribute deleted.')
|
||||||
|
# FIXME https://github.com/MISP/MISP/issues/4974
|
||||||
|
# r = self.user_misp_connector.delete_object(first.objects[0].uuid)
|
||||||
|
# self.assertEqual(r['message'], 'Object deleted.')
|
||||||
|
# r = self.user_misp_connector.delete_event(first.uuid)
|
||||||
|
# self.assertEqual(r['message'], 'Event deleted.')
|
||||||
|
finally:
|
||||||
|
# Delete event
|
||||||
|
self.admin_misp_connector.delete_event(first.id)
|
||||||
|
|
||||||
def test_search_publish_timestamp(self):
|
def test_search_publish_timestamp(self):
|
||||||
'''Search for a specific publication timestamp, an interval, and invalid values.'''
|
'''Search for a specific publication timestamp, an interval, and invalid values.'''
|
||||||
# Creating event 1
|
# Creating event 1
|
||||||
|
|
|
@ -11,7 +11,7 @@ import logging
|
||||||
logging.disable(logging.CRITICAL)
|
logging.disable(logging.CRITICAL)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from pymisp import ExpandedPyMISP, MISPOrganisation, MISPUser, MISPServer
|
from pymisp import ExpandedPyMISP, MISPOrganisation, MISPUser, MISPEvent, MISPObject, MISPSharingGroup, Distribution
|
||||||
except ImportError:
|
except ImportError:
|
||||||
if sys.version_info < (3, 6):
|
if sys.version_info < (3, 6):
|
||||||
print('This test suite requires Python 3.6+, breaking.')
|
print('This test suite requires Python 3.6+, breaking.')
|
||||||
|
@ -42,7 +42,8 @@ misp_instances = [
|
||||||
'external_baseurl': 'https://192.168.1.1',
|
'external_baseurl': 'https://192.168.1.1',
|
||||||
'key': key,
|
'key': key,
|
||||||
'orgname': 'First org',
|
'orgname': 'First org',
|
||||||
'email_admin': 'first@admin.local',
|
'email_site_admin': 'first@site-admin.local',
|
||||||
|
'email_admin': 'first@org-admin.local',
|
||||||
'email_user': 'first@user.local'
|
'email_user': 'first@user.local'
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -50,7 +51,8 @@ misp_instances = [
|
||||||
'external_baseurl': 'https://192.168.1.2',
|
'external_baseurl': 'https://192.168.1.2',
|
||||||
'key': key,
|
'key': key,
|
||||||
'orgname': 'Second org',
|
'orgname': 'Second org',
|
||||||
'email_admin': 'second@admin.local',
|
'email_site_admin': 'second@site-admin.local',
|
||||||
|
'email_admin': 'second@org-admin.local',
|
||||||
'email_user': 'second@user.local'
|
'email_user': 'second@user.local'
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -58,7 +60,8 @@ misp_instances = [
|
||||||
'external_baseurl': 'https://192.168.1.3',
|
'external_baseurl': 'https://192.168.1.3',
|
||||||
'key': key,
|
'key': key,
|
||||||
'orgname': 'Third org',
|
'orgname': 'Third org',
|
||||||
'email_admin': 'third@admin.local',
|
'email_site_admin': 'third@site-admin.local',
|
||||||
|
'email_admin': 'third@org-admin.local',
|
||||||
'email_user': 'third@user.local'
|
'email_user': 'third@user.local'
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
|
@ -70,47 +73,63 @@ fast_mode = True
|
||||||
class MISPInstance():
|
class MISPInstance():
|
||||||
|
|
||||||
def __init__(self, params):
|
def __init__(self, params):
|
||||||
self.site_admin_connector = ExpandedPyMISP(params['url'], params['key'], ssl=False, debug=False)
|
self.initial_user_connector = ExpandedPyMISP(params['url'], params['key'], ssl=False, debug=False)
|
||||||
|
# Git pull
|
||||||
|
self.initial_user_connector.update_misp()
|
||||||
# Set the default role (id 3 on the VM is normal user)
|
# Set the default role (id 3 on the VM is normal user)
|
||||||
self.site_admin_connector.set_default_role(3)
|
self.initial_user_connector.set_default_role(3)
|
||||||
|
# Restart workers
|
||||||
|
self.initial_user_connector.restart_workers()
|
||||||
if not fast_mode:
|
if not fast_mode:
|
||||||
# Git pull
|
|
||||||
self.site_admin_connector.update_misp()
|
|
||||||
# Load submodules
|
# Load submodules
|
||||||
self.site_admin_connector.update_object_templates()
|
self.initial_user_connector.update_object_templates()
|
||||||
self.site_admin_connector.update_galaxies()
|
self.initial_user_connector.update_galaxies()
|
||||||
self.site_admin_connector.update_noticelists()
|
self.initial_user_connector.update_noticelists()
|
||||||
self.site_admin_connector.update_warninglists()
|
self.initial_user_connector.update_warninglists()
|
||||||
self.site_admin_connector.update_taxonomies()
|
self.initial_user_connector.update_taxonomies()
|
||||||
|
|
||||||
self.site_admin_connector.toggle_global_pythonify()
|
self.initial_user_connector.toggle_global_pythonify()
|
||||||
|
|
||||||
# Create organisation
|
# Create organisation
|
||||||
organisation = MISPOrganisation()
|
organisation = MISPOrganisation()
|
||||||
organisation.name = params['orgname']
|
organisation.name = params['orgname']
|
||||||
self.test_org = self.site_admin_connector.add_organisation(organisation)
|
self.test_org = self.initial_user_connector.add_organisation(organisation)
|
||||||
print(self.test_org.name, self.test_org.uuid)
|
print(self.test_org.name, self.test_org.uuid)
|
||||||
|
# Create Site admin in new org
|
||||||
|
user = MISPUser()
|
||||||
|
user.email = params['email_site_admin']
|
||||||
|
user.org_id = self.test_org.id
|
||||||
|
user.role_id = 1 # Site admin
|
||||||
|
self.test_site_admin = self.initial_user_connector.add_user(user)
|
||||||
|
self.site_admin_connector = ExpandedPyMISP(params['url'], self.test_site_admin.authkey, ssl=False, debug=False)
|
||||||
|
self.site_admin_connector.toggle_global_pythonify()
|
||||||
# Create org admin
|
# Create org admin
|
||||||
user = MISPUser()
|
user = MISPUser()
|
||||||
user.email = params['email_admin']
|
user.email = params['email_admin']
|
||||||
user.org_id = self.test_org.id
|
user.org_id = self.test_org.id
|
||||||
user.role_id = 2 # Org admin
|
user.role_id = 2 # Org admin
|
||||||
self.test_admin = self.site_admin_connector.add_user(user)
|
self.test_org_admin = self.site_admin_connector.add_user(user)
|
||||||
self.org_admin_connector = ExpandedPyMISP(params['url'], self.test_admin.authkey, ssl=False, debug=False)
|
self.org_admin_connector = ExpandedPyMISP(params['url'], self.test_org_admin.authkey, ssl=False, debug=False)
|
||||||
self.org_admin_connector.toggle_global_pythonify()
|
self.org_admin_connector.toggle_global_pythonify()
|
||||||
# Create user
|
# Create user
|
||||||
user = MISPUser()
|
user = MISPUser()
|
||||||
user.email = params['email_user']
|
user.email = params['email_user']
|
||||||
user.org_id = self.test_org.id
|
user.org_id = self.test_org.id
|
||||||
self.test_usr = self.org_admin_connector.add_user(user)
|
self.test_usr = self.org_admin_connector.add_user(user)
|
||||||
self.usr_connector = ExpandedPyMISP(params['url'], self.test_admin.authkey, ssl=False, debug=False)
|
self.user_connector = ExpandedPyMISP(params['url'], self.test_usr.authkey, ssl=False, debug=False)
|
||||||
self.usr_connector.toggle_global_pythonify()
|
self.user_connector.toggle_global_pythonify()
|
||||||
|
|
||||||
# Setup external_baseurl
|
# Setup external_baseurl
|
||||||
self.site_admin_connector.set_server_setting('MISP.external_baseurl', params['external_baseurl'], force=True)
|
self.site_admin_connector.set_server_setting('MISP.external_baseurl', params['external_baseurl'], force=True)
|
||||||
|
# Setup baseurl
|
||||||
|
self.site_admin_connector.set_server_setting('MISP.baseurl', params['url'], force=True)
|
||||||
|
|
||||||
self.external_base_url = params['external_baseurl']
|
self.external_base_url = params['external_baseurl']
|
||||||
self.sync = []
|
self.sync = []
|
||||||
|
self.sync_servers = []
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f'<{self.__class__.__name__}(external={self.external_base_url})'
|
||||||
|
|
||||||
def create_sync_user(self, organisation):
|
def create_sync_user(self, organisation):
|
||||||
sync_org = self.site_admin_connector.add_organisation(organisation)
|
sync_org = self.site_admin_connector.add_organisation(organisation)
|
||||||
|
@ -120,21 +139,22 @@ class MISPInstance():
|
||||||
user.org_id = sync_org.id
|
user.org_id = sync_org.id
|
||||||
user.role_id = 5 # Org admin
|
user.role_id = 5 # Org admin
|
||||||
sync_user = self.site_admin_connector.add_user(user)
|
sync_user = self.site_admin_connector.add_user(user)
|
||||||
self.sync.append((sync_org, sync_user, self.external_base_url))
|
sync_user_connector = ExpandedPyMISP(self.site_admin_connector.root_url, sync_user.authkey, ssl=False, debug=False)
|
||||||
|
sync_server_config = sync_user_connector.get_sync_config(pythonify=True)
|
||||||
|
self.sync.append((sync_org, sync_user, sync_server_config))
|
||||||
|
|
||||||
def create_sync_server(self, name, remote_url, authkey, organisation):
|
def create_sync_server(self, name, server):
|
||||||
server = MISPServer()
|
server = self.site_admin_connector.import_server(server)
|
||||||
server.name = name
|
|
||||||
server.self_signed = True
|
server.self_signed = True
|
||||||
server.url = remote_url
|
server.pull = True # Not automatic, but allows to do a pull
|
||||||
server.authkey = authkey
|
server = self.site_admin_connector.update_server(server)
|
||||||
server.remote_org_id = organisation.id
|
|
||||||
server = self.site_admin_connector.add_server(server)
|
|
||||||
r = self.site_admin_connector.test_server(server)
|
r = self.site_admin_connector.test_server(server)
|
||||||
print(r)
|
if r['status'] != 1:
|
||||||
|
raise Exception(f'Sync test failed: {r}')
|
||||||
|
self.sync_servers.append(server)
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
for org, user, remote_url in self.sync:
|
for org, user, _ in self.sync:
|
||||||
self.site_admin_connector.delete_user(user) # Delete user from other org
|
self.site_admin_connector.delete_user(user) # Delete user from other org
|
||||||
self.site_admin_connector.delete_organisation(org)
|
self.site_admin_connector.delete_organisation(org)
|
||||||
|
|
||||||
|
@ -144,9 +164,10 @@ class MISPInstance():
|
||||||
|
|
||||||
# Delete users
|
# Delete users
|
||||||
self.org_admin_connector.delete_user(self.test_usr.id)
|
self.org_admin_connector.delete_user(self.test_usr.id)
|
||||||
self.site_admin_connector.delete_user(self.test_admin.id)
|
self.site_admin_connector.delete_user(self.test_org_admin.id)
|
||||||
|
self.initial_user_connector.delete_user(self.test_site_admin.id)
|
||||||
# Delete org
|
# Delete org
|
||||||
self.site_admin_connector.delete_organisation(self.test_org.id)
|
self.initial_user_connector.delete_organisation(self.test_org.id)
|
||||||
|
|
||||||
|
|
||||||
class TestSync(unittest.TestCase):
|
class TestSync(unittest.TestCase):
|
||||||
|
@ -177,13 +198,22 @@ class TestSync(unittest.TestCase):
|
||||||
sync_identifiers = [i.sync for i in cls.instances]
|
sync_identifiers = [i.sync for i in cls.instances]
|
||||||
for instance in cls.instances:
|
for instance in cls.instances:
|
||||||
for sync_identifier in sync_identifiers:
|
for sync_identifier in sync_identifiers:
|
||||||
for org, user, remote_url in sync_identifier:
|
for org, user, sync_server_config in sync_identifier:
|
||||||
if org.name != instance.test_org.name:
|
if org.name != instance.test_org.name:
|
||||||
continue
|
continue
|
||||||
instance.create_sync_server(name=f'Sync with {remote_url}',
|
instance.create_sync_server(name=f'Sync with {sync_server_config.url}',
|
||||||
remote_url=remote_url,
|
server=sync_server_config)
|
||||||
authkey=user.authkey,
|
|
||||||
organisation=instance.test_org)
|
ready = False
|
||||||
|
while not ready:
|
||||||
|
ready = True
|
||||||
|
for i in cls.instances:
|
||||||
|
settings = i.site_admin_connector.server_settings()
|
||||||
|
if (not settings['workers']['default']['ok']
|
||||||
|
or not settings['workers']['prio']['ok']):
|
||||||
|
print(f'Not ready: {i}')
|
||||||
|
ready = False
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def tearDownClass(cls):
|
def tearDownClass(cls):
|
||||||
|
@ -194,11 +224,214 @@ class TestSync(unittest.TestCase):
|
||||||
subprocess.Popen(['VBoxManage', 'controlvm', 'Test Sync 2', 'poweroff'])
|
subprocess.Popen(['VBoxManage', 'controlvm', 'Test Sync 2', 'poweroff'])
|
||||||
subprocess.Popen(['VBoxManage', 'controlvm', 'Test Sync 3', 'poweroff'])
|
subprocess.Popen(['VBoxManage', 'controlvm', 'Test Sync 3', 'poweroff'])
|
||||||
time.sleep(20)
|
time.sleep(20)
|
||||||
subprocess.Popen(['VBoxManage', 'snapshot', 'Test Sync 1', 'restore', 'Snapshot 1'])
|
subprocess.Popen(['VBoxManage', 'snapshot', 'Test Sync 1', 'restore', 'WithRefresh'])
|
||||||
subprocess.Popen(['VBoxManage', 'snapshot', 'Test Sync 2', 'restore', 'Snapshot 1'])
|
subprocess.Popen(['VBoxManage', 'snapshot', 'Test Sync 2', 'restore', 'WithRefresh'])
|
||||||
subprocess.Popen(['VBoxManage', 'snapshot', 'Test Sync 3', 'restore', 'Snapshot 1'])
|
subprocess.Popen(['VBoxManage', 'snapshot', 'Test Sync 3', 'restore', 'WithRefresh'])
|
||||||
|
|
||||||
def test_simple_sync(self):
|
def test_simple_sync(self):
|
||||||
server = MISPServer()
|
'''Test simple event, push to one server'''
|
||||||
server.name = 'Second Instance'
|
event = MISPEvent()
|
||||||
server.url = misp_instances[1]['external_baseurl']
|
event.info = 'Event created on first instance'
|
||||||
|
event.distribution = Distribution.all_communities
|
||||||
|
event.add_attribute('ip-src', '1.1.1.1')
|
||||||
|
try:
|
||||||
|
source = self.instances[0]
|
||||||
|
dest = self.instances[1]
|
||||||
|
event = source.org_admin_connector.add_event(event)
|
||||||
|
source.org_admin_connector.publish(event)
|
||||||
|
source.site_admin_connector.server_push(source.sync_servers[0], event)
|
||||||
|
time.sleep(10)
|
||||||
|
dest_event = dest.org_admin_connector.get_event(event.uuid)
|
||||||
|
self.assertEqual(event.attributes[0].value, dest_event.attributes[0].value)
|
||||||
|
|
||||||
|
finally:
|
||||||
|
source.org_admin_connector.delete_event(event)
|
||||||
|
dest.site_admin_connector.delete_event(dest_event)
|
||||||
|
|
||||||
|
def test_sync_community(self):
|
||||||
|
'''Simple event, this community only, pull from member of the community'''
|
||||||
|
event = MISPEvent()
|
||||||
|
event.info = 'Event created on first instance'
|
||||||
|
event.distribution = Distribution.this_community_only
|
||||||
|
event.add_attribute('ip-src', '1.1.1.1')
|
||||||
|
try:
|
||||||
|
source = self.instances[0]
|
||||||
|
dest = self.instances[1]
|
||||||
|
event = source.org_admin_connector.add_event(event)
|
||||||
|
source.org_admin_connector.publish(event)
|
||||||
|
dest.site_admin_connector.server_pull(dest.sync_servers[0])
|
||||||
|
time.sleep(10)
|
||||||
|
dest_event = dest.org_admin_connector.get_event(event.uuid)
|
||||||
|
self.assertEqual(dest_event.distribution, 0)
|
||||||
|
finally:
|
||||||
|
source.org_admin_connector.delete_event(event)
|
||||||
|
dest.site_admin_connector.delete_event(dest_event)
|
||||||
|
|
||||||
|
def test_sync_all_communities(self):
|
||||||
|
'''Simple event, all communities, enable automatic push on two sub-instances'''
|
||||||
|
event = MISPEvent()
|
||||||
|
event.info = 'Event created on first instance'
|
||||||
|
event.distribution = Distribution.all_communities
|
||||||
|
event.add_attribute('ip-src', '1.1.1.1')
|
||||||
|
try:
|
||||||
|
source = self.instances[0]
|
||||||
|
server = source.site_admin_connector.update_server({'push': True}, source.sync_servers[0].id)
|
||||||
|
self.assertTrue(server.push)
|
||||||
|
middle = self.instances[1]
|
||||||
|
middle.site_admin_connector.update_server({'push': True}, middle.sync_servers[1].id) # Enable automatic push to 3rd instance
|
||||||
|
last = self.instances[2]
|
||||||
|
event = source.user_connector.add_event(event)
|
||||||
|
source.org_admin_connector.publish(event)
|
||||||
|
source.site_admin_connector.server_push(source.sync_servers[0])
|
||||||
|
time.sleep(30)
|
||||||
|
middle_event = middle.user_connector.get_event(event.uuid)
|
||||||
|
self.assertEqual(event.attributes[0].value, middle_event.attributes[0].value)
|
||||||
|
last_event = last.user_connector.get_event(event.uuid)
|
||||||
|
self.assertEqual(event.attributes[0].value, last_event.attributes[0].value)
|
||||||
|
finally:
|
||||||
|
source.org_admin_connector.delete_event(event)
|
||||||
|
middle.site_admin_connector.delete_event(middle_event)
|
||||||
|
last.site_admin_connector.delete_event(last_event)
|
||||||
|
|
||||||
|
def create_complex_event(self):
|
||||||
|
event = MISPEvent()
|
||||||
|
event.info = 'Complex Event'
|
||||||
|
event.distribution = Distribution.all_communities
|
||||||
|
event.add_tag('tlp:white')
|
||||||
|
|
||||||
|
event.add_attribute('ip-src', '8.8.8.8')
|
||||||
|
event.add_attribute('ip-dst', '8.8.8.9')
|
||||||
|
event.add_attribute('domain', 'google.com')
|
||||||
|
event.add_attribute('md5', '3c656da41f4645f77e3ec3281b63dd43')
|
||||||
|
|
||||||
|
event.attributes[0].distribution = Distribution.your_organisation_only
|
||||||
|
event.attributes[1].distribution = Distribution.this_community_only
|
||||||
|
event.attributes[2].distribution = Distribution.connected_communities
|
||||||
|
|
||||||
|
event.attributes[0].add_tag('tlp:red')
|
||||||
|
event.attributes[1].add_tag('tlp:amber')
|
||||||
|
event.attributes[2].add_tag('tlp:green')
|
||||||
|
|
||||||
|
obj = MISPObject('file')
|
||||||
|
|
||||||
|
obj.distribution = Distribution.connected_communities
|
||||||
|
obj.add_attribute('filename', 'testfile')
|
||||||
|
obj.add_attribute('md5', '3c656da41f4645f77e3ec3281b63dd44')
|
||||||
|
obj.attributes[0].distribution = Distribution.your_organisation_only
|
||||||
|
|
||||||
|
event.add_object(obj)
|
||||||
|
|
||||||
|
return event
|
||||||
|
|
||||||
|
def test_complex_event_push_pull(self):
|
||||||
|
'''Test automatic push'''
|
||||||
|
event = self.create_complex_event()
|
||||||
|
try:
|
||||||
|
source = self.instances[0]
|
||||||
|
source.site_admin_connector.update_server({'push': True}, source.sync_servers[0].id)
|
||||||
|
middle = self.instances[1]
|
||||||
|
middle.site_admin_connector.update_server({'push': True}, middle.sync_servers[1].id) # Enable automatic push to 3rd instance
|
||||||
|
last = self.instances[2]
|
||||||
|
|
||||||
|
event = source.org_admin_connector.add_event(event)
|
||||||
|
source.org_admin_connector.publish(event)
|
||||||
|
time.sleep(15)
|
||||||
|
event_middle = middle.user_connector.get_event(event.uuid)
|
||||||
|
event_last = last.user_connector.get_event(event.uuid)
|
||||||
|
self.assertEqual(len(event_middle.attributes), 2) # attribute 3 and 4
|
||||||
|
self.assertEqual(len(event_middle.objects[0].attributes), 1) # attribute 2
|
||||||
|
self.assertEqual(len(event_last.attributes), 1) # attribute 4
|
||||||
|
self.assertFalse(event_last.objects)
|
||||||
|
# Test if event is properly sanitized
|
||||||
|
event_middle_as_site_admin = middle.site_admin_connector.get_event(event.uuid)
|
||||||
|
self.assertEqual(len(event_middle_as_site_admin.attributes), 2) # attribute 3 and 4
|
||||||
|
self.assertEqual(len(event_middle_as_site_admin.objects[0].attributes), 1) # attribute 2
|
||||||
|
# FIXME https://github.com/MISP/MISP/issues/4975
|
||||||
|
# Force pull from the last one
|
||||||
|
# last.site_admin_connector.server_pull(last.sync_servers[0])
|
||||||
|
# time.sleep(6)
|
||||||
|
# event_last = last.user_connector.get_event(event.uuid)
|
||||||
|
# self.assertEqual(len(event_last.objects[0].attributes), 1) # attribute 2
|
||||||
|
# self.assertEqual(len(event_last.attributes), 2) # attribute 3 and 4
|
||||||
|
# Force pull from the middle one
|
||||||
|
# middle.site_admin_connector.server_pull(last.sync_servers[0])
|
||||||
|
# time.sleep(6)
|
||||||
|
# event_middle = middle.user_connector.get_event(event.uuid)
|
||||||
|
# self.assertEqual(len(event_middle.attributes), 3) # attribute 2, 3 and 4
|
||||||
|
# Force pull from the last one
|
||||||
|
# last.site_admin_connector.server_pull(last.sync_servers[0])
|
||||||
|
# time.sleep(6)
|
||||||
|
# event_last = last.user_connector.get_event(event.uuid)
|
||||||
|
# self.assertEqual(len(event_last.attributes), 2) # attribute 3 and 4
|
||||||
|
finally:
|
||||||
|
source.org_admin_connector.delete_event(event)
|
||||||
|
middle.site_admin_connector.delete_event(event_middle)
|
||||||
|
last.site_admin_connector.delete_event(event_last)
|
||||||
|
|
||||||
|
def test_complex_event_pull(self):
|
||||||
|
'''Test pull'''
|
||||||
|
event = self.create_complex_event()
|
||||||
|
try:
|
||||||
|
source = self.instances[0]
|
||||||
|
middle = self.instances[1]
|
||||||
|
last = self.instances[2]
|
||||||
|
|
||||||
|
event = source.org_admin_connector.add_event(event)
|
||||||
|
source.org_admin_connector.publish(event)
|
||||||
|
middle.site_admin_connector.server_pull(middle.sync_servers[0])
|
||||||
|
time.sleep(6)
|
||||||
|
last.site_admin_connector.server_pull(last.sync_servers[1])
|
||||||
|
time.sleep(6)
|
||||||
|
event_middle = middle.user_connector.get_event(event.uuid)
|
||||||
|
event_last = last.user_connector.get_event(event.uuid)
|
||||||
|
self.assertEqual(len(event_middle.attributes), 3) # attribute 2, 3 and 4
|
||||||
|
self.assertEqual(len(event_middle.objects[0].attributes), 1) # attribute 2
|
||||||
|
self.assertEqual(len(event_last.attributes), 2) # attribute 3, 4
|
||||||
|
self.assertEqual(len(event_last.objects[0].attributes), 1)
|
||||||
|
# Test if event is properly sanitized
|
||||||
|
event_middle_as_site_admin = middle.site_admin_connector.get_event(event.uuid)
|
||||||
|
self.assertEqual(len(event_middle_as_site_admin.attributes), 3) # attribute 2, 3 and 4
|
||||||
|
self.assertEqual(len(event_middle_as_site_admin.objects[0].attributes), 1) # attribute 2
|
||||||
|
finally:
|
||||||
|
source.org_admin_connector.delete_event(event)
|
||||||
|
middle.site_admin_connector.delete_event(event_middle)
|
||||||
|
last.site_admin_connector.delete_event(event_last)
|
||||||
|
|
||||||
|
def test_sharing_group(self):
|
||||||
|
'''Test Sharing Group'''
|
||||||
|
event = self.create_complex_event()
|
||||||
|
try:
|
||||||
|
source = self.instances[0]
|
||||||
|
source.site_admin_connector.update_server({'push': True}, source.sync_servers[0].id)
|
||||||
|
middle = self.instances[1]
|
||||||
|
middle.site_admin_connector.update_server({'push': True}, middle.sync_servers[1].id) # Enable automatic push to 3rd instance
|
||||||
|
last = self.instances[2]
|
||||||
|
|
||||||
|
sg = MISPSharingGroup()
|
||||||
|
sg.name = 'Testcases SG'
|
||||||
|
sg.releasability = 'Testing'
|
||||||
|
sharing_group = source.site_admin_connector.add_sharing_group(sg)
|
||||||
|
a = source.site_admin_connector.add_org_to_sharing_group(sharing_group, middle.test_org.uuid)
|
||||||
|
|
||||||
|
a = event.add_attribute('text', 'SG only attr')
|
||||||
|
a.distribution = Distribution.sharing_group
|
||||||
|
a.sharing_group_id = sharing_group.id
|
||||||
|
|
||||||
|
event = source.org_admin_connector.add_event(event)
|
||||||
|
source.org_admin_connector.publish(event)
|
||||||
|
time.sleep(60)
|
||||||
|
|
||||||
|
event_middle = middle.user_connector.get_event(event.uuid)
|
||||||
|
event_last = last.user_connector.get_event(event.uuid)
|
||||||
|
self.assertEqual(len(event_middle.attributes), 3)
|
||||||
|
self.assertEqual(len(event_last.attributes), 1)
|
||||||
|
# Test if event is properly sanitized
|
||||||
|
event_middle_as_site_admin = middle.site_admin_connector.get_event(event.uuid)
|
||||||
|
self.assertEqual(len(event_middle_as_site_admin.attributes), 3)
|
||||||
|
event_last_as_site_admin = last.site_admin_connector.get_event(event.uuid)
|
||||||
|
self.assertEqual(len(event_last_as_site_admin.attributes), 2) # FIXME: should be 1, I think.
|
||||||
|
finally:
|
||||||
|
source.org_admin_connector.delete_event(event)
|
||||||
|
middle.site_admin_connector.delete_event(event_middle)
|
||||||
|
last.site_admin_connector.delete_event(event_last)
|
||||||
|
source.site_admin_connector.delete_sharing_group(sharing_group.id)
|
||||||
|
|
Loading…
Reference in New Issue