mirror of https://github.com/MISP/PyMISP
Merge branch 'neok0-add-objects-template-file-path'
commit
5520cf1261
|
@ -10,7 +10,6 @@ from zipfile import ZipFile
|
||||||
import hashlib
|
import hashlib
|
||||||
import sys
|
import sys
|
||||||
import uuid
|
import uuid
|
||||||
from collections import Counter
|
|
||||||
|
|
||||||
from . import deprecated
|
from . import deprecated
|
||||||
from .abstract import AbstractMISP
|
from .abstract import AbstractMISP
|
||||||
|
@ -810,14 +809,22 @@ class MISPObject(AbstractMISP):
|
||||||
In this case the ObjectReference needs to be pushed manually and cannot be in the JSON dump.
|
In this case the ObjectReference needs to be pushed manually and cannot be in the JSON dump.
|
||||||
|
|
||||||
:default_attributes_parameters: Used as template for the attributes if they are not overwritten in add_attribute
|
:default_attributes_parameters: Used as template for the attributes if they are not overwritten in add_attribute
|
||||||
|
|
||||||
|
:misp_objects_path_custom: Path to custom object templates
|
||||||
'''
|
'''
|
||||||
super(MISPObject, self).__init__(**kwargs)
|
super(MISPObject, self).__init__(**kwargs)
|
||||||
self.__strict = strict
|
self.__strict = strict
|
||||||
self.name = name
|
self.name = name
|
||||||
self.__misp_objects_path = os.path.join(
|
misp_objects_path = os.path.join(
|
||||||
os.path.abspath(os.path.dirname(sys.modules['pymisp'].__file__)),
|
os.path.abspath(os.path.dirname(sys.modules['pymisp'].__file__)),
|
||||||
'data', 'misp-objects', 'objects')
|
'data', 'misp-objects', 'objects')
|
||||||
if os.path.exists(os.path.join(self.__misp_objects_path, self.name, 'definition.json')):
|
misp_objects_path_custom = kwargs.get('misp_objects_path_custom')
|
||||||
|
if misp_objects_path_custom and os.path.exists(os.path.join(misp_objects_path_custom, self.name, 'definition.json')):
|
||||||
|
# Use the local object path by default if provided (allows to overwrite a default template)
|
||||||
|
template_path = os.path.join(misp_objects_path_custom, self.name, 'definition.json')
|
||||||
|
self.__known_template = True
|
||||||
|
elif os.path.exists(os.path.join(misp_objects_path, self.name, 'definition.json')):
|
||||||
|
template_path = os.path.join(misp_objects_path, self.name, 'definition.json')
|
||||||
self.__known_template = True
|
self.__known_template = True
|
||||||
else:
|
else:
|
||||||
if self.__strict:
|
if self.__strict:
|
||||||
|
@ -825,7 +832,7 @@ class MISPObject(AbstractMISP):
|
||||||
else:
|
else:
|
||||||
self.__known_template = False
|
self.__known_template = False
|
||||||
if self.__known_template:
|
if self.__known_template:
|
||||||
with open(os.path.join(self.__misp_objects_path, self.name, 'definition.json'), 'r') as f:
|
with open(template_path, 'r') as f:
|
||||||
self.__definition = json.load(f)
|
self.__definition = json.load(f)
|
||||||
setattr(self, 'meta-category', self.__definition['meta-category'])
|
setattr(self, 'meta-category', self.__definition['meta-category'])
|
||||||
self.template_uuid = self.__definition['uuid']
|
self.template_uuid = self.__definition['uuid']
|
||||||
|
@ -959,23 +966,21 @@ class MISPObject(AbstractMISP):
|
||||||
|
|
||||||
def _validate(self):
|
def _validate(self):
|
||||||
"""Make sure the object we're creating has the required fields"""
|
"""Make sure the object we're creating has the required fields"""
|
||||||
all_object_relations = []
|
|
||||||
for a in self.attributes:
|
|
||||||
all_object_relations.append(a.object_relation)
|
|
||||||
count_relations = dict(Counter(all_object_relations))
|
|
||||||
for key, counter in count_relations.items():
|
|
||||||
if counter == 1:
|
|
||||||
continue
|
|
||||||
if not self.__definition['attributes'][key].get('multiple'):
|
|
||||||
raise InvalidMISPObject('Multiple occurrences of {} is not allowed'.format(key))
|
|
||||||
all_attribute_names = set(count_relations.keys())
|
|
||||||
if self.__definition.get('requiredOneOf'):
|
|
||||||
if not set(self.__definition['requiredOneOf']) & all_attribute_names:
|
|
||||||
raise InvalidMISPObject('At least one of the following attributes is required: {}'.format(', '.join(self.__definition['requiredOneOf'])))
|
|
||||||
if self.__definition.get('required'):
|
if self.__definition.get('required'):
|
||||||
for r in self.__definition.get('required'):
|
required_missing = set(self.__definition.get('required')) - set(self.__fast_attribute_access.keys())
|
||||||
if r not in all_attribute_names:
|
if required_missing:
|
||||||
raise InvalidMISPObject('{} is required'.format(r))
|
raise InvalidMISPObject('{} are required.'.format(required_missing))
|
||||||
|
if self.__definition.get('requiredOneOf'):
|
||||||
|
if not set(self.__definition['requiredOneOf']) & set(self.__fast_attribute_access.keys()):
|
||||||
|
# We ecpect at least one of the object_relation in requiredOneOf, and it isn't the case
|
||||||
|
raise InvalidMISPObject('At least one of the following attributes is required: {}'.format(', '.join(self.__definition['requiredOneOf'])))
|
||||||
|
for rel, attrs in self.__fast_attribute_access.items():
|
||||||
|
if len(attrs) == 1:
|
||||||
|
# object_relation's here only once, everything's cool, moving on
|
||||||
|
continue
|
||||||
|
if not self.__definition['attributes'][rel].get('multiple'):
|
||||||
|
# object_relation's here more than once, but it isn't allowed in the template.
|
||||||
|
raise InvalidMISPObject('Multiple occurrences of {} is not allowed'.format(rel))
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
{
|
||||||
|
"requiredOneOf": [
|
||||||
|
"member1",
|
||||||
|
"member2"
|
||||||
|
],
|
||||||
|
"required": [
|
||||||
|
"member3"
|
||||||
|
],
|
||||||
|
"attributes": {
|
||||||
|
"member1": {
|
||||||
|
"description": "FirstMember",
|
||||||
|
"misp-attribute": "text"
|
||||||
|
},
|
||||||
|
"member2": {
|
||||||
|
"description": "SecondMember",
|
||||||
|
"misp-attribute": "text",
|
||||||
|
"multiple": true
|
||||||
|
},
|
||||||
|
"member3": {
|
||||||
|
"description": "Thirdmember",
|
||||||
|
"misp-attribute": "text"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"version": 1,
|
||||||
|
"description": "TestTemplate.",
|
||||||
|
"meta-category": "file",
|
||||||
|
"uuid": "4ec55cc6-9e49-4c64-b794-03c25c1a6589",
|
||||||
|
"name": "test_object_template"
|
||||||
|
}
|
|
@ -6,6 +6,7 @@ import json
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
|
||||||
from pymisp import MISPEvent, MISPSighting, MISPTag
|
from pymisp import MISPEvent, MISPSighting, MISPTag
|
||||||
|
from pymisp.exceptions import InvalidMISPObject
|
||||||
|
|
||||||
|
|
||||||
class TestMISPEvent(unittest.TestCase):
|
class TestMISPEvent(unittest.TestCase):
|
||||||
|
@ -233,6 +234,27 @@ class TestMISPEvent(unittest.TestCase):
|
||||||
misp_obj = self.mispevent.get_object_by_id(1556)
|
misp_obj = self.mispevent.get_object_by_id(1556)
|
||||||
self.assertEqual(misp_obj.uuid, '5a3cd604-e11c-4de5-bbbf-c170950d210f')
|
self.assertEqual(misp_obj.uuid, '5a3cd604-e11c-4de5-bbbf-c170950d210f')
|
||||||
|
|
||||||
|
def test_userdefined_object(self):
|
||||||
|
self.init_event()
|
||||||
|
self.mispevent.add_object(name='test_object_template', strict=True, misp_objects_path_custom='tests/mispevent_testfiles')
|
||||||
|
with self.assertRaises(InvalidMISPObject) as e:
|
||||||
|
# Fail on required
|
||||||
|
self.mispevent.to_json()
|
||||||
|
self.assertEqual(e.exception.message, '{\'member3\'} are required.')
|
||||||
|
|
||||||
|
self.mispevent.objects[0].add_attribute('member3', value='foo')
|
||||||
|
with self.assertRaises(InvalidMISPObject) as e:
|
||||||
|
# Fail on requiredOneOf
|
||||||
|
self.mispevent.to_json()
|
||||||
|
self.assertEqual(e.exception.message, 'At least one of the following attributes is required: member1, member2')
|
||||||
|
|
||||||
|
self.mispevent.objects[0].add_attribute('member1', value='bar')
|
||||||
|
self.mispevent.objects[0].add_attribute('member1', value='baz')
|
||||||
|
with self.assertRaises(InvalidMISPObject) as e:
|
||||||
|
# member1 is not a multiple
|
||||||
|
self.mispevent.to_json()
|
||||||
|
self.assertEqual(e.exception.message, 'Multiple occurrences of member1 is not allowed')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
Loading…
Reference in New Issue