2018-08-12 22:46:21 +02:00
|
|
|
#!/usr/bin/env python3
|
2016-08-17 18:21:50 +02:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
2016-08-18 00:23:49 +02:00
|
|
|
import unittest
|
2016-08-17 18:21:50 +02:00
|
|
|
import requests_mock
|
|
|
|
import json
|
2016-09-27 19:47:22 +02:00
|
|
|
import os
|
2017-12-09 13:35:44 +01:00
|
|
|
import six
|
2018-01-26 21:36:09 +01:00
|
|
|
import sys
|
2017-12-09 13:12:04 +01:00
|
|
|
from io import BytesIO
|
2016-08-17 18:21:50 +02:00
|
|
|
|
2016-08-18 13:18:58 +02:00
|
|
|
import pymisp as pm
|
2016-08-17 18:21:50 +02:00
|
|
|
from pymisp import PyMISP
|
2016-12-03 17:29:41 +01:00
|
|
|
# from pymisp import NewEventError
|
2016-09-27 19:47:22 +02:00
|
|
|
from pymisp import MISPEvent
|
2017-08-25 17:14:33 +02:00
|
|
|
from pymisp import MISPEncode
|
2017-09-12 16:46:06 +02:00
|
|
|
|
2017-08-25 17:14:33 +02:00
|
|
|
from pymisp.tools import make_binary_objects
|
2016-08-17 18:21:50 +02:00
|
|
|
|
|
|
|
|
2018-01-24 15:19:31 +01:00
|
|
|
class MockPyMISP(PyMISP):
    """PyMISP subclass whose ``_send_attributes`` hands its input straight back."""

    def _send_attributes(self, event, attributes, proposal=False):
        """Return ``attributes`` unchanged; ``event`` and ``proposal`` are ignored."""
        return attributes
|
|
|
|
|
|
|
|
|
2016-08-17 18:21:50 +02:00
|
|
|
@requests_mock.Mocker()
class TestOffline(unittest.TestCase):
    """Offline PyMISP tests that run against canned, mocked HTTP responses.

    Every ``test*`` method receives a mocker object ``m`` as its extra
    argument and passes it to :meth:`initURI`, which registers the canned
    responses on it.
    """

    def setUp(self):
        """Load the JSON fixtures and the connection settings used by the tests.

        NOTE(review): the ``tests/...`` fixture paths are relative to the
        current working directory, so the suite presumably has to be started
        from the repository root -- confirm.
        """
        self.maxDiff = None
        self.domain = 'http://misp.local/'
        self.key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
        with open('tests/misp_event.json', 'r') as f:
            self.event = {'Event': json.load(f)}
        with open('tests/new_misp_event.json', 'r') as f:
            self.new_misp_event = {'Event': json.load(f)}
        self.resources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../pymisp/data')
        with open(os.path.join(self.resources_path, 'describeTypes.json'), 'r') as f:
            self.types = json.load(f)
        with open('tests/sharing_groups.json', 'r') as f:
            self.sharing_groups = json.load(f)
        self.auth_error_msg = {"name": "Authentication failed. Please make sure you pass the API key of an API enabled user along in the Authorization header.",
                               "message": "Authentication failed. Please make sure you pass the API key of an API enabled user along in the Authorization header.",
                               # The backslashes are part of the value; they are
                               # escaped explicitly because "\/" is not a valid
                               # Python escape sequence.
                               "url": "\\/events\\/1"}
        with open('tests/search_index_result.json', 'r') as f:
            self.search_index_result = json.load(f)

    def initURI(self, m):
        """Register on mocker ``m`` every canned response the tests rely on."""
        m.register_uri('GET', self.domain + 'events/1', json=self.auth_error_msg, status_code=403)
        m.register_uri('GET', self.domain + 'servers/getVersion.json', json={"version": "2.4.62"})
        m.register_uri('GET', self.domain + 'servers/getPyMISPVersion.json', json={"version": "2.4.62"})
        m.register_uri('GET', self.domain + 'sharing_groups.json', json=self.sharing_groups)
        m.register_uri('GET', self.domain + 'attributes/describeTypes.json', json=self.types)
        m.register_uri('GET', self.domain + 'events/2', json=self.event)
        m.register_uri('POST', self.domain + 'events/5758ebf5-c898-48e6-9fe9-5665c0a83866', json=self.event)
        m.register_uri('DELETE', self.domain + 'events/2', json={'message': 'Event deleted.'})
        m.register_uri('DELETE', self.domain + 'events/3', json={'errors': ['Invalid event'], 'message': 'Invalid event', 'name': 'Invalid event', 'url': '/events/3'})
        m.register_uri('GET', self.domain + 'attributes/delete/2', json={'message': 'Attribute deleted.'})
        m.register_uri('POST', self.domain + 'events/index', json=self.search_index_result)
        m.register_uri('POST', self.domain + 'attributes/edit/' + self.key, json={})
        m.register_uri('GET', self.domain + 'shadow_attributes/view/None', json={})
        m.register_uri('GET', self.domain + 'shadow_attributes/view/1', json={})
        m.register_uri('POST', self.domain + 'events/freeTextImport/1', json={})
        m.register_uri('POST', self.domain + 'attributes/restSearch', json={})
        m.register_uri('POST', self.domain + 'attributes/downloadSample', json={})
        m.register_uri('GET', self.domain + 'tags', json={'Tag': 'foo'})
        m.register_uri('POST', self.domain + 'events/upload_sample/1', json={})
        m.register_uri('POST', self.domain + 'tags/attachTagToObject', json={})
        m.register_uri('POST', self.domain + 'tags/removeTagFromObject', json={})

    def test_getEvent(self, m):
        """``get_event`` and ``get`` both return the canned event 2."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        e1 = pymisp.get_event(2)
        e2 = pymisp.get(2)
        self.assertEqual(e1, e2)
        self.assertEqual(self.event, e2)

    def test_updateEvent(self, m):
        """``update_event`` accepts a JSON string or a dict; ``update`` agrees."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        e0 = pymisp.update_event('5758ebf5-c898-48e6-9fe9-5665c0a83866', json.dumps(self.event))
        e1 = pymisp.update_event('5758ebf5-c898-48e6-9fe9-5665c0a83866', self.event)
        self.assertEqual(e0, e1)
        e2 = pymisp.update(e0)
        self.assertEqual(e1, e2)
        self.assertEqual(self.event, e2)

    def test_deleteEvent(self, m):
        """``delete_event`` returns the server payload for both canned cases."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        d = pymisp.delete_event(2)
        self.assertEqual(d, {'message': 'Event deleted.'})
        d = pymisp.delete_event(3)
        self.assertEqual(d, {'errors': ['Invalid event'], 'message': 'Invalid event', 'name': 'Invalid event', 'url': '/events/3'})

    def test_deleteAttribute(self, m):
        """``delete_attribute`` returns the canned deletion message."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        d = pymisp.delete_attribute(2)
        self.assertEqual(d, {'message': 'Attribute deleted.'})

    def test_getVersions(self, m):
        """Check the reported API version and the canned server version."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        api_version = pymisp.get_api_version()
        self.assertEqual(api_version, {'version': pm.__version__})
        server_version = pymisp.get_version()
        self.assertEqual(server_version, {"version": "2.4.62"})

    def test_getSharingGroups(self, m):
        """The first sharing group returned matches the fixture."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        sharing_groups = pymisp.get_sharing_groups()
        self.assertEqual(sharing_groups['response'][0], self.sharing_groups[0])

    def test_auth_error(self, m):
        """A 403 reply comes back with the message duplicated under ``errors``."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        error = pymisp.get(1)
        # Work on a copy: the fixture dict is also the registered mock payload.
        response = dict(self.auth_error_msg)
        response['errors'] = [response['message']]
        self.assertEqual(error, response)

    def test_newEvent(self, m):
        """``new_event`` returns the server's error or the created event."""
        error_empty_info = {'message': 'The event could not be saved.',
                            'name': 'Add event failed.',
                            'errors': ['Error in info: Info cannot be empty.'],
                            'url': '/events/add'}
        error_empty_info_flatten = {u'message': u'The event could not be saved.',
                                    u'name': u'Add event failed.',
                                    u'errors': [u"Error in info: Info cannot be empty."],
                                    u'url': u'/events/add'}
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        m.register_uri('POST', self.domain + 'events', json=error_empty_info)
        # TODO Add test exception if info field isn't set
        response = pymisp.new_event(0, 1, 0, 'Foo')
        self.assertEqual(response, error_empty_info_flatten)
        m.register_uri('POST', self.domain + 'events', json=self.new_misp_event)
        response = pymisp.new_event(0, 1, 0, "This is a test.", '2016-08-26', False)
        self.assertEqual(response, self.new_misp_event)

    def test_eventObject(self, m):
        """A stored event loads into ``MISPEvent`` and serializes without error."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        misp_event = MISPEvent(pymisp.describe_types)
        with open('tests/57c4445b-c548-4654-af0b-4be3950d210f.json', 'r') as f:
            misp_event.load(f.read())
        json.dumps(misp_event, cls=MISPEncode)

    def test_searchIndexByTagId(self, m):
        """``search_index`` with a tag id returns the canned index result."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        response = pymisp.search_index(tag="1")
        self.assertEqual(response['response'], self.search_index_result)

    def test_searchIndexByTagName(self, m):
        """``search_index`` with a tag name returns the canned index result."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        response = pymisp.search_index(tag='ecsirt:malicious-code="ransomware"')
        self.assertEqual(response['response'], self.search_index_result)

    def add_hashes(self, event, mock):
        """
        Regression tests for #174

        Helper (not collected as a test): calls ``mock.add_hashes`` with and
        without a filename and checks the count and type of the attributes.
        """
        hashes_fname = mock.add_hashes(event,
                                       md5='68b329da9893e34099c7d8ad5cb9c940',
                                       sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
                                       sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b',
                                       filename='foobar.exe')
        self.assertEqual(3, len(hashes_fname))
        for attr in hashes_fname:
            self.assertIsInstance(attr, pm.mispevent.MISPAttribute)
            self.assertIn("filename|", attr["type"])

        hashes_only = mock.add_hashes(event, md5='68b329da9893e34099c7d8ad5cb9c940',
                                      sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
                                      sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b')
        self.assertEqual(3, len(hashes_only))
        for attr in hashes_only:
            self.assertIsInstance(attr, pm.mispevent.MISPAttribute)
            self.assertNotIn("filename|", attr["type"])

    def add_regkeys(self, event, mock):
        """Helper (not collected as a test): exercise ``add_regkeys`` / ``add_regkey``."""
        regkeys = {
            'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foo': None,
            'HKLM\\Software\\Microsoft\\Outlook\\Addins\\bar': 'baz',
            'HKLM\\Software\\Microsoft\\Outlook\\Addins\\bae': 0,
        }
        reg_attr = mock.add_regkeys(event, regkeys)
        self.assertEqual(3, len(reg_attr))
        for attr in reg_attr:
            self.assertIsInstance(attr, pm.mispevent.MISPAttribute)
            self.assertIn("regkey", attr["type"])

        key = mock.add_regkey(event, 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foobar')
        self.assertEqual(len(key), 1)
        self.assertEqual(key[0]["type"], "regkey")

        key = mock.add_regkey(event, 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foobar', rvalue='foobar')
        self.assertEqual(len(key), 1)
        self.assertEqual(key[0]["type"], "regkey|value")
        self.assertIn("foobar|foobar", key[0]["value"])

    def test_addAttributes(self, m):
        """Call the ``add_*`` helpers through ``MockPyMISP`` and check counts."""
        self.initURI(m)
        p = MockPyMISP(self.domain, self.key)
        evt = p.get(1)

        self.add_hashes(evt, p)
        self.add_regkeys(evt, p)

        p.av_detection_link(evt, 'https://foocorp.com')
        p.add_detection_name(evt, 'WATERMELON')
        p.add_filename(evt, 'foobar.exe')
        p.add_pattern(evt, '.*foobar.*', in_memory=True)
        p.add_pattern(evt, '.*foobar.*', in_file=True)
        p.add_mutex(evt, 'foo')
        p.add_pipe(evt, 'foo')
        p.add_pipe(evt, '\\.\\pipe\\foo')

        self.assertRaises(pm.PyMISPError, p.add_pattern, evt, '.*foobar.*', in_memory=False, in_file=False)

        self.assertEqual(3, len(p.add_pipe(evt, ['foo', 'bar', 'baz'])))
        self.assertEqual(3, len(p.add_pipe(evt, ['foo', 'bar', '\\.\\pipe\\baz'])))

        self.assertEqual(1, len(p.add_mutex(evt, '\\BaseNamedObjects\\foo')))
        self.assertEqual(3, len(p.add_mutex(evt, ['foo', 'bar', 'baz'])))
        self.assertEqual(3, len(p.add_mutex(evt, ['foo', 'bar', '\\BaseNamedObjects\\baz'])))
        p.add_yara(evt, 'rule Foo {}')
        self.assertEqual(2, len(p.add_yara(evt, ['rule Foo {}', 'rule Bar {}'])))
        p.add_ipdst(evt, '1.2.3.4')
        self.assertEqual(2, len(p.add_ipdst(evt, ['1.2.3.4', '5.6.7.8'])))
        p.add_ipsrc(evt, '1.2.3.4')
        self.assertEqual(2, len(p.add_ipsrc(evt, ['1.2.3.4', '5.6.7.8'])))
        p.add_hostname(evt, 'a.foobar.com')
        self.assertEqual(2, len(p.add_hostname(evt, ['a.foobar.com', 'a.foobaz.com'])))
        p.add_domain(evt, 'foobar.com')
        self.assertEqual(2, len(p.add_domain(evt, ['foobar.com', 'foobaz.com'])))
        p.add_domain_ip(evt, 'foo.com', '1.2.3.4')
        self.assertEqual(2, len(p.add_domain_ip(evt, 'foo.com', ['1.2.3.4', '5.6.7.8'])))
        self.assertEqual(2, len(p.add_domains_ips(evt, {'foo.com': '1.2.3.4', 'bar.com': '4.5.6.7'})))

        p.add_url(evt, 'https://example.com')
        self.assertEqual(2, len(p.add_url(evt, ['https://example.com', 'http://foo.com'])))

        p.add_useragent(evt, 'Mozilla')
        self.assertEqual(2, len(p.add_useragent(evt, ['Mozilla', 'Godzilla'])))

        p.add_traffic_pattern(evt, 'blabla')
        p.add_snort(evt, 'blaba')
        p.add_net_other(evt, 'blabla')
        p.add_email_src(evt, 'foo@bar.com')
        p.add_email_dst(evt, 'foo@bar.com')
        p.add_email_subject(evt, 'you won the lottery')
        p.add_email_attachment(evt, 'foo.doc')
        p.add_target_email(evt, 'foo@bar.com')
        p.add_target_user(evt, 'foo')
        p.add_target_machine(evt, 'foobar')
        p.add_target_org(evt, 'foobar')
        p.add_target_location(evt, 'foobar')
        p.add_target_external(evt, 'foobar')
        p.add_threat_actor(evt, 'WATERMELON')
        p.add_internal_link(evt, 'foobar')
        p.add_internal_comment(evt, 'foobar')
        p.add_internal_text(evt, 'foobar')
        p.add_internal_other(evt, 'foobar')
        p.add_attachment(evt, "testFile")

    def make_objects(self, path=None, pseudofile=None, filename=None):
        """Build binary objects and return them as a UUID-free JSON string.

        Helper (not collected as a test). Objects come from
        ``make_binary_objects``, called with ``path`` when one is given and
        with ``pseudofile``/``filename`` otherwise. All UUIDs are removed so
        that two runs over the same input can be compared for equality.

        Returns a JSON string with the keys ``objects`` and ``references``.
        """
        to_return = {'objects': [], 'references': []}
        if path:
            fo, peo, seos = make_binary_objects(path)
        else:
            fo, peo, seos = make_binary_objects(pseudofile=pseudofile, filename=filename)

        # Same output order as before: sections first, then the PE object,
        # then the file object; falsy results are left out.
        binary_objects = []
        if seos:
            binary_objects.extend(seos)
        if peo:
            binary_objects.append(peo)
        if fo:
            binary_objects.append(fo)

        for binary_object in binary_objects:
            for attribute in binary_object.attributes:
                del attribute.uuid
            to_return['objects'].append(binary_object)
            if binary_object.ObjectReference:
                to_return['references'] += binary_object.ObjectReference

        # Remove UUIDs for comparing the objects.
        for o in to_return['objects']:
            o.pop('uuid')
        for o in to_return['references']:
            o.pop('referenced_uuid')
            o.pop('object_uuid')
        return json.dumps(to_return, cls=MISPEncode)

    def test_objects_pseudofile(self, m):
        """Objects built from an in-memory file equal those built from its path."""
        if six.PY2:
            # skipTest raises SkipTest, so the test is reported as skipped
            # instead of silently passing.
            self.skipTest('not run on Python 2')
        paths = ['cmd.exe', 'tmux', 'MachO-OSX-x64-ls']
        try:
            for path in paths:
                full_path = os.path.join('tests', 'viper-test-files', 'test_files', path)
                with open(full_path, 'rb') as f:
                    pseudo = BytesIO(f.read())
                json_blob = self.make_objects(pseudofile=pseudo, filename=path)
                # Compare pseudo file / path
                filepath_blob = self.make_objects(full_path)
                self.assertEqual(json_blob, filepath_blob)
        except IOError:  # Can be replaced with FileNotFoundError when support for python 2 is dropped
            self.skipTest('sample files under tests/viper-test-files could not be read')

    def test_objects(self, m):
        """Objects built from each sample file serialize to the expected shape."""
        paths = ['cmd.exe', 'tmux', 'MachO-OSX-x64-ls']
        try:
            for path in paths:
                json_blob = self.make_objects(os.path.join('tests',
                                                           'viper-test-files', 'test_files', path))
                decoded = json.loads(json_blob)
                self.assertIn('objects', decoded)
                self.assertIn('references', decoded)
        except IOError:  # Can be replaced with FileNotFoundError when support for python 2 is dropped
            self.skipTest('sample files under tests/viper-test-files could not be read')

    def test_describeTypes_sane_default(self, m):
        """Every type has a sane default and vice versa."""
        sane_default = self.types['result']['sane_defaults']
        self.assertEqual(sorted(sane_default.keys()), sorted(self.types['result']['types']))

    def test_describeTypes_categories(self, m):
        """The category/type mapping covers exactly the declared categories."""
        category_type_mappings = self.types['result']['category_type_mappings']
        self.assertEqual(sorted(category_type_mappings.keys()), sorted(self.types['result']['categories']))

    def test_describeTypes_types_in_categories(self, m):
        """Every type listed under a category is a declared type."""
        category_type_mappings = self.types['result']['category_type_mappings']
        for types in category_type_mappings.values():
            existing_types = [t for t in types if t in self.types['result']['types']]
            self.assertEqual(sorted(existing_types), sorted(types))

    def test_describeTypes_types_have_category(self, m):
        """The union of all categories' types equals the declared types."""
        category_type_mappings = self.types['result']['category_type_mappings']
        all_types = set()
        for types in category_type_mappings.values():
            all_types.update(types)
        self.assertEqual(sorted(all_types), sorted(self.types['result']['types']))

    def test_describeTypes_sane_default_valid_category(self, m):
        """Each sane default has a 0/1 ``to_ids`` and a declared category."""
        sane_default = self.types['result']['sane_defaults']
        categories = self.types['result']['categories']
        for sd in sane_default.values():
            self.assertIn(sd['to_ids'], [0, 1])
            self.assertIn(sd['default_category'], categories)

    def test_flatten_error_messages_singular(self, m):
        """A list under the ``error`` key is returned as is."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        pymisp.get(1)
        # Work on a copy: the fixture dict is also the registered mock payload.
        response = dict(self.auth_error_msg)
        response['error'] = ['foo', 'bar', 'baz']
        messages = pymisp.flatten_error_messages(response)
        self.assertEqual(["foo", "bar", "baz"], messages)

    def test_flatten_error_messages_plural(self, m):
        """A flat dict under ``errors`` is flattened to one message per value."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        error = pymisp.get(1)
        self.assertIn("Authentication failed", error["message"])
        response = dict(self.auth_error_msg)
        response['errors'] = {'foo': 42, 'bar': False, 'baz': ['oo', 'ka']}
        messages = pymisp.flatten_error_messages(response)
        self.assertEqual({'42 (foo)', 'False (bar)', 'oo', 'ka'}, set(messages))

    def test_flatten_error_messages_nested(self, m):
        """A nested dict under ``errors`` is flattened to ``Error in ...`` messages."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        error = pymisp.get(1)
        self.assertIn("Authentication failed", error["message"])
        response = dict(self.auth_error_msg)
        response['errors'] = {
            'fo': {'o': 42}, 'ba': {'r': True}, 'b': {'a': ['z']}, 'd': {'e': {'e': ['p']}}}
        messages = pymisp.flatten_error_messages(response)
        self.assertEqual({'Error in o: 42', 'Error in r: True', 'Error in a: z', "Error in e: {'e': ['p']}"}, set(messages))

    def test_test_connection(self, m):
        """``test_connection`` is truthy against the mocked server."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        self.assertTrue(pymisp.test_connection())

    def test_change_toids(self, m):
        """``change_toids`` with a valid value returns the canned empty reply."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        self.assertEqual({}, pymisp.change_toids(self.key, 1))

    def test_change_toids_invalid(self, m):
        """``change_toids`` must raise for an off-domain ``to_ids`` value."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        # assertRaises instead of try/except Exception: a failing assertion
        # inside the try block would itself be caught and the test could
        # never fail.
        with self.assertRaises(Exception):
            pymisp.change_toids(self.key, 42)

    def test_proposal_view_default(self, m):
        """``proposal_view`` without arguments returns the canned empty reply."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        self.assertEqual({}, pymisp.proposal_view())

    def test_proposal_view_event_1(self, m):
        """``proposal_view`` for event 1 returns the canned empty reply."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        self.assertEqual({}, pymisp.proposal_view(event_id=1))

    def test_proposal_view_event_overdetermined(self, m):
        """Passing both ``event_id`` and ``proposal_id`` yields an ``error`` entry."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        self.assertIsNotNone(pymisp.proposal_view(event_id=1, proposal_id=42).get('error'))

    def test_freetext(self, m):
        """``freetext`` with valid options returns the canned empty reply."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        self.assertEqual({}, pymisp.freetext(1, 'foo', adhereToWarninglists=True, distribution=42))

    def test_freetext_offdomain(self, m):
        """``freetext`` must raise for an off-domain ``adhereToWarninglists``."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        # assertRaises instead of try/except Exception: a failing assertion
        # inside the try block would itself be caught and the test could
        # never fail.
        with self.assertRaises(Exception):
            pymisp.freetext(1, None, adhereToWarninglists='hard')

    def test_get_yara(self, m):
        """``get_yara`` returns ``(False, None)`` for the canned empty reply."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        self.assertEqual((False, None), pymisp.get_yara(1))

    def test_download_samples(self, m):
        """``download_samples`` returns ``(False, None)`` for the canned empty reply."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        self.assertEqual((False, None), pymisp.download_samples())

    def test_sample_upload(self, m):
        """``upload_sample`` accepts a file path, a missing path and raw bytes."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        pymisp.upload_sample("tmux", "tests/viper-test-files/test_files/tmux", 1)
        pymisp.upload_sample("tmux", "non_existing_file", 1)
        pymisp.upload_sample("tmux", b"binblob", 1)

    def test_get_all_tags(self, m):
        """``get_all_tags`` returns the canned tag payload."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        self.assertEqual({'Tag': 'foo'}, pymisp.get_all_tags())

    def test_tag_event(self, m):
        """``tag`` accepts the fixture's UUID and raises ``PyMISPError`` for the two altered values."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        uuid = self.event["Event"]["uuid"]
        pymisp.tag(uuid, "foo")

        self.assertRaises(pm.PyMISPError, pymisp.tag, "test_uuid", "foo")
        self.assertRaises(pm.PyMISPError, pymisp.tag, uuid.replace("a", "z"), "foo")

    def test_untag_event(self, m):
        """``untag`` with the fixture's UUID completes without raising."""
        self.initURI(m)
        pymisp = PyMISP(self.domain, self.key)
        uuid = self.event["Event"]["uuid"]
        pymisp.untag(uuid, "foo")
|
2017-12-01 10:36:09 +01:00
|
|
|
|
2018-01-28 20:40:09 +01:00
|
|
|
|
2016-08-19 10:13:00 +02:00
|
|
|
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
|