new: Test cases for CSV loader, add cleaner methods in ExpandedPyMISP

pull/382/head
Raphaël Vinot 2019-04-03 17:46:52 +02:00
parent e5a42b812f
commit 1de4c9d0b9
3 changed files with 56 additions and 26 deletions

View File

@ -107,6 +107,16 @@ class ExpandedPyMISP(PyMISP):
o.from_dict(**created_object)
return o
def update_object(self, misp_object: MISPObject):
updated_object = super().edit_object(misp_object)
if isinstance(updated_object, str):
raise NewEventError(f'Unexpected response from server: {updated_object}')
elif 'errors' in updated_object:
return updated_object
o = MISPObject(misp_object.name)
o.from_dict(**updated_object)
return o
def add_event(self, event: MISPEvent):
created_event = super().add_event(event)
if isinstance(created_event, str):

View File

@ -230,10 +230,13 @@ class MISPAttribute(AbstractMISP):
if kwargs.get('event_id'):
self.event_id = int(kwargs.pop('event_id'))
if kwargs.get('timestamp'):
if sys.version_info >= (3, 3):
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('timestamp')), datetime.timezone.utc)
ts = kwargs.pop('timestamp')
if isinstance(ts, datetime.datetime):
self.timestamp = ts
elif sys.version_info >= (3, 3):
self.timestamp = datetime.datetime.fromtimestamp(int(ts), datetime.timezone.utc)
else:
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('timestamp')), UTC())
self.timestamp = datetime.datetime.fromtimestamp(int(ts), UTC())
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
@ -1044,10 +1047,13 @@ class MISPObject(AbstractMISP):
raise NewAttributeError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5'.format(self.distribution))
if kwargs.get('timestamp'):
if sys.version_info >= (3, 3):
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('timestamp')), datetime.timezone.utc)
ts = kwargs.pop('timestamp')
if isinstance(ts, datetime.datetime):
self.timestamp = ts
elif sys.version_info >= (3, 3):
self.timestamp = datetime.datetime.fromtimestamp(int(ts), datetime.timezone.utc)
else:
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('timestamp')), UTC())
self.timestamp = datetime.datetime.fromtimestamp(int(ts), UTC())
if kwargs.get('Attribute'):
for a in kwargs.pop('Attribute'):
self.add_attribute(**a)

View File

@ -11,6 +11,7 @@ from datetime import datetime, timedelta, date
from io import BytesIO
import re
import json
from pathlib import Path
import time
from uuid import uuid4
@ -20,6 +21,7 @@ logging.disable(logging.CRITICAL)
try:
from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject
from pymisp.tools import CSVLoader
except ImportError:
if sys.version_info < (3, 6):
print('This test suite requires Python 3.6+, breaking.')
@ -497,17 +499,17 @@ class TestComprehensive(unittest.TestCase):
# Object - add
o = MISPObject('file')
o.add_attribute('filename', value='blah.exe')
new_obj = self.user_misp_connector.add_object(first.id, o.template_uuid, o)
new_obj = self.user_misp_connector.add_object(first.id, o)
# FIXME: Add helper that returns a MISPObject
self.assertEqual(new_obj['Object']['distribution'], str(Distribution.inherit.value))
self.assertEqual(new_obj['Object']['Attribute'][0]['distribution'], str(Distribution.inherit.value))
self.assertEqual(new_obj.distribution, int(Distribution.inherit.value))
self.assertEqual(new_obj.attributes[0].distribution, int(Distribution.inherit.value))
# Object - edit
clean_obj = MISPObject(strict=True, **new_obj['Object'])
clean_obj.from_dict(**new_obj['Object'])
clean_obj = MISPObject(name=new_obj.name, strict=True)
clean_obj.from_dict(**new_obj)
clean_obj.add_attribute('filename', value='blah.exe')
new_obj = self.user_misp_connector.edit_object(clean_obj)
for a in new_obj['Object']['Attribute']:
self.assertEqual(a['distribution'], str(Distribution.inherit.value))
new_obj = self.user_misp_connector.update_object(clean_obj)
for a in new_obj.attributes:
self.assertEqual(a.distribution, int(Distribution.inherit.value))
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
@ -945,23 +947,17 @@ class TestComprehensive(unittest.TestCase):
first = self.user_misp_connector.add_event(first)
fo, peo, seos = make_binary_objects('tests/viper-test-files/test_files/whoami.exe')
for s in seos:
template_id = self.user_misp_connector.get_object_template_id(s.template_uuid)
r = self.user_misp_connector.add_object(first.id, template_id, s)
self.assertTrue('Object' in r, r)
self.assertEqual(r['Object']['name'], 'pe-section', r)
r = self.user_misp_connector.add_object(first.id, s)
self.assertEqual(r.name, 'pe-section', r)
template_id = self.user_misp_connector.get_object_template_id(peo.template_uuid)
r = self.user_misp_connector.add_object(first.id, template_id, peo)
self.assertTrue('Object' in r, r)
self.assertEqual(r['Object']['name'], 'pe', r)
r = self.user_misp_connector.add_object(first.id, peo)
self.assertEqual(r.name, 'pe', r)
for ref in peo.ObjectReference:
r = self.user_misp_connector.add_object_reference(ref)
self.assertTrue('ObjectReference' in r, r)
template_id = self.user_misp_connector.get_object_template_id(fo.template_uuid)
r = self.user_misp_connector.add_object(first.id, template_id, fo)
self.assertTrue('Object' in r, r)
self.assertEqual(r['Object']['name'], 'file', r)
r = self.user_misp_connector.add_object(first.id, fo)
self.assertEqual(r.name, 'file', r)
for ref in fo.ObjectReference:
r = self.user_misp_connector.add_object_reference(ref)
self.assertTrue('ObjectReference' in r, r)
@ -1080,6 +1076,24 @@ class TestComprehensive(unittest.TestCase):
# Delete event
self.admin_misp_connector.delete_event(first.id)
def test_csv_loader(self):
csv1 = CSVLoader(template_name='file', csv_path=Path('tests/csv_testfiles/valid_fieldnames.csv'))
event = MISPEvent()
event.info = 'Test event from CSV loader'
for o in csv1.load():
event.add_object(**o)
csv2 = CSVLoader(template_name='file', csv_path=Path('tests/csv_testfiles/invalid_fieldnames.csv'),
fieldnames=['SHA1', 'fileName', 'size-in-bytes'], has_fieldnames=True)
try:
first = self.user_misp_connector.add_event(event)
for o in csv2.load():
new_object = self.user_misp_connector.add_object(first.id, o)
self.assertEqual(len(new_object.attributes), 3)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
@unittest.skip("Currently failing")
def test_search_type_event_csv(self):
try: