chg: Add tests cases for sync, bump describeTypes

pull/436/head
Raphaël Vinot 2019-08-12 14:12:40 +02:00
parent 29cc7142ff
commit 2d37c68bd7
4 changed files with 1087 additions and 1040 deletions

View File

@ -500,14 +500,14 @@ class ExpandedPyMISP(PyMISP):
# ## BEGIN Sighting ###
def sightings(self, misp_entity: AbstractMISP, org: Union[MISPOrganisation, int, str, UUID]=None, pythonify: bool=False):
def sightings(self, misp_entity: AbstractMISP=None, org: Union[MISPOrganisation, int, str, UUID]=None, pythonify: bool=False):
"""Get the list of sighting related to a MISPEvent or a MISPAttribute (depending on type of misp_entity)"""
if isinstance(misp_entity, MISPEvent):
context = 'event'
elif isinstance(misp_entity, MISPAttribute):
context = 'attribute'
else:
raise PyMISPError('misp_entity can only be a MISPEvent or a MISPAttribute')
context = None
if org is not None:
org_id = self.__get_uuid_or_id_from_abstract_misp(org)
else:
@ -519,10 +519,15 @@ class ExpandedPyMISP(PyMISP):
url = f'{url}/{org_id}'
sightings = self._prepare_request('POST', url)
else:
to_post = {'id': misp_entity.id, 'context': context}
if context is None:
url = 'sightings'
to_post = {}
else:
url = 'sightings/listSightings'
to_post = {'id': misp_entity.id, 'context': context}
if org_id:
to_post['org_id'] = org_id
sightings = self._prepare_request('POST', 'sightings/listSightings', data=to_post)
sightings = self._prepare_request('POST', url, data=to_post)
sightings = self._check_response(sightings, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in sightings:

File diff suppressed because it is too large Load Diff

View File

@ -1535,8 +1535,18 @@ class TestComprehensive(unittest.TestCase):
def test_describe_types(self):
remote = self.admin_misp_connector.describe_types_remote
remote_types = remote.pop('types')
remote_categories = remote.pop('categories')
remote_category_type_mappings = remote.pop('category_type_mappings')
local = self.admin_misp_connector.describe_types_local
local_types = local.pop('types')
local_categories = local.pop('categories')
local_category_type_mappings = local.pop('category_type_mappings')
self.assertDictEqual(remote, local)
self.assertEqual(sorted(remote_types), sorted(local_types))
self.assertEqual(sorted(remote_categories), sorted(local_categories))
for category, mapping in remote_category_type_mappings.items():
self.assertEqual(sorted(local_category_type_mappings[category]), sorted(mapping))
def test_versions(self):
self.assertEqual(self.user_misp_connector.version, self.user_misp_connector.pymisp_version_master)

View File

@ -123,6 +123,8 @@ class MISPInstance():
self.site_admin_connector.set_server_setting('MISP.external_baseurl', params['external_baseurl'], force=True)
# Setup baseurl
self.site_admin_connector.set_server_setting('MISP.baseurl', params['url'], force=True)
# Setup host org
self.site_admin_connector.set_server_setting('MISP.host_org_id', self.test_org.id)
self.external_base_url = params['external_baseurl']
self.sync = []
@ -169,6 +171,24 @@ class MISPInstance():
# Delete org
self.initial_user_connector.delete_organisation(self.test_org.id)
# Make sure the instance is back to a clean state
if self.initial_user_connector.events():
raise Exception(f'Events still on the instance {self.external_base_url}')
if self.initial_user_connector.attributes():
raise Exception(f'Attributes still on the instance {self.external_base_url}')
if self.initial_user_connector.attribute_proposals():
raise Exception(f'AttributeProposals still on the instance {self.external_base_url}')
if self.initial_user_connector.sightings():
raise Exception(f'Sightings still on the instance {self.external_base_url}')
if self.initial_user_connector.servers():
raise Exception(f'Servers still on the instance {self.external_base_url}')
if self.initial_user_connector.sharing_groups():
raise Exception(f'SharingGroups still on the instance {self.external_base_url}')
if len(self.initial_user_connector.organisations()) > 1:
raise Exception(f'Organisations still on the instance {self.external_base_url}')
if len(self.initial_user_connector.users()) > 1:
raise Exception(f'Users still on the instance {self.external_base_url}')
class TestSync(unittest.TestCase):
@ -231,7 +251,7 @@ class TestSync(unittest.TestCase):
def test_simple_sync(self):
'''Test simple event, push to one server'''
event = MISPEvent()
event.info = 'Event created on first instance'
event.info = 'Event created on first instance - test_simple_sync'
event.distribution = Distribution.all_communities
event.add_attribute('ip-src', '1.1.1.1')
try:
@ -251,7 +271,7 @@ class TestSync(unittest.TestCase):
def test_sync_community(self):
'''Simple event, this community only, pull from member of the community'''
event = MISPEvent()
event.info = 'Event created on first instance'
event.info = 'Event created on first instance - test_sync_community'
event.distribution = Distribution.this_community_only
event.add_attribute('ip-src', '1.1.1.1')
try:
@ -270,7 +290,7 @@ class TestSync(unittest.TestCase):
def test_sync_all_communities(self):
'''Simple event, all communities, enable automatic push on two sub-instances'''
event = MISPEvent()
event.info = 'Event created on first instance'
event.info = 'Event created on first instance - test_sync_all_communities'
event.distribution = Distribution.all_communities
event.add_attribute('ip-src', '1.1.1.1')
try:
@ -292,6 +312,8 @@ class TestSync(unittest.TestCase):
source.org_admin_connector.delete_event(event)
middle.site_admin_connector.delete_event(middle_event)
last.site_admin_connector.delete_event(last_event)
source.site_admin_connector.update_server({'push': False}, source.sync_servers[0].id)
middle.site_admin_connector.update_server({'push': False}, middle.sync_servers[1].id)
def create_complex_event(self):
event = MISPEvent()
@ -367,6 +389,8 @@ class TestSync(unittest.TestCase):
source.org_admin_connector.delete_event(event)
middle.site_admin_connector.delete_event(event_middle)
last.site_admin_connector.delete_event(event_last)
source.site_admin_connector.update_server({'push': False}, source.sync_servers[0].id)
middle.site_admin_connector.update_server({'push': False}, middle.sync_servers[1].id)
def test_complex_event_pull(self):
'''Test pull'''
@ -419,7 +443,7 @@ class TestSync(unittest.TestCase):
event = source.org_admin_connector.add_event(event)
source.org_admin_connector.publish(event)
time.sleep(60)
time.sleep(15)
event_middle = middle.user_connector.get_event(event.uuid)
event_last = last.user_connector.get_event(event.uuid)
@ -429,9 +453,17 @@ class TestSync(unittest.TestCase):
event_middle_as_site_admin = middle.site_admin_connector.get_event(event.uuid)
self.assertEqual(len(event_middle_as_site_admin.attributes), 3)
event_last_as_site_admin = last.site_admin_connector.get_event(event.uuid)
self.assertEqual(len(event_last_as_site_admin.attributes), 2) # FIXME: should be 1, I think.
self.assertEqual(len(event_last_as_site_admin.attributes), 1)
# Get sharing group from middle instance
sgs = middle.site_admin_connector.sharing_groups()
self.assertEqual(len(sgs), 1)
self.assertEqual(sgs[0].name, 'Testcases SG')
middle.site_admin_connector.delete_sharing_group(sgs[0])
finally:
source.org_admin_connector.delete_event(event)
middle.site_admin_connector.delete_event(event_middle)
last.site_admin_connector.delete_event(event_last)
source.site_admin_connector.delete_sharing_group(sharing_group.id)
middle.site_admin_connector.delete_sharing_group(sharing_group.id)
source.site_admin_connector.update_server({'push': False}, source.sync_servers[0].id)
middle.site_admin_connector.update_server({'push': False}, middle.sync_servers[1].id)