mirror of https://github.com/MISP/PyMISP
				
				
				
			chg: Allow to pass a directory with custom object templates
							parent
							
								
									dfac2e2a05
								
							
						
					
					
						commit
						8d9a5af8d3
					
				|  | @ -10,7 +10,6 @@ from zipfile import ZipFile | |||
| import hashlib | ||||
| import sys | ||||
| import uuid | ||||
| from collections import Counter | ||||
| 
 | ||||
| from . import deprecated | ||||
| from .abstract import AbstractMISP | ||||
|  | @ -810,17 +809,22 @@ class MISPObject(AbstractMISP): | |||
|             In this case the ObjectReference needs to be pushed manually and cannot be in the JSON dump. | ||||
| 
 | ||||
|         :default_attributes_parameters: Used as template for the attributes if they are not overwritten in add_attribute | ||||
| 
 | ||||
|         :misp_objects_path_custom: Path to custom object templates | ||||
|         ''' | ||||
|         super(MISPObject, self).__init__(**kwargs) | ||||
|         self.__strict = strict | ||||
|         self.name = name | ||||
|         if kwargs.get('misp_objects_path', None): | ||||
|             self.__misp_objects_path = kwargs.get('misp_objects_path', None) | ||||
|         else: | ||||
|             self.__misp_objects_path = os.path.join( | ||||
|                 os.path.abspath(os.path.dirname(sys.modules['pymisp'].__file__)), | ||||
|                 'data', 'misp-objects', 'objects') | ||||
|         if os.path.exists(os.path.join(self.__misp_objects_path, self.name, 'definition.json')): | ||||
|         misp_objects_path = os.path.join( | ||||
|             os.path.abspath(os.path.dirname(sys.modules['pymisp'].__file__)), | ||||
|             'data', 'misp-objects', 'objects') | ||||
|         misp_objects_path_custom = kwargs.get('misp_objects_path_custom') | ||||
|         if misp_objects_path_custom and os.path.exists(os.path.join(misp_objects_path_custom, self.name, 'definition.json')): | ||||
|             # Use the local object path by default if provided (allows to overwrite a default template) | ||||
|             template_path = os.path.join(misp_objects_path_custom, self.name, 'definition.json') | ||||
|             self.__known_template = True | ||||
|         elif os.path.exists(os.path.join(misp_objects_path, self.name, 'definition.json')): | ||||
|             template_path = os.path.join(misp_objects_path, self.name, 'definition.json') | ||||
|             self.__known_template = True | ||||
|         else: | ||||
|             if self.__strict: | ||||
|  | @ -828,7 +832,7 @@ class MISPObject(AbstractMISP): | |||
|             else: | ||||
|                 self.__known_template = False | ||||
|         if self.__known_template: | ||||
|             with open(os.path.join(self.__misp_objects_path, self.name, 'definition.json'), 'r') as f: | ||||
|             with open(template_path, 'r') as f: | ||||
|                 self.__definition = json.load(f) | ||||
|             setattr(self, 'meta-category', self.__definition['meta-category']) | ||||
|             self.template_uuid = self.__definition['uuid'] | ||||
|  | @ -962,23 +966,21 @@ class MISPObject(AbstractMISP): | |||
| 
 | ||||
|     def _validate(self): | ||||
|         """Make sure the object we're creating has the required fields""" | ||||
|         all_object_relations = [] | ||||
|         for a in self.attributes: | ||||
|             all_object_relations.append(a.object_relation) | ||||
|         count_relations = dict(Counter(all_object_relations)) | ||||
|         for key, counter in count_relations.items(): | ||||
|             if counter == 1: | ||||
|                 continue | ||||
|             if not self.__definition['attributes'][key].get('multiple'): | ||||
|                 raise InvalidMISPObject('Multiple occurrences of {} is not allowed'.format(key)) | ||||
|         all_attribute_names = set(count_relations.keys()) | ||||
|         if self.__definition.get('requiredOneOf'): | ||||
|             if not set(self.__definition['requiredOneOf']) & all_attribute_names: | ||||
|                 raise InvalidMISPObject('At least one of the following attributes is required: {}'.format(', '.join(self.__definition['requiredOneOf']))) | ||||
|         if self.__definition.get('required'): | ||||
|             for r in self.__definition.get('required'): | ||||
|                 if r not in all_attribute_names: | ||||
|                     raise InvalidMISPObject('{} is required'.format(r)) | ||||
|             required_missing = set(self.__definition.get('required')) - set(self.__fast_attribute_access.keys()) | ||||
|             if required_missing: | ||||
|                 raise InvalidMISPObject('{} are required.'.format(required_missing)) | ||||
|         if self.__definition.get('requiredOneOf'): | ||||
|             if not set(self.__definition['requiredOneOf']) & set(self.__fast_attribute_access.keys()): | ||||
|                 # We ecpect at least one of the object_relation in requiredOneOf, and it isn't the case | ||||
|                 raise InvalidMISPObject('At least one of the following attributes is required: {}'.format(', '.join(self.__definition['requiredOneOf']))) | ||||
|         for rel, attrs in self.__fast_attribute_access.items(): | ||||
|             if len(attrs) == 1: | ||||
|                 # object_relation's here only once, everything's cool, moving on | ||||
|                 continue | ||||
|             if not self.__definition['attributes'][rel].get('multiple'): | ||||
|                 # object_relation's here more than once, but it isn't allowed in the template. | ||||
|                 raise InvalidMISPObject('Multiple occurrences of {} is not allowed'.format(rel)) | ||||
|         return True | ||||
| 
 | ||||
|     def __repr__(self): | ||||
|  |  | |||
|  | @ -0,0 +1,29 @@ | |||
| { | ||||
|   "requiredOneOf": [ | ||||
|     "member1", | ||||
|     "member2" | ||||
|   ], | ||||
|   "required": [ | ||||
|       "member3" | ||||
|   ], | ||||
|   "attributes": { | ||||
|     "member1": { | ||||
|       "description": "FirstMember", | ||||
|       "misp-attribute": "text" | ||||
|     }, | ||||
|     "member2": { | ||||
|       "description": "SecondMember", | ||||
|       "misp-attribute": "text", | ||||
|       "multiple": true | ||||
|     }, | ||||
|     "member3": { | ||||
|       "description": "Thirdmember", | ||||
|       "misp-attribute": "text" | ||||
|     } | ||||
|   }, | ||||
|   "version": 1, | ||||
|   "description": "TestTemplate.", | ||||
|   "meta-category": "file", | ||||
|   "uuid": "4ec55cc6-9e49-4c64-b794-03c25c1a6589", | ||||
|   "name": "test_object_template" | ||||
| } | ||||
|  | @ -6,6 +6,7 @@ import json | |||
| from io import BytesIO | ||||
| 
 | ||||
| from pymisp import MISPEvent, MISPSighting, MISPTag | ||||
| from pymisp.exceptions import InvalidMISPObject | ||||
| 
 | ||||
| 
 | ||||
| class TestMISPEvent(unittest.TestCase): | ||||
|  | @ -233,6 +234,27 @@ class TestMISPEvent(unittest.TestCase): | |||
|         misp_obj = self.mispevent.get_object_by_id(1556) | ||||
|         self.assertEqual(misp_obj.uuid, '5a3cd604-e11c-4de5-bbbf-c170950d210f') | ||||
| 
 | ||||
|     def test_userdefined_object(self): | ||||
|         self.init_event() | ||||
|         self.mispevent.add_object(name='test_object_template', strict=True, misp_objects_path_custom='tests/mispevent_testfiles') | ||||
|         with self.assertRaises(InvalidMISPObject) as e: | ||||
|             # Fail on required | ||||
|             self.mispevent.to_json() | ||||
|         self.assertEqual(e.exception.message, '{\'member3\'} are required.') | ||||
| 
 | ||||
|         self.mispevent.objects[0].add_attribute('member3', value='foo') | ||||
|         with self.assertRaises(InvalidMISPObject) as e: | ||||
|             # Fail on requiredOneOf | ||||
|             self.mispevent.to_json() | ||||
|         self.assertEqual(e.exception.message, 'At least one of the following attributes is required: member1, member2') | ||||
| 
 | ||||
|         self.mispevent.objects[0].add_attribute('member1', value='bar') | ||||
|         self.mispevent.objects[0].add_attribute('member1', value='baz') | ||||
|         with self.assertRaises(InvalidMISPObject) as e: | ||||
|             # member1 is not a multiple | ||||
|             self.mispevent.to_json() | ||||
|         self.assertEqual(e.exception.message, 'Multiple occurrences of member1 is not allowed') | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     unittest.main() | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Raphaël Vinot
						Raphaël Vinot