2015-09-22 15:25:47 +02:00
|
|
|
#!/usr/bin/env python
|
|
|
|
# -*- coding: utf-8 -*-
|
2017-12-01 16:15:46 +01:00
|
|
|
from pymisp import PyMISP, __version__
|
2018-09-19 06:58:20 +02:00
|
|
|
try:
|
|
|
|
from keys import url, key
|
|
|
|
except ImportError as e:
|
|
|
|
print(e)
|
2019-07-18 14:05:08 +02:00
|
|
|
url = 'https://localhost:8443'
|
|
|
|
key = 'K5yV0CcxdnklzDfCKlnPniIxrMX41utQ2dG13zZ3'
|
2018-09-19 06:58:20 +02:00
|
|
|
|
2015-09-22 15:25:47 +02:00
|
|
|
import time
|
|
|
|
|
|
|
|
import unittest
|
|
|
|
|
2015-11-25 09:51:22 +01:00
|
|
|
|
2015-09-22 15:25:47 +02:00
|
|
|
class TestBasic(unittest.TestCase):
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
self.maxDiff = None
|
2019-08-26 19:40:14 +02:00
|
|
|
self.misp = PyMISP(url, key, False, 'json')
|
2017-12-01 14:02:04 +01:00
|
|
|
self.live_describe_types = self.misp.get_live_describe_types()
|
2015-09-22 15:25:47 +02:00
|
|
|
|
|
|
|
def _clean_event(self, event):
|
2016-01-04 17:21:00 +01:00
|
|
|
event['Event'].pop('orgc_id', None)
|
2015-09-22 15:25:47 +02:00
|
|
|
event['Event'].pop('uuid', None)
|
2016-01-04 17:21:00 +01:00
|
|
|
event['Event'].pop('sharing_group_id', None)
|
2015-09-22 15:25:47 +02:00
|
|
|
event['Event'].pop('timestamp', None)
|
2016-01-04 17:21:00 +01:00
|
|
|
event['Event'].pop('org_id', None)
|
2015-09-22 15:25:47 +02:00
|
|
|
event['Event'].pop('date', None)
|
|
|
|
event['Event'].pop('RelatedEvent', None)
|
|
|
|
event['Event'].pop('publish_timestamp', None)
|
|
|
|
if event['Event'].get('Attribute'):
|
|
|
|
for a in event['Event'].get('Attribute'):
|
|
|
|
a.pop('uuid', None)
|
|
|
|
a.pop('event_id', None)
|
|
|
|
a.pop('id', None)
|
|
|
|
a.pop('timestamp', None)
|
2016-01-04 17:21:00 +01:00
|
|
|
if event['Event'].get('Orgc'):
|
|
|
|
event['Event']['Orgc'].pop('uuid', None)
|
|
|
|
event['Event']['Orgc'].pop('id', None)
|
|
|
|
if event['Event'].get('Org'):
|
|
|
|
event['Event']['Org'].pop('uuid', None)
|
|
|
|
event['Event']['Org'].pop('id', None)
|
2015-09-22 15:25:47 +02:00
|
|
|
return event['Event'].pop('id', None)
|
|
|
|
|
|
|
|
def new_event(self):
|
|
|
|
event = self.misp.new_event(0, 1, 0, "This is a test")
|
|
|
|
event_id = self._clean_event(event)
|
2015-09-26 21:00:33 +02:00
|
|
|
to_check = {u'Event': {u'info': u'This is a test', u'locked': False,
|
2017-03-10 16:32:15 +01:00
|
|
|
u'attribute_count': u'0', 'disable_correlation': False, u'analysis': u'0',
|
2015-09-26 21:00:33 +02:00
|
|
|
u'ShadowAttribute': [], u'published': False,
|
2017-02-21 09:39:26 +01:00
|
|
|
u'distribution': u'0', u'event_creator_email': u'admin@admin.test', u'Attribute': [], u'proposal_email_lock': False,
|
2018-04-12 18:34:50 +02:00
|
|
|
u'extends_uuid': '',
|
2019-08-26 19:40:14 +02:00
|
|
|
u'Object': [], u'Org': {'local': True, u'name': u'ORGNAME'},
|
|
|
|
u'Orgc': {'local': True, u'name': u'ORGNAME'},
|
2017-01-07 03:03:02 +01:00
|
|
|
u'Galaxy': [],
|
2016-12-07 14:35:49 +01:00
|
|
|
u'threat_level_id': u'1'}}
|
|
|
|
self.assertEqual(event, to_check, 'Failed at creating a new Event')
|
2015-09-22 15:25:47 +02:00
|
|
|
return int(event_id)
|
|
|
|
|
|
|
|
def add_hashes(self, eventid):
|
|
|
|
r = self.misp.get_event(eventid)
|
|
|
|
event = r.json()
|
2018-01-12 22:01:08 +01:00
|
|
|
event = self.misp.add_hashes(event,
|
|
|
|
category='Payload installation',
|
|
|
|
filename='dll_installer.dll',
|
|
|
|
md5='0a209ac0de4ac033f31d6ba9191a8f7a',
|
|
|
|
sha1='1f0ae54ac3f10d533013f74f48849de4e65817a7',
|
|
|
|
sha256='003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9',
|
|
|
|
ssdeep=None,
|
|
|
|
comment='Fanny modules',
|
|
|
|
to_ids=False,
|
|
|
|
distribution=2,
|
|
|
|
proposal=False)
|
2015-09-22 15:25:47 +02:00
|
|
|
self._clean_event(event)
|
|
|
|
to_check = {u'Event': {u'info': u'This is a test', u'locked': False,
|
|
|
|
u'attribute_count': u'3', u'analysis': u'0',
|
2017-02-21 09:39:26 +01:00
|
|
|
u'ShadowAttribute': [], u'published': False, u'distribution': u'0', u'event_creator_email': u'admin@admin.test',
|
2019-08-26 19:40:14 +02:00
|
|
|
u'Org': {'local': True, u'name': u'ORGNAME'},
|
|
|
|
u'Orgc': {'local': True, u'name': u'ORGNAME'},
|
2016-12-07 10:01:55 +01:00
|
|
|
u'Galaxy': [],
|
2015-09-22 15:25:47 +02:00
|
|
|
u'Attribute': [
|
|
|
|
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
|
|
|
u'to_ids': False, u'value': u'dll_installer.dll|0a209ac0de4ac033f31d6ba9191a8f7a',
|
|
|
|
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|md5'},
|
|
|
|
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
|
|
|
u'to_ids': False, u'value': u'dll_installer.dll|1f0ae54ac3f10d533013f74f48849de4e65817a7',
|
|
|
|
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha1'},
|
|
|
|
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
|
|
|
u'to_ids': False, u'value': u'dll_installer.dll|003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9',
|
|
|
|
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha256'}],
|
|
|
|
u'proposal_email_lock': False, u'threat_level_id': u'1'}}
|
2015-09-26 21:00:33 +02:00
|
|
|
self.assertEqual(event, to_check, 'Failed at adding hashes')
|
2015-09-22 15:25:47 +02:00
|
|
|
|
|
|
|
def publish(self, eventid):
|
|
|
|
r = self.misp.get_event(eventid)
|
|
|
|
event = r.json()
|
|
|
|
event = self.misp.publish(event)
|
|
|
|
self._clean_event(event)
|
|
|
|
to_check = {u'Event': {u'info': u'This is a test', u'locked': False,
|
|
|
|
u'attribute_count': u'3', u'analysis': u'0',
|
2017-02-21 09:39:26 +01:00
|
|
|
u'ShadowAttribute': [], u'published': True, u'distribution': u'0', u'event_creator_email': u'admin@admin.test',
|
2019-08-26 19:40:14 +02:00
|
|
|
u'Org': {'local': True, u'name': u'ORGNAME'},
|
|
|
|
u'Orgc': {'local': True, u'name': u'ORGNAME'},
|
2016-12-07 10:01:55 +01:00
|
|
|
u'Galaxy': [],
|
2015-09-22 15:25:47 +02:00
|
|
|
u'Attribute': [
|
|
|
|
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
|
|
|
u'to_ids': False, u'value': u'dll_installer.dll|0a209ac0de4ac033f31d6ba9191a8f7a',
|
|
|
|
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|md5'},
|
|
|
|
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
|
|
|
u'to_ids': False, u'value': u'dll_installer.dll|1f0ae54ac3f10d533013f74f48849de4e65817a7',
|
|
|
|
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha1'},
|
|
|
|
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
|
|
|
u'to_ids': False, u'value': u'dll_installer.dll|003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9',
|
|
|
|
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha256'}],
|
|
|
|
u'proposal_email_lock': False, u'threat_level_id': u'1'}}
|
2015-09-26 21:00:33 +02:00
|
|
|
self.assertEqual(event, to_check, 'Failed at publishing event')
|
2015-09-22 15:25:47 +02:00
|
|
|
|
|
|
|
def delete(self, eventid):
|
|
|
|
event = self.misp.delete_event(eventid)
|
2016-08-12 14:30:50 +02:00
|
|
|
print(event)
|
2015-09-26 21:00:33 +02:00
|
|
|
|
|
|
|
def delete_attr(self, attrid):
|
|
|
|
event = self.misp.delete_attribute(attrid)
|
2016-08-12 14:30:50 +02:00
|
|
|
print(event)
|
2015-09-26 21:00:33 +02:00
|
|
|
|
|
|
|
def get(self, eventid):
|
|
|
|
event = self.misp.get_event(eventid)
|
2016-08-12 14:30:50 +02:00
|
|
|
print(event)
|
2015-09-26 21:00:33 +02:00
|
|
|
|
2016-07-14 13:55:37 +02:00
|
|
|
def get_stix(self, **kwargs):
|
|
|
|
event = self.misp.get_stix(kwargs)
|
|
|
|
print(event)
|
|
|
|
|
2015-09-26 21:00:33 +02:00
|
|
|
def add(self):
|
|
|
|
event = {u'Event': {u'info': u'This is a test', u'locked': False,
|
|
|
|
u'attribute_count': u'3', u'analysis': u'0',
|
2017-02-21 09:39:26 +01:00
|
|
|
u'ShadowAttribute': [], u'published': False, u'distribution': u'0', u'event_creator_email': u'admin@admin.test',
|
2015-09-26 21:00:33 +02:00
|
|
|
u'Attribute': [
|
|
|
|
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
|
|
|
u'to_ids': False, u'value': u'dll_installer.dll|0a209ac0de4ac033f31d6ba9191a8f7a',
|
|
|
|
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|md5'},
|
|
|
|
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
|
|
|
u'to_ids': False, u'value': u'dll_installer.dll|1f0ae54ac3f10d533013f74f48849de4e65817a7',
|
|
|
|
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha1'},
|
|
|
|
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
|
|
|
u'to_ids': False, u'value': u'dll_installer.dll|003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9',
|
|
|
|
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha256'}],
|
|
|
|
u'proposal_email_lock': False, u'threat_level_id': u'1'}}
|
|
|
|
event = self.misp.add_event(event)
|
2016-08-12 14:30:50 +02:00
|
|
|
print(event)
|
2015-09-26 21:00:33 +02:00
|
|
|
|
2017-04-12 17:52:26 +02:00
|
|
|
def add_user(self):
|
|
|
|
email = 'test@misp.local'
|
|
|
|
role_id = '5'
|
|
|
|
org_id = '1'
|
|
|
|
password = 'Password1234!'
|
2017-04-13 16:56:28 +02:00
|
|
|
external_auth_required = False
|
2017-04-12 17:52:26 +02:00
|
|
|
external_auth_key = ''
|
2017-04-13 16:56:28 +02:00
|
|
|
enable_password = False
|
2017-04-12 17:52:26 +02:00
|
|
|
nids_sid = '1238717'
|
|
|
|
server_id = '1'
|
|
|
|
gpgkey = ''
|
|
|
|
certif_public = ''
|
|
|
|
autoalert = False
|
|
|
|
contactalert = False
|
|
|
|
disabled = False
|
|
|
|
change_pw = '0'
|
|
|
|
termsaccepted = False
|
|
|
|
newsread = '0'
|
|
|
|
authkey = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
|
2017-04-13 16:56:28 +02:00
|
|
|
to_check = {'User': {'email': email, 'org_id': org_id, 'role_id': role_id,
|
|
|
|
'password': password, 'external_auth_required': external_auth_required,
|
|
|
|
'external_auth_key': external_auth_key, 'enable_password': enable_password,
|
|
|
|
'nids_sid': nids_sid, 'server_id': server_id, 'gpgkey': gpgkey,
|
|
|
|
'certif_public': certif_public, 'autoalert': autoalert,
|
|
|
|
'contactalert': contactalert, 'disabled': disabled,
|
|
|
|
'change_pw': change_pw, 'termsaccepted': termsaccepted,
|
|
|
|
'newsread': newsread, 'authkey': authkey}}
|
2017-04-12 17:52:26 +02:00
|
|
|
user = self.misp.add_user(email=email,
|
|
|
|
role_id=role_id,
|
|
|
|
org_id=org_id,
|
|
|
|
password=password,
|
|
|
|
external_auth_required=external_auth_required,
|
|
|
|
external_auth_key=external_auth_key,
|
|
|
|
enable_password=enable_password,
|
|
|
|
nids_sid=nids_sid,
|
|
|
|
server_id=server_id,
|
|
|
|
gpgkey=gpgkey,
|
|
|
|
certif_public=certif_public,
|
|
|
|
autoalert=autoalert,
|
|
|
|
contactalert=contactalert,
|
|
|
|
disabled=disabled,
|
|
|
|
change_pw=change_pw,
|
|
|
|
termsaccepted=termsaccepted,
|
|
|
|
newsread=newsread,
|
|
|
|
authkey=authkey)
|
|
|
|
# delete user to allow reuse of test
|
|
|
|
uid = user.get('User').get('id')
|
|
|
|
self.misp.delete_user(uid)
|
|
|
|
# ----------------------------------
|
|
|
|
# test interesting keys only (some keys are modified(password) and some keys are added (lastlogin)
|
2017-04-13 16:56:28 +02:00
|
|
|
tested_keys = ['email', 'org_id', 'role_id', 'server_id', 'autoalert',
|
|
|
|
'authkey', 'gpgkey', 'certif_public', 'nids_sid', 'termsaccepted',
|
|
|
|
'newsread', 'contactalert', 'disabled']
|
|
|
|
for k in tested_keys:
|
|
|
|
self.assertEqual(user.get('User').get(k), to_check.get('User').get(k), "Failed to match input with output on key: {}".format(k))
|
2017-04-12 17:52:26 +02:00
|
|
|
|
|
|
|
def add_organisation(self):
|
2017-04-13 16:56:28 +02:00
|
|
|
name = 'Organisation tests'
|
|
|
|
description = 'This is a test organisation'
|
|
|
|
orgtype = 'Type is a string'
|
|
|
|
nationality = 'French'
|
|
|
|
sector = 'Bank sector'
|
|
|
|
uuid = '16fd2706-8baf-433b-82eb-8c7fada847da'
|
|
|
|
contacts = 'Text field with no limitations'
|
|
|
|
local = False
|
|
|
|
to_check = {'Organisation': {'name': name, 'description': description,
|
|
|
|
'type': orgtype, 'nationality': nationality,
|
|
|
|
'sector': sector, 'uuid': uuid, 'contacts': contacts,
|
|
|
|
'local': local}}
|
2017-04-12 17:52:26 +02:00
|
|
|
org = self.misp.add_organisation(name=name,
|
|
|
|
description=description,
|
|
|
|
type=orgtype,
|
|
|
|
nationality=nationality,
|
|
|
|
sector=sector,
|
|
|
|
uuid=uuid,
|
|
|
|
contacts=contacts,
|
|
|
|
local=local,
|
|
|
|
)
|
|
|
|
# delete organisation to allow reuse of test
|
|
|
|
oid = org.get('Organisation').get('id')
|
|
|
|
self.misp.delete_organisation(oid)
|
|
|
|
# ----------------------------------
|
2017-04-13 16:56:28 +02:00
|
|
|
tested_keys = ['anonymise', 'contacts', 'description', 'local', 'name',
|
|
|
|
'nationality', 'sector', 'type', 'uuid']
|
|
|
|
for k in tested_keys:
|
|
|
|
self.assertEqual(org.get('Organisation').get(k), to_check.get('Organisation').get(k), "Failed to match input with output on key: {}".format(k))
|
2017-04-12 17:52:26 +02:00
|
|
|
|
2015-09-26 21:00:33 +02:00
|
|
|
def test_create_event(self):
|
|
|
|
eventid = self.new_event()
|
|
|
|
time.sleep(1)
|
|
|
|
self.delete(eventid)
|
2015-09-22 15:25:47 +02:00
|
|
|
|
2015-09-26 21:00:33 +02:00
|
|
|
def test_get_event(self):
|
2015-09-22 15:25:47 +02:00
|
|
|
eventid = self.new_event()
|
|
|
|
time.sleep(1)
|
2015-09-26 21:00:33 +02:00
|
|
|
self.get(eventid)
|
2015-09-22 15:25:47 +02:00
|
|
|
time.sleep(1)
|
|
|
|
self.delete(eventid)
|
|
|
|
|
2015-09-26 21:00:33 +02:00
|
|
|
def test_add_event(self):
|
|
|
|
self.add()
|
|
|
|
time.sleep(1)
|
|
|
|
self.delete(1)
|
|
|
|
|
|
|
|
def test_del_attr(self):
|
|
|
|
eventid = self.new_event()
|
|
|
|
time.sleep(1)
|
|
|
|
self.delete_attr(1)
|
|
|
|
time.sleep(1)
|
|
|
|
self.delete(eventid)
|
|
|
|
|
2016-08-16 11:44:08 +02:00
|
|
|
def test_one_or_more(self):
|
|
|
|
self.assertEqual(self.misp._one_or_more(1), (1,))
|
|
|
|
self.assertEqual(self.misp._one_or_more([1]), [1])
|
2015-09-26 21:00:33 +02:00
|
|
|
|
2017-04-12 17:52:26 +02:00
|
|
|
def test_create_user(self):
|
|
|
|
self.add_user()
|
|
|
|
|
|
|
|
def test_create_organisation(self):
|
|
|
|
self.add_organisation()
|
|
|
|
|
2017-12-01 14:02:04 +01:00
|
|
|
def test_describeTypes_sane_default(self):
|
|
|
|
sane_default = self.live_describe_types['sane_defaults']
|
|
|
|
self.assertEqual(sorted(sane_default.keys()), sorted(self.live_describe_types['types']))
|
|
|
|
|
2017-12-01 14:08:51 +01:00
|
|
|
def test_describeTypes_categories(self):
|
2017-12-01 14:02:04 +01:00
|
|
|
category_type_mappings = self.live_describe_types['category_type_mappings']
|
|
|
|
self.assertEqual(sorted(category_type_mappings.keys()), sorted(self.live_describe_types['categories']))
|
|
|
|
|
2017-12-01 14:08:51 +01:00
|
|
|
def test_describeTypes_types_in_categories(self):
|
2017-12-01 14:02:04 +01:00
|
|
|
category_type_mappings = self.live_describe_types['category_type_mappings']
|
|
|
|
for category, types in category_type_mappings.items():
|
2019-08-16 11:28:07 +02:00
|
|
|
existing_types = [t for t in types if t in self.live_describe_types['types']]
|
|
|
|
self.assertEqual(sorted(existing_types), sorted(types))
|
2017-12-01 14:02:04 +01:00
|
|
|
|
2017-12-01 14:08:51 +01:00
|
|
|
def test_describeTypes_types_have_category(self):
|
2017-12-01 14:02:04 +01:00
|
|
|
category_type_mappings = self.live_describe_types['category_type_mappings']
|
|
|
|
all_types = set()
|
|
|
|
for category, types in category_type_mappings.items():
|
|
|
|
all_types.update(types)
|
|
|
|
self.assertEqual(sorted(list(all_types)), sorted(self.live_describe_types['types']))
|
|
|
|
|
2017-12-01 14:08:51 +01:00
|
|
|
def test_describeTypes_sane_default_valid_category(self):
|
2017-12-01 14:02:04 +01:00
|
|
|
sane_default = self.live_describe_types['sane_defaults']
|
|
|
|
categories = self.live_describe_types['categories']
|
|
|
|
for t, sd in sane_default.items():
|
|
|
|
self.assertTrue(sd['to_ids'] in [0, 1])
|
|
|
|
self.assertTrue(sd['default_category'] in categories)
|
|
|
|
|
2017-12-01 14:35:19 +01:00
|
|
|
def test_live_acl(self):
|
|
|
|
query_acl = self.misp.get_live_query_acl()
|
2017-12-01 14:40:52 +01:00
|
|
|
self.assertEqual(query_acl['response'], [])
|
2017-12-01 14:35:19 +01:00
|
|
|
|
2017-12-01 16:15:46 +01:00
|
|
|
def test_recommended_pymisp_version(self):
|
|
|
|
response = self.misp.get_recommended_api_version()
|
|
|
|
recommended_version_tup = tuple(int(x) for x in response['version'].split('.'))
|
|
|
|
pymisp_version_tup = tuple(int(x) for x in __version__.split('.'))[:3]
|
|
|
|
self.assertEqual(recommended_version_tup, pymisp_version_tup)
|
|
|
|
|
2017-12-01 14:02:04 +01:00
|
|
|
|
2015-09-22 15:25:47 +02:00
|
|
|
if __name__ == '__main__':
|
|
|
|
unittest.main()
|