new: add_attributes method in MISPObject (for multiple attributes)

pull/382/head
Raphaël Vinot 2019-04-09 17:54:12 +02:00
parent b1b9f95501
commit 52402c2acf
3 changed files with 48 additions and 2 deletions

View File

@ -1097,9 +1097,11 @@ class MISPObject(AbstractMISP):
'''True if all the relations in the list are defined in the object''' '''True if all the relations in the list are defined in the object'''
return all(relation in self._fast_attribute_access for relation in list_of_relations) return all(relation in self._fast_attribute_access for relation in list_of_relations)
def add_attribute(self, object_relation, **value): def add_attribute(self, object_relation, simple_value=None, **value):
"""Add an attribute. object_relation is required and the value key is a """Add an attribute. object_relation is required and the value key is a
dictionary with all the keys supported by MISPAttribute""" dictionary with all the keys supported by MISPAttribute"""
if simple_value:
value = {'value': simple_value}
if value.get('value') is None: if value.get('value') is None:
return None return None
if self._known_template: if self._known_template:
@ -1118,6 +1120,20 @@ class MISPObject(AbstractMISP):
self.edited = True self.edited = True
return attribute return attribute
def add_attributes(self, object_relation, *attributes):
'''Add multiple attributes with the same object_relation.
Helper for object_relation when multiple is True in the template.
It is the same as calling multiple times add_attribute with the same object_relation.
'''
to_return = []
for attribute in attributes:
if isinstance(attribute, dict):
a = self.add_attribute(object_relation, **attribute)
else:
a = self.add_attribute(object_relation, value=attribute)
to_return.append(a)
return to_return
def to_dict(self, strict=False): def to_dict(self, strict=False):
if strict or self._strict and self._known_template: if strict or self._strict and self._known_template:
self._validate() self._validate()

View File

@ -47,6 +47,8 @@ class AbstractMISPObjectGenerator(MISPObject):
continue continue
if isinstance(value, dict): if isinstance(value, dict):
self.add_attribute(object_relation, **value) self.add_attribute(object_relation, **value)
elif isinstance(value, list):
self.add_attributes(object_relation, *value)
else: else:
# Assume it is the value only # Assume it is the value only
self.add_attribute(object_relation, value=value) self.add_attribute(object_relation, value=value)

View File

@ -21,7 +21,7 @@ logging.disable(logging.CRITICAL)
try: try:
from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject
from pymisp.tools import CSVLoader from pymisp.tools import CSVLoader, DomainIPObject
except ImportError: except ImportError:
if sys.version_info < (3, 6): if sys.version_info < (3, 6):
print('This test suite requires Python 3.6+, breaking.') print('This test suite requires Python 3.6+, breaking.')
@ -918,13 +918,41 @@ class TestComprehensive(unittest.TestCase):
# Update with full event # Update with full event
first = self.user_misp_connector.add_event(first) first = self.user_misp_connector.add_event(first)
first.objects[0].add_attribute('ip', value='8.9.9.8') first.objects[0].add_attribute('ip', value='8.9.9.8')
first.objects[0].add_attribute('ip', '8.9.9.10')
first = self.user_misp_connector.update_event(first) first = self.user_misp_connector.update_event(first)
self.assertEqual(first.objects[0].attributes[2].value, '8.9.9.8') self.assertEqual(first.objects[0].attributes[2].value, '8.9.9.8')
self.assertEqual(first.objects[0].attributes[3].value, '8.9.9.10')
# Update object only # Update object only
misp_object = self.user_misp_connector.get_object(first.objects[0].id) misp_object = self.user_misp_connector.get_object(first.objects[0].id)
misp_object.attributes[2].value = '8.9.9.9' misp_object.attributes[2].value = '8.9.9.9'
misp_object = self.user_misp_connector.update_object(misp_object) misp_object = self.user_misp_connector.update_object(misp_object)
self.assertEqual(misp_object.attributes[2].value, '8.9.9.9') self.assertEqual(misp_object.attributes[2].value, '8.9.9.9')
# Test with add_attributes
second = self.create_simple_event()
ip_dom = MISPObject('domain-ip')
ip_dom.add_attribute('domain', value='google.fr')
ip_dom.add_attributes('ip', {'value': '10.8.8.8', 'to_ids': False}, '10.9.8.8')
ip_dom.add_attributes('ip', '11.8.8.8', '11.9.8.8')
second.add_object(ip_dom)
second = self.user_misp_connector.add_event(second)
self.assertEqual(len(second.objects[0].attributes), 5)
self.assertFalse(second.objects[0].attributes[1].to_ids)
self.assertTrue(second.objects[0].attributes[2].to_ids)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
def test_domain_ip_object(self):
first = self.create_simple_event()
try:
dom_ip_obj = DomainIPObject({'ip': ['1.1.1.1', {'value': '2.2.2.2', 'to_ids': False}],
'first-seen': '20190101',
'last-seen': '2019-02-03',
'domain': 'circl.lu'})
first.add_object(dom_ip_obj)
first = self.user_misp_connector.add_event(first)
self.assertEqual(len(first.objects[0].attributes), 5)
finally: finally:
# Delete event # Delete event
self.admin_misp_connector.delete_event(first.id) self.admin_misp_connector.delete_event(first.id)