From 2b1546ab43a9e1ceabf7a9394293d9b32b4026ee Mon Sep 17 00:00:00 2001 From: Louis LCE Date: Wed, 24 Jan 2018 15:19:31 +0100 Subject: [PATCH 1/4] Improve and refactor attributes tests --- .travis.yml | 2 +- tests/test_offline.py | 119 +++++++++++++++++++++++++++++------------- 2 files changed, 85 insertions(+), 36 deletions(-) diff --git a/.travis.yml b/.travis.yml index 0bd5639..852e7b6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,7 +26,7 @@ install: - popd script: - - nosetests --with-coverage --cover-package=pymisp tests/test_*.py + - nosetests --with-coverage --cover-package=pymisp,tests --cover-tests tests/test_*.py after_success: - codecov diff --git a/tests/test_offline.py b/tests/test_offline.py index 15f99b0..53c1705 100644 --- a/tests/test_offline.py +++ b/tests/test_offline.py @@ -17,6 +17,11 @@ from pymisp import MISPEncode from pymisp.tools import make_binary_objects +class MockPyMISP(PyMISP): + def _send_attributes(self, event, attributes, proposal=False): + return attributes + + @requests_mock.Mocker() class TestOffline(unittest.TestCase): @@ -114,8 +119,14 @@ class TestOffline(unittest.TestCase): self.assertEqual(error, response) def test_newEvent(self, m): - error_empty_info = {'message': 'The event could not be saved.', 'name': 'Add event failed.', 'errors': ['Error in info: Info cannot be empty.'], 'url': '/events/add'} - error_empty_info_flatten = {u'message': u'The event could not be saved.', u'name': u'Add event failed.', u'errors': [u"Error in info: Info cannot be empty."], u'url': u'/events/add'} + error_empty_info = {'message': 'The event could not be saved.', + 'name': 'Add event failed.', + 'errors': ['Error in info: Info cannot be empty.'], + 'url': '/events/add'} + error_empty_info_flatten = {u'message': u'The event could not be saved.', + u'name': u'Add event failed.', + u'errors': [u"Error in info: Info cannot be empty."], + u'url': u'/events/add'} self.initURI(m) pymisp = PyMISP(self.domain, self.key) m.register_uri('POST', self.domain + 'events', json=error_empty_info) @@ -146,59 +157,95 @@ class TestOffline(unittest.TestCase): response = pymisp.search_index(tag='ecsirt:malicious-code="ransomware"') self.assertEqual(response['response'], self.search_index_result) - def test_addAttributes(self, m): - class MockPyMISP(PyMISP): - def _send_attributes(self, event, attributes, proposal=False): - return len(attributes) - self.initURI(m) - p = MockPyMISP(self.domain, self.key) - evt = p.get(1) - self.assertEqual(3, p.add_hashes(evt, md5='68b329da9893e34099c7d8ad5cb9c940', + def add_hashes(self, event, mock): + """ + Regression tests for #174 + """ + hashes_fname = mock.add_hashes(event, + md5='68b329da9893e34099c7d8ad5cb9c940', sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b', - filename='foobar.exe')) - self.assertEqual(3, p.add_hashes(evt, md5='68b329da9893e34099c7d8ad5cb9c940', + filename='foobar.exe') + self.assertEqual(3, len(hashes_fname)) + for attr in hashes_fname: + self.assertTrue(isinstance(attr, pm.mispevent.MISPAttribute)) + self.assertIn("filename|", attr["type"]) + + hashes_only = mock.add_hashes(event, md5='68b329da9893e34099c7d8ad5cb9c940', sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', - sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b')) - p.av_detection_link(evt, 'https://foocorp.com') - p.add_detection_name(evt, 'WATERMELON') - p.add_filename(evt, 'foobar.exe') - p.add_regkey(evt, 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foobar') - p.add_regkey(evt, 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foobar', rvalue='foobar') + sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b') + self.assertEqual(3, len(hashes_only)) + for attr in hashes_only: + self.assertTrue(isinstance(attr, pm.mispevent.MISPAttribute)) + self.assertNotIn("filename|", attr["type"]) + + def add_regkeys(self, event, mock): regkeys = { 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foo': None, 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\bar': 'baz', 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\bae': 0, } - self.assertEqual(3, p.add_regkeys(evt, regkeys)) + reg_attr = mock.add_regkeys(event, regkeys) + self.assertEqual(3, len(reg_attr)) + for attr in reg_attr: + self.assertTrue(isinstance(attr, pm.mispevent.MISPAttribute)) + self.assertIn("regkey", attr["type"]) + + key = mock.add_regkey(event, 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foobar') + self.assertEqual(len(key), 1) + self.assertEqual(key[0]["type"], "regkey") + + key = mock.add_regkey(event, 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foobar', rvalue='foobar') + self.assertEqual(len(key), 1) + self.assertEqual(key[0]["type"], "regkey|value") + self.assertIn("foobar|foobar", key[0]["value"]) + + + def test_addAttributes(self, m): + self.initURI(m) + p = MockPyMISP(self.domain, self.key) + evt = p.get(1) + + self.add_hashes(evt, p) + self.add_regkeys(evt, p) + + p.av_detection_link(evt, 'https://foocorp.com') + p.add_detection_name(evt, 'WATERMELON') + p.add_filename(evt, 'foobar.exe') p.add_pattern(evt, '.*foobar.*', in_memory=True) p.add_pattern(evt, '.*foobar.*', in_file=True) - self.assertRaises(pm.PyMISPError, p.add_pattern, evt, '.*foobar.*', in_memory=False, in_file=False) + p.add_mutex(evt, 'foo') p.add_pipe(evt, 'foo') p.add_pipe(evt, '\\.\\pipe\\foo') - self.assertEqual(3, p.add_pipe(evt, ['foo', 'bar', 'baz'])) - self.assertEqual(3, p.add_pipe(evt, ['foo', 'bar', '\\.\\pipe\\baz'])) - p.add_mutex(evt, 'foo') - self.assertEqual(1, p.add_mutex(evt, '\\BaseNamedObjects\\foo')) - self.assertEqual(3, p.add_mutex(evt, ['foo', 'bar', 'baz'])) - self.assertEqual(3, p.add_mutex(evt, ['foo', 'bar', '\\BaseNamedObjects\\baz'])) + + self.assertRaises(pm.PyMISPError, p.add_pattern, evt, '.*foobar.*', in_memory=False, in_file=False) + + self.assertEqual(3, len(p.add_pipe(evt, ['foo', 'bar', 'baz']))) + self.assertEqual(3, len(p.add_pipe(evt, ['foo', 'bar', '\\.\\pipe\\baz']))) + + self.assertEqual(1, len(p.add_mutex(evt, '\\BaseNamedObjects\\foo'))) + self.assertEqual(3, len(p.add_mutex(evt, ['foo', 'bar', 'baz']))) + self.assertEqual(3, len(p.add_mutex(evt, ['foo', 'bar', '\\BaseNamedObjects\\baz']))) p.add_yara(evt, 'rule Foo {}') - self.assertEqual(2, p.add_yara(evt, ['rule Foo {}', 'rule Bar {}'])) + self.assertEqual(2, len(p.add_yara(evt, ['rule Foo {}', 'rule Bar {}']))) p.add_ipdst(evt, '1.2.3.4') - self.assertEqual(2, p.add_ipdst(evt, ['1.2.3.4', '5.6.7.8'])) + self.assertEqual(2, len(p.add_ipdst(evt, ['1.2.3.4', '5.6.7.8']))) p.add_ipsrc(evt, '1.2.3.4') - self.assertEqual(2, p.add_ipsrc(evt, ['1.2.3.4', '5.6.7.8'])) + self.assertEqual(2, len(p.add_ipsrc(evt, ['1.2.3.4', '5.6.7.8']))) p.add_hostname(evt, 'a.foobar.com') - self.assertEqual(2, p.add_hostname(evt, ['a.foobar.com', 'a.foobaz.com'])) + self.assertEqual(2, len(p.add_hostname(evt, ['a.foobar.com', 'a.foobaz.com']))) p.add_domain(evt, 'foobar.com') - self.assertEqual(2, p.add_domain(evt, ['foobar.com', 'foobaz.com'])) + self.assertEqual(2, len(p.add_domain(evt, ['foobar.com', 'foobaz.com']))) p.add_domain_ip(evt, 'foo.com', '1.2.3.4') - self.assertEqual(2, p.add_domain_ip(evt, 'foo.com', ['1.2.3.4', '5.6.7.8'])) - self.assertEqual(2, p.add_domains_ips(evt, {'foo.com': '1.2.3.4', 'bar.com': '4.5.6.7'})) + self.assertEqual(2, len(p.add_domain_ip(evt, 'foo.com', ['1.2.3.4', '5.6.7.8']))) + self.assertEqual(2, len(p.add_domains_ips(evt, {'foo.com': '1.2.3.4', 'bar.com': '4.5.6.7'}))) + p.add_url(evt, 'https://example.com') - self.assertEqual(2, p.add_url(evt, ['https://example.com', 'http://foo.com'])) + self.assertEqual(2, len(p.add_url(evt, ['https://example.com', 'http://foo.com']))) + p.add_useragent(evt, 'Mozilla') - self.assertEqual(2, p.add_useragent(evt, ['Mozilla', 'Godzilla'])) + self.assertEqual(2, len(p.add_useragent(evt, ['Mozilla', 'Godzilla']))) + p.add_traffic_pattern(evt, 'blabla') p.add_snort(evt, 'blaba') p.add_net_other(evt, 'blabla') @@ -317,6 +364,7 @@ class TestOffline(unittest.TestCase): self.initURI(m) pymisp = PyMISP(self.domain, self.key) error = pymisp.get(1) + self.assertIn("Authentication failed", error["message"]) response = self.auth_error_msg response['errors'] = {'foo': 42, 'bar': False, 'baz': ['oo', 'ka']} messages = pymisp.flatten_error_messages(response) @@ -326,6 +374,7 @@ class TestOffline(unittest.TestCase): self.initURI(m) pymisp = PyMISP(self.domain, self.key) error = pymisp.get(1) + self.assertIn("Authentication failed", error["message"]) response = self.auth_error_msg response['errors'] = { 'fo': {'o': 42}, 'ba': {'r': True}, 'b': {'a': ['z']}, 'd': {'e': {'e': ['p']}}} From 7b6276fbf8def3c10bda4a5bd1ae61844248d746 Mon Sep 17 00:00:00 2001 From: Louis LCE Date: Wed, 24 Jan 2018 15:21:08 +0100 Subject: [PATCH 2/4] Add warning when failing to import dependencies --- pymisp/mispevent.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index eae001e..26a98fc 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -43,11 +43,13 @@ if six.PY2: try: from dateutil.parser import parse except ImportError: + logger.exception("Cannot import dateutil") pass try: import jsonschema except ImportError: + logger.exception("Cannot import jsonschema") pass try: From c80bef0e0d49f8a47834ecdc4c2b3681a23975a9 Mon Sep 17 00:00:00 2001 From: Louis LCE Date: Wed, 24 Jan 2018 15:31:09 +0100 Subject: [PATCH 3/4] Add a simple test for uploading samples --- tests/test_offline.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/test_offline.py b/tests/test_offline.py index 53c1705..6a03b2f 100644 --- a/tests/test_offline.py +++ b/tests/test_offline.py @@ -63,6 +63,7 @@ class TestOffline(unittest.TestCase): m.register_uri('POST', self.domain + 'attributes/restSearch', json={}) m.register_uri('POST', self.domain + 'attributes/downloadSample', json={}) m.register_uri('GET', self.domain + 'tags', json={'Tag': 'foo'}) + m.register_uri('POST', self.domain + 'events/upload_sample/1', json={}) def test_getEvent(self, m): self.initURI(m) @@ -439,6 +440,13 @@ class TestOffline(unittest.TestCase): pymisp = PyMISP(self.domain, self.key) self.assertEqual((False, None), pymisp.download_samples()) + + def test_sample_upload(self, m): + self.initURI(m) + pymisp = PyMISP(self.domain, self.key) + upload = pymisp.upload_sample("tmux", "tests/viper-test-files/test_files/tmux" , 1) + + def test_get_all_tags(self, m): self.initURI(m) pymisp = PyMISP(self.domain, self.key) From 3d6fa51a7aa8a3cf7847b735211107096bc0f535 Mon Sep 17 00:00:00 2001 From: Louis LCE Date: Wed, 24 Jan 2018 16:19:53 +0100 Subject: [PATCH 4/4] Add tag test --- tests/test_offline.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tests/test_offline.py b/tests/test_offline.py index 6a03b2f..acfe800 100644 --- a/tests/test_offline.py +++ b/tests/test_offline.py @@ -64,6 +64,8 @@ class TestOffline(unittest.TestCase): m.register_uri('POST', self.domain + 'attributes/downloadSample', json={}) m.register_uri('GET', self.domain + 'tags', json={'Tag': 'foo'}) m.register_uri('POST', self.domain + 'events/upload_sample/1', json={}) + m.register_uri('POST', self.domain + 'tags/attachTagToObject', json={}) + m.register_uri('POST', self.domain + 'tags/removeTagFromObject', json={}) def test_getEvent(self, m): self.initURI(m) @@ -440,18 +442,30 @@ class TestOffline(unittest.TestCase): pymisp = PyMISP(self.domain, self.key) self.assertEqual((False, None), pymisp.download_samples()) - def test_sample_upload(self, m): self.initURI(m) pymisp = PyMISP(self.domain, self.key) upload = pymisp.upload_sample("tmux", "tests/viper-test-files/test_files/tmux" , 1) - def test_get_all_tags(self, m): self.initURI(m) pymisp = PyMISP(self.domain, self.key) self.assertEqual({'Tag': 'foo'}, pymisp.get_all_tags()) + def test_tag_event(self, m): + self.initURI(m) + pymisp = PyMISP(self.domain, self.key) + uuid = self.event["Event"]["uuid"] + pymisp.tag(uuid, "foo") + + self.assertRaises(pm.PyMISPError, pymisp.tag, "test_uuid", "foo") + self.assertRaises(pm.PyMISPError, pymisp.tag, uuid.replace("a", "z"), "foo") + + def test_untag_event(self, m): + self.initURI(m) + pymisp = PyMISP(self.domain, self.key) + uuid = self.event["Event"]["uuid"] + pymisp.untag(uuid, "foo") if __name__ == '__main__': unittest.main()