#!/usr/bin/env python # -*- coding: utf-8 -*- import unittest import json import sys from io import BytesIO import glob from pymisp import MISPEvent, MISPSighting, MISPTag from pymisp.exceptions import InvalidMISPObject class TestMISPEvent(unittest.TestCase): def setUp(self): self.maxDiff = None self.mispevent = MISPEvent() def init_event(self): self.mispevent.info = 'This is a test' self.mispevent.distribution = 1 self.mispevent.threat_level_id = 1 self.mispevent.analysis = 1 self.mispevent.set_date("2017-12-31") # test the set date method def test_simple(self): with open('tests/mispevent_testfiles/simple.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) def test_event(self): self.init_event() self.mispevent.publish() with open('tests/mispevent_testfiles/event.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) def test_loadfile(self): self.mispevent.load_file('tests/mispevent_testfiles/event.json') with open('tests/mispevent_testfiles/event.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) def test_event_tag(self): self.init_event() self.mispevent.add_tag('bar') self.mispevent.add_tag(name='baz') new_tag = MISPTag() new_tag.from_dict(name='foo') self.mispevent.add_tag(new_tag) with open('tests/mispevent_testfiles/event_tags.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) def test_attribute(self): self.init_event() a = self.mispevent.add_attribute('filename', 'bar.exe') del a.uuid a = self.mispevent.add_attribute_tag('osint', 'bar.exe') attr_tags = self.mispevent.get_attribute_tag('bar.exe') self.assertEqual(self.mispevent.attributes[0].tags[0].name, 'osint') self.assertEqual(attr_tags[0].name, 'osint') with open('tests/mispevent_testfiles/attribute.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) # Fake setting an attribute ID for testing self.mispevent.attributes[0].id = 42 self.mispevent.delete_attribute(42) with open('tests/mispevent_testfiles/attribute_del.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) def test_object_tag(self): self.mispevent.add_object(name='file', strict=True) a = self.mispevent.objects[0].add_attribute('filename', value='') self.assertEqual(a, None) a = self.mispevent.objects[0].add_attribute('filename', value=None) self.assertEqual(a, None) a = self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}]) del a.uuid self.assertEqual(self.mispevent.objects[0].attributes[0].tags[0].name, 'blah') self.assertTrue(self.mispevent.objects[0].has_attributes_by_relation(['filename'])) self.assertEqual(len(self.mispevent.objects[0].get_attributes_by_relation('filename')), 1) self.mispevent.add_object(name='url', strict=True) a = self.mispevent.objects[1].add_attribute('url', value='https://www.circl.lu') del a.uuid self.mispevent.objects[0].uuid = 'a' self.mispevent.objects[1].uuid = 'b' self.mispevent.objects[0].add_reference(self.mispevent.objects[1], 'baz', comment='foo') self.assertEqual(self.mispevent.objects[0].references[0].relationship_type, 'baz') with open('tests/mispevent_testfiles/event_obj_attr_tag.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) @unittest.skip("Not supported on MISP: https://github.com/MISP/MISP/issues/2638 - https://github.com/MISP/PyMISP/issues/168") def test_object_level_tag(self): self.mispevent.add_object(name='file', strict=True) self.mispevent.objects[0].add_attribute('filename', value='bar') self.mispevent.objects[0].add_tag('osint') self.mispevent.objects[0].uuid = 'a' with open('tests/mispevent_testfiles/event_obj_tag.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) def test_malware(self): with open('tests/mispevent_testfiles/simple.json', 'rb') as f: pseudofile = BytesIO(f.read()) self.init_event() a = self.mispevent.add_attribute('malware-sample', 'bar.exe', data=pseudofile) del a.uuid attribute = self.mispevent.attributes[0] self.assertEqual(attribute.malware_binary, pseudofile) with open('tests/mispevent_testfiles/malware.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) def test_existing_malware(self): self.mispevent.load_file('tests/mispevent_testfiles/malware_exist.json') with open('tests/mispevent_testfiles/simple.json', 'rb') as f: pseudofile = BytesIO(f.read()) self.assertEqual( self.mispevent.objects[0].get_attributes_by_relation('malware-sample')[0].malware_binary.read(), pseudofile.read()) def test_sighting(self): sighting = MISPSighting() sighting.from_dict(value='1', type='bar', timestamp=11111111) with open('tests/mispevent_testfiles/sighting.json', 'r') as f: ref_json = json.load(f) self.assertEqual(sighting.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) def test_existing_event(self): self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') with open('tests/mispevent_testfiles/existing_event.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) def test_shadow_attributes_existing(self): self.mispevent.load_file('tests/mispevent_testfiles/shadow.json') with open('tests/mispevent_testfiles/shadow.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) @unittest.skip("Not supported on MISP.") def test_shadow_attributes(self): self.init_event() p = self.mispevent.add_proposal(type='filename', value='baz.jpg') del p.uuid a = self.mispevent.add_attribute('filename', 'bar.exe') del a.uuid p = self.mispevent.attributes[0].add_proposal(type='filename', value='bar.pdf') del p.uuid with open('tests/mispevent_testfiles/proposals.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) def test_default_attributes(self): self.mispevent.add_object(name='file', strict=True) a = self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}]) del a.uuid a = self.mispevent.objects[0].add_attribute('pattern-in-file', value='baz') self.assertEqual(a.category, 'Artifacts dropped') del a.uuid self.mispevent.add_object(name='file', strict=False, default_attributes_parameters=self.mispevent.objects[0].attributes[0]) a = self.mispevent.objects[1].add_attribute('filename', value='baz') del a.uuid self.mispevent.objects[0].uuid = 'a' self.mispevent.objects[1].uuid = 'b' with open('tests/mispevent_testfiles/event_obj_def_param.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) def test_obj_default_values(self): self.init_event() self.mispevent.add_object(name='whois', strict=True) a = self.mispevent.objects[0].add_attribute('registrar', value='registar.example.com') del a.uuid a = self.mispevent.objects[0].add_attribute('domain', value='domain.example.com') del a.uuid a = self.mispevent.objects[0].add_attribute('nameserver', value='ns1.example.com') del a.uuid a = self.mispevent.objects[0].add_attribute('nameserver', value='ns2.example.com', disable_correlation=False, to_ids=True, category='External analysis') del a.uuid self.mispevent.objects[0].uuid = 'a' with open('tests/mispevent_testfiles/def_param.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) def test_event_not_edited(self): self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) def test_event_edited(self): self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.mispevent.info = 'blah' self.assertTrue(self.mispevent.edited) def test_event_tag_edited(self): self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) self.mispevent.add_tag('foo') self.assertTrue(self.mispevent.edited) def test_event_attribute_edited(self): self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.mispevent.attributes[0].value = 'blah' self.assertTrue(self.mispevent.attributes[0].edited) self.assertFalse(self.mispevent.attributes[1].edited) self.assertTrue(self.mispevent.edited) def test_event_attribute_tag_edited(self): self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) self.mispevent.attributes[0].tags[0].name = 'blah' self.assertTrue(self.mispevent.attributes[0].tags[0].edited) self.assertFalse(self.mispevent.attributes[0].tags[1].edited) self.assertTrue(self.mispevent.attributes[0].edited) self.assertTrue(self.mispevent.edited) def test_event_attribute_tag_edited_second(self): self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) self.mispevent.attributes[0].add_tag(name='blah') self.assertTrue(self.mispevent.attributes[0].edited) self.assertTrue(self.mispevent.edited) def test_event_object_edited(self): self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) self.mispevent.objects[0].comment = 'blah' self.assertTrue(self.mispevent.objects[0].edited) self.assertFalse(self.mispevent.objects[1].edited) self.assertTrue(self.mispevent.edited) def test_event_object_attribute_edited(self): self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) self.mispevent.objects[0].attributes[0].comment = 'blah' self.assertTrue(self.mispevent.objects[0].attributes[0].edited) self.assertTrue(self.mispevent.objects[0].edited) self.assertTrue(self.mispevent.edited) def test_event_object_attribute_edited_tag(self): self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) self.mispevent.objects[0].attributes[0].add_tag('blah') self.assertTrue(self.mispevent.objects[0].attributes[0].edited) self.assertTrue(self.mispevent.objects[0].edited) self.assertTrue(self.mispevent.edited) with open('tests/mispevent_testfiles/existing_event_edited.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) def test_obj_by_id(self): self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') misp_obj = self.mispevent.get_object_by_id(1556) self.assertEqual(misp_obj.uuid, '5a3cd604-e11c-4de5-bbbf-c170950d210f') def test_userdefined_object(self): self.init_event() self.mispevent.add_object(name='test_object_template', strict=True, misp_objects_path_custom='tests/mispevent_testfiles') with self.assertRaises(InvalidMISPObject) as e: # Fail on required self.mispevent.to_json(sort_keys=True, indent=2) if sys.version_info >= (3, ): self.assertEqual(e.exception.message, '{\'member3\'} are required.') else: # Python2 bullshit self.assertEqual(e.exception.message, 'set([u\'member3\']) are required.') a = self.mispevent.objects[0].add_attribute('member3', value='foo') del a.uuid with self.assertRaises(InvalidMISPObject) as e: # Fail on requiredOneOf self.mispevent.to_json(sort_keys=True, indent=2) self.assertEqual(e.exception.message, 'At least one of the following attributes is required: member1, member2') a = self.mispevent.objects[0].add_attribute('member1', value='bar') del a.uuid a = self.mispevent.objects[0].add_attribute('member1', value='baz') del a.uuid with self.assertRaises(InvalidMISPObject) as e: # member1 is not a multiple self.mispevent.to_json(sort_keys=True, indent=2) self.assertEqual(e.exception.message, 'Multiple occurrences of member1 is not allowed') self.mispevent.objects[0].attributes = self.mispevent.objects[0].attributes[:2] self.mispevent.objects[0].uuid = 'a' with open('tests/mispevent_testfiles/misp_custom_obj.json', 'r') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) ''' # Reenable that the 1st of jan 2020. @unittest.skipIf(sys.version_info < (3, 6), 'Not supported on python < 3.6') def test_object_templates(self): me = MISPEvent() for template in glob.glob(str(me.misp_objects_path / '*' / 'definition.json')): with open(template) as f: t_json = json.load(f) if 'requiredOneOf' in t_json: obj_relations = set(t_json['attributes'].keys()) subset = set(t_json['requiredOneOf']).issubset(obj_relations) self.assertTrue(subset, f'{t_json["name"]}') if 'required' in t_json: obj_relations = set(t_json['attributes'].keys()) subset = set(t_json['required']).issubset(obj_relations) self.assertTrue(subset, f'{t_json["name"]}') for obj_relation, entry in t_json['attributes'].items(): self.assertTrue(entry['misp-attribute'] in me.describe_types['types']) if 'categories' in entry: subset = set(entry['categories']).issubset(me.describe_types['categories']) self.assertTrue(subset, f'{t_json["name"]} - {obj_relation}') ''' if __name__ == '__main__': unittest.main()