diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 1528e04..90f7cfb 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -10,7 +10,6 @@ from zipfile import ZipFile import hashlib import sys import uuid -from collections import Counter from . import deprecated from .abstract import AbstractMISP @@ -810,14 +809,22 @@ class MISPObject(AbstractMISP): In this case the ObjectReference needs to be pushed manually and cannot be in the JSON dump. :default_attributes_parameters: Used as template for the attributes if they are not overwritten in add_attribute + + :misp_objects_path_custom: Path to custom object templates ''' super(MISPObject, self).__init__(**kwargs) self.__strict = strict self.name = name - self.__misp_objects_path = os.path.join( + misp_objects_path = os.path.join( os.path.abspath(os.path.dirname(sys.modules['pymisp'].__file__)), 'data', 'misp-objects', 'objects') - if os.path.exists(os.path.join(self.__misp_objects_path, self.name, 'definition.json')): + misp_objects_path_custom = kwargs.get('misp_objects_path_custom') + if misp_objects_path_custom and os.path.exists(os.path.join(misp_objects_path_custom, self.name, 'definition.json')): + # Use the local object path by default if provided (allows to overwrite a default template) + template_path = os.path.join(misp_objects_path_custom, self.name, 'definition.json') + self.__known_template = True + elif os.path.exists(os.path.join(misp_objects_path, self.name, 'definition.json')): + template_path = os.path.join(misp_objects_path, self.name, 'definition.json') self.__known_template = True else: if self.__strict: @@ -825,7 +832,7 @@ class MISPObject(AbstractMISP): else: self.__known_template = False if self.__known_template: - with open(os.path.join(self.__misp_objects_path, self.name, 'definition.json'), 'r') as f: + with open(template_path, 'r') as f: self.__definition = json.load(f) setattr(self, 'meta-category', self.__definition['meta-category']) self.template_uuid = self.__definition['uuid'] @@ -959,23 +966,21 @@ class MISPObject(AbstractMISP): def _validate(self): """Make sure the object we're creating has the required fields""" - all_object_relations = [] - for a in self.attributes: - all_object_relations.append(a.object_relation) - count_relations = dict(Counter(all_object_relations)) - for key, counter in count_relations.items(): - if counter == 1: - continue - if not self.__definition['attributes'][key].get('multiple'): - raise InvalidMISPObject('Multiple occurrences of {} is not allowed'.format(key)) - all_attribute_names = set(count_relations.keys()) - if self.__definition.get('requiredOneOf'): - if not set(self.__definition['requiredOneOf']) & all_attribute_names: - raise InvalidMISPObject('At least one of the following attributes is required: {}'.format(', '.join(self.__definition['requiredOneOf']))) if self.__definition.get('required'): - for r in self.__definition.get('required'): - if r not in all_attribute_names: - raise InvalidMISPObject('{} is required'.format(r)) + required_missing = set(self.__definition.get('required')) - set(self.__fast_attribute_access.keys()) + if required_missing: + raise InvalidMISPObject('{} are required.'.format(required_missing)) + if self.__definition.get('requiredOneOf'): + if not set(self.__definition['requiredOneOf']) & set(self.__fast_attribute_access.keys()): + # We ecpect at least one of the object_relation in requiredOneOf, and it isn't the case + raise InvalidMISPObject('At least one of the following attributes is required: {}'.format(', '.join(self.__definition['requiredOneOf']))) + for rel, attrs in self.__fast_attribute_access.items(): + if len(attrs) == 1: + # object_relation's here only once, everything's cool, moving on + continue + if not self.__definition['attributes'][rel].get('multiple'): + # object_relation's here more than once, but it isn't allowed in the template. + raise InvalidMISPObject('Multiple occurrences of {} is not allowed'.format(rel)) return True def __repr__(self): diff --git a/tests/mispevent_testfiles/test_object_template/definition.json b/tests/mispevent_testfiles/test_object_template/definition.json new file mode 100644 index 0000000..283b9de --- /dev/null +++ b/tests/mispevent_testfiles/test_object_template/definition.json @@ -0,0 +1,29 @@ +{ + "requiredOneOf": [ + "member1", + "member2" + ], + "required": [ + "member3" + ], + "attributes": { + "member1": { + "description": "FirstMember", + "misp-attribute": "text" + }, + "member2": { + "description": "SecondMember", + "misp-attribute": "text", + "multiple": true + }, + "member3": { + "description": "Thirdmember", + "misp-attribute": "text" + } + }, + "version": 1, + "description": "TestTemplate.", + "meta-category": "file", + "uuid": "4ec55cc6-9e49-4c64-b794-03c25c1a6589", + "name": "test_object_template" +} diff --git a/tests/test_mispevent.py b/tests/test_mispevent.py index 3ffe461..0921de1 100644 --- a/tests/test_mispevent.py +++ b/tests/test_mispevent.py @@ -6,6 +6,7 @@ import json from io import BytesIO from pymisp import MISPEvent, MISPSighting, MISPTag +from pymisp.exceptions import InvalidMISPObject class TestMISPEvent(unittest.TestCase): @@ -233,6 +234,27 @@ class TestMISPEvent(unittest.TestCase): misp_obj = self.mispevent.get_object_by_id(1556) self.assertEqual(misp_obj.uuid, '5a3cd604-e11c-4de5-bbbf-c170950d210f') + def test_userdefined_object(self): + self.init_event() + self.mispevent.add_object(name='test_object_template', strict=True, misp_objects_path_custom='tests/mispevent_testfiles') + with self.assertRaises(InvalidMISPObject) as e: + # Fail on required + self.mispevent.to_json() + self.assertEqual(e.exception.message, '{\'member3\'} are required.') + + self.mispevent.objects[0].add_attribute('member3', value='foo') + with self.assertRaises(InvalidMISPObject) as e: + # Fail on requiredOneOf + self.mispevent.to_json() + self.assertEqual(e.exception.message, 'At least one of the following attributes is required: member1, member2') + + self.mispevent.objects[0].add_attribute('member1', value='bar') + self.mispevent.objects[0].add_attribute('member1', value='baz') + with self.assertRaises(InvalidMISPObject) as e: + # member1 is not a multiple + self.mispevent.to_json() + self.assertEqual(e.exception.message, 'Multiple occurrences of member1 is not allowed') + if __name__ == '__main__': unittest.main()