chg: Add support for rapidjson, refactoring and code cleanup.

pull/470/head
Raphaël Vinot 2019-10-09 16:07:40 +02:00
parent 898bc96ea6
commit 02659a5782
9 changed files with 154 additions and 123 deletions

View File

@ -5,7 +5,7 @@ import argparse
import json
try:
from pymisp import MISPEncode, AbstractMISP
from pymisp import pymisp_json_default, AbstractMISP
from pymisp.tools import make_binary_objects
except ImportError:
pass
@ -51,7 +51,8 @@ def make_objects(path):
to_return['objects'].append(fo)
if fo.ObjectReference:
to_return['references'] += fo.ObjectReference
return json.dumps(to_return, cls=MISPEncode)
return json.dumps(to_return, default=pymisp_json_default)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Extract indicators out of binaries and returns MISP objects.')

View File

@ -31,7 +31,7 @@ try:
warning_2020()
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
from .api import PyMISP # noqa
from .abstract import AbstractMISP, MISPEncode, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation # noqa
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa

View File

@ -3,8 +3,23 @@
import sys
import datetime
import json
from deprecated import deprecated
from json import JSONEncoder
try:
from rapidjson import load
from rapidjson import loads
from rapidjson import dumps
import rapidjson
HAS_RAPIDJSON = True
except ImportError:
from json import load
from json import loads
from json import dumps
import json
HAS_RAPIDJSON = False
import logging
from enum import Enum
@ -13,7 +28,6 @@ from .exceptions import PyMISPInvalidFormat
logger = logging.getLogger('pymisp')
if sys.version_info < (3, 0):
from collections import MutableMapping
import os
@ -22,7 +36,7 @@ if sys.version_info < (3, 0):
resources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
misp_objects_path = os.path.join(resources_path, 'misp-objects', 'objects')
with open(os.path.join(resources_path, 'describeTypes.json'), 'r') as f:
describe_types = json.load(f)['result']
describe_types = load(f)['result']
# This is required because Python 2 is a pain.
from datetime import tzinfo, timedelta
@ -55,9 +69,9 @@ if sys.version_info < (3, 0):
return data
with open(path, 'rb') as f:
if OLD_PY3:
data = json.loads(f.read().decode())
data = loads(f.read().decode())
else:
data = json.load(f)
data = load(f)
file_cache[path] = (mtime, data)
return data
@ -69,7 +83,7 @@ else:
resources_path = Path(__file__).parent / 'data'
misp_objects_path = resources_path / 'misp-objects' / 'objects'
with (resources_path / 'describeTypes.json').open('r') as f:
describe_types = json.load(f)['result']
describe_types = load(f)['result']
class MISPFileCache(object):
# cache up to 150 JSON structures in class attribute
@ -78,7 +92,7 @@ else:
@lru_cache(maxsize=150)
def _load_json(path):
with path.open('rb') as f:
data = json.load(f)
data = load(f)
return data
if (3, 0) <= sys.version_info < (3, 6):
@ -117,8 +131,8 @@ def _int_to_str(d):
return d
@deprecated(reason=" Use method default=pymisp_json_default instead of cls=MISPEncode", version='2.4.117', action='default')
class MISPEncode(JSONEncoder):
def default(self, obj):
if isinstance(obj, AbstractMISP):
return obj.jsonable()
@ -129,6 +143,26 @@ class MISPEncode(JSONEncoder):
return JSONEncoder.default(self, obj)
if HAS_RAPIDJSON:
def pymisp_json_default(obj):
if isinstance(obj, AbstractMISP):
return obj.jsonable()
elif isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
elif isinstance(obj, Enum):
return obj.value
return rapidjson.default(obj)
else:
def pymisp_json_default(obj):
if isinstance(obj, AbstractMISP):
return obj.jsonable()
elif isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
elif isinstance(obj, Enum):
return obj.value
return json.default(obj)
class AbstractMISP(MutableMapping, MISPFileCache):
__resources_path = resources_path
__misp_objects_path = misp_objects_path
@ -179,13 +213,6 @@ class AbstractMISP(MutableMapping, MISPFileCache):
misp_objects_path = Path(misp_objects_path)
self.__misp_objects_path = misp_objects_path
@property
def properties(self):
"""All the class public properties that will be dumped in the dictionary, and the JSON export.
Note: all the properties starting with a `_` (private), or listed in __not_jsonable will be skipped.
"""
return [k for k in vars(self).keys() if not (k[0] == '_' or k in self.__not_jsonable)]
def from_dict(self, **kwargs):
"""Loading all the parameters as class properties, if they aren't `None`.
This method aims to be called when all the properties requiring a special
@ -209,21 +236,21 @@ class AbstractMISP(MutableMapping, MISPFileCache):
def from_json(self, json_string):
"""Load a JSON string"""
self.from_dict(**json.loads(json_string))
self.from_dict(**loads(json_string))
def to_dict(self):
"""Dump the lass to a dictionary.
"""Dump the class to a dictionary.
This method automatically removes the timestamp recursively in every object
that has been edited is order to let MISP update the event accordingly."""
is_edited = self.edited
to_return = {}
for attribute in self.properties:
val = getattr(self, attribute, None)
for attribute, val in self.items():
if val is None:
continue
elif isinstance(val, list) and len(val) == 0:
continue
if attribute == 'timestamp':
if not self.__force_timestamps and self.edited:
if not self.__force_timestamps and is_edited:
# In order to be accepted by MISP, the timestamp of an object
# needs to be either newer, or None.
# If the current object is marked as edited, the easiest is to
@ -239,13 +266,15 @@ class AbstractMISP(MutableMapping, MISPFileCache):
"""This method is used by the JSON encoder"""
return self.to_dict()
def to_json(self):
def to_json(self, sort_keys=False, indent=None):
"""Dump recursively any class of type MISPAbstract to a json string"""
return json.dumps(self, cls=MISPEncode, sort_keys=True, indent=2)
return dumps(self, default=pymisp_json_default, sort_keys=sort_keys, indent=indent)
def __getitem__(self, key):
try:
return getattr(self, key)
if key[0] != '_' and key not in self.__not_jsonable:
return self.__dict__[key]
raise KeyError
except AttributeError:
# Expected by pop and other dict-related methods
raise KeyError
@ -257,10 +286,10 @@ class AbstractMISP(MutableMapping, MISPFileCache):
delattr(self, key)
def __iter__(self):
return iter(self.to_dict())
return iter({k: v for k, v in self.__dict__.items() if not (k[0] == '_' or k in self.__not_jsonable)})
def __len__(self):
return len(self.to_dict())
return len([k for k in self.__dict__.keys() if not (k[0] == '_' or k in self.__not_jsonable)])
@property
def edited(self):
@ -268,15 +297,14 @@ class AbstractMISP(MutableMapping, MISPFileCache):
to the parent objects"""
if self.__edited:
return self.__edited
for p in self.properties:
if self.__edited:
break
val = getattr(self, p)
for p, val in self.items():
if isinstance(val, AbstractMISP) and val.edited:
self.__edited = True
break
elif isinstance(val, list) and all(isinstance(a, AbstractMISP) for a in val):
if any(a.edited for a in val):
self.__edited = True
break
return self.__edited
@edited.setter
@ -288,9 +316,10 @@ class AbstractMISP(MutableMapping, MISPFileCache):
raise Exception('edited can only be True or False')
def __setattr__(self, name, value):
if name != '_AbstractMISP__edited':
if not self.__edited and name in self.properties:
self.__edited = True
if name[0] != '_' and not self.__edited and name in self.keys():
# The private members don't matter
# If we already have a key with that name, we're modifying it.
self.__edited = True
super(AbstractMISP, self).__setattr__(name, value)
def _datetime_to_timestamp(self, d):

View File

@ -19,7 +19,7 @@ from deprecated import deprecated
from . import __version__, warning_2020
from .exceptions import PyMISPError, SearchError, NoURL, NoKey, PyMISPEmptyResponse
from .mispevent import MISPEvent, MISPAttribute, MISPUser, MISPOrganisation, MISPSighting, MISPFeed, MISPObject, MISPSharingGroup
from .abstract import AbstractMISP, MISPEncode, MISPFileCache, describe_types
from .abstract import AbstractMISP, pymisp_json_default, MISPFileCache, describe_types
logger = logging.getLogger('pymisp')
@ -162,7 +162,7 @@ class PyMISP(MISPFileCache): # pragma: no cover
if isinstance(data, dict):
# Remove None values.
data = {k: v for k, v in data.items() if v is not None}
data = json.dumps(data, cls=MISPEncode)
data = json.dumps(data, default=pymisp_json_default)
req = requests.Request(request_type, url, data=data)
if self.asynch and background_callback is not None:
local_session = FuturesSession
@ -604,7 +604,7 @@ class PyMISP(MISPFileCache): # pragma: no cover
else:
data = attributes.to_json()
# _prepare_request(...) returns a requests.Response Object
resp = self._prepare_request('POST', url, json.dumps(data, cls=MISPEncode))
resp = self._prepare_request('POST', url, json.dumps(data, default=pymisp_json_default))
try:
responses.append(resp.json())
except Exception:
@ -1058,7 +1058,7 @@ class PyMISP(MISPFileCache): # pragma: no cover
url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id))
if path in ['add', 'edit']:
query = {'request': {'ShadowAttribute': attribute}}
response = self._prepare_request('POST', url, json.dumps(query, cls=MISPEncode))
response = self._prepare_request('POST', url, json.dumps(query, default=pymisp_json_default))
elif path == 'view':
response = self._prepare_request('GET', url)
else: # accept or discard

View File

@ -19,7 +19,7 @@ from . import __version__
from .exceptions import MISPServerError, PyMISPUnexpectedResponse, PyMISPNotImplementedYet, PyMISPError, NoURL, NoKey
from .api import everything_broken, PyMISP
from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject, MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity
from .abstract import MISPEncode, MISPTag, AbstractMISP, describe_types
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
SearchType = TypeVar('SearchType', str, int)
# str: string to search / list: values to search (OR) / dict: {'OR': [list], 'NOT': [list], 'AND': [list]}
@ -2109,7 +2109,7 @@ class ExpandedPyMISP(PyMISP):
if isinstance(data, dict): # Else, we can directly json encode.
# Remove None values.
data = {k: v for k, v in data.items() if v is not None}
data = json.dumps(data, cls=MISPEncode)
data = json.dumps(data, default=pymisp_json_default)
if kw_params:
# CakePHP params in URL

View File

@ -198,8 +198,8 @@ class MISPAttribute(AbstractMISP):
return misp_sighting
def from_dict(self, **kwargs):
if kwargs.get('Attribute'):
kwargs = kwargs.get('Attribute')
if 'Attribute' in kwargs:
kwargs = kwargs['Attribute']
if kwargs.get('type') and kwargs.get('category'):
if kwargs['type'] not in self.__category_type_mapping[kwargs['category']]:
if self.__strict:
@ -272,14 +272,11 @@ class MISPAttribute(AbstractMISP):
raise NewAttributeError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
if kwargs.get('Tag'):
for tag in kwargs.pop('Tag'):
self.add_tag(tag)
[self.add_tag(tag) for tag in kwargs.pop('Tag')]
if kwargs.get('Sighting'):
for sighting in kwargs.pop('Sighting'):
self.add_sighting(sighting)
[self.add_sighting(sighting) for sighting in kwargs.pop('Sighting')]
if kwargs.get('ShadowAttribute'):
for s_attr in kwargs.pop('ShadowAttribute'):
self.add_shadow_attribute(s_attr)
[self.add_shadow_attribute(s_attr) for s_attr in kwargs.pop('ShadowAttribute')]
# If the user wants to disable correlation, let them. Defaults to False.
self.disable_correlation = kwargs.pop("disable_correlation", False)
@ -535,8 +532,8 @@ class MISPEvent(AbstractMISP):
raise NewEventError('Invalid format for the date: {} - {}'.format(date, type(date)))
def from_dict(self, **kwargs):
if kwargs.get('Event'):
kwargs = kwargs.get('Event')
if 'Event' in kwargs:
kwargs = kwargs['Event']
# Required value
self.info = kwargs.pop('info', None)
if self.info is None:
@ -568,8 +565,7 @@ class MISPEvent(AbstractMISP):
if kwargs.get('date'):
self.set_date(kwargs.pop('date'))
if kwargs.get('Attribute'):
for a in kwargs.pop('Attribute'):
self.add_attribute(**a)
[self.add_attribute(**a) for a in kwargs.pop('Attribute')]
# All other keys
if kwargs.get('id'):
@ -596,11 +592,9 @@ class MISPEvent(AbstractMISP):
sub_event.load(rel_event)
self.RelatedEvent.append({'Event': sub_event})
if kwargs.get('Tag'):
for tag in kwargs.pop('Tag'):
self.add_tag(tag)
[self.add_tag(tag) for tag in kwargs.pop('Tag')]
if kwargs.get('Object'):
for obj in kwargs.pop('Object'):
self.add_object(obj)
[self.add_object(obj) for obj in kwargs.pop('Object')]
if kwargs.get('Org'):
self.Org = MISPOrganisation()
self.Org.from_dict(**kwargs.pop('Org'))
@ -860,8 +854,8 @@ class MISPObjectReference(AbstractMISP):
super(MISPObjectReference, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('ObjectReference'):
kwargs = kwargs.get('ObjectReference')
if 'ObjectReference' in kwargs:
kwargs = kwargs['ObjectReference']
super(MISPObjectReference, self).from_dict(**kwargs)
def __repr__(self):
@ -876,8 +870,8 @@ class MISPObjectTemplate(AbstractMISP):
super(MISPObjectTemplate, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('ObjectTemplate'):
kwargs = kwargs.get('ObjectTemplate')
if 'ObjectTemplate' in kwargs:
kwargs = kwargs['ObjectTemplate']
super(MISPObjectTemplate, self).from_dict(**kwargs)
@ -887,8 +881,8 @@ class MISPUser(AbstractMISP):
super(MISPUser, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('User'):
kwargs = kwargs.get('User')
if 'User' in kwargs:
kwargs = kwargs['User']
super(MISPUser, self).from_dict(**kwargs)
def __repr__(self):
@ -903,8 +897,8 @@ class MISPOrganisation(AbstractMISP):
super(MISPOrganisation, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Organisation'):
kwargs = kwargs.get('Organisation')
if 'Organisation' in kwargs:
kwargs = kwargs['Organisation']
super(MISPOrganisation, self).from_dict(**kwargs)
@ -914,8 +908,8 @@ class MISPFeed(AbstractMISP):
super(MISPFeed, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Feed'):
kwargs = kwargs.get('Feed')
if 'Feed' in kwargs:
kwargs = kwargs['Feed']
super(MISPFeed, self).from_dict(**kwargs)
@ -925,8 +919,8 @@ class MISPWarninglist(AbstractMISP):
super(MISPWarninglist, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Warninglist'):
kwargs = kwargs.get('Warninglist')
if 'Warninglist' in kwargs:
kwargs = kwargs['Warninglist']
super(MISPWarninglist, self).from_dict(**kwargs)
@ -936,8 +930,8 @@ class MISPTaxonomy(AbstractMISP):
super(MISPTaxonomy, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Taxonomy'):
kwargs = kwargs.get('Taxonomy')
if 'Taxonomy' in kwargs:
kwargs = kwargs['Taxonomy']
super(MISPTaxonomy, self).from_dict(**kwargs)
@ -947,8 +941,8 @@ class MISPGalaxy(AbstractMISP):
super(MISPGalaxy, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Galaxy'):
kwargs = kwargs.get('Galaxy')
if 'Galaxy' in kwargs:
kwargs = kwargs['Galaxy']
super(MISPGalaxy, self).from_dict(**kwargs)
@ -958,8 +952,8 @@ class MISPNoticelist(AbstractMISP):
super(MISPNoticelist, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Noticelist'):
kwargs = kwargs.get('Noticelist')
if 'Noticelist' in kwargs:
kwargs = kwargs['Noticelist']
super(MISPNoticelist, self).from_dict(**kwargs)
@ -969,8 +963,8 @@ class MISPRole(AbstractMISP):
super(MISPRole, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Role'):
kwargs = kwargs.get('Role')
if 'Role' in kwargs:
kwargs = kwargs['Role']
super(MISPRole, self).from_dict(**kwargs)
@ -980,8 +974,8 @@ class MISPServer(AbstractMISP):
super(MISPServer, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Server'):
kwargs = kwargs.get('Server')
if 'Server' in kwargs:
kwargs = kwargs['Server']
super(MISPServer, self).from_dict(**kwargs)
@ -991,8 +985,8 @@ class MISPSharingGroup(AbstractMISP):
super(MISPSharingGroup, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('SharingGroup'):
kwargs = kwargs.get('SharingGroup')
if 'SharingGroup' in kwargs:
kwargs = kwargs['SharingGroup']
super(MISPSharingGroup, self).from_dict(**kwargs)
@ -1002,8 +996,8 @@ class MISPLog(AbstractMISP):
super(MISPLog, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Log'):
kwargs = kwargs.get('Log')
if 'Log' in kwargs:
kwargs = kwargs['Log']
super(MISPLog, self).from_dict(**kwargs)
def __repr__(self):
@ -1016,8 +1010,8 @@ class MISPEventDelegation(AbstractMISP):
super(MISPEventDelegation, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('EventDelegation'):
kwargs = kwargs.get('EventDelegation')
if 'EventDelegation' in kwargs:
kwargs = kwargs['EventDelegation']
super(MISPEventDelegation, self).from_dict(**kwargs)
def __repr__(self):
@ -1039,8 +1033,8 @@ class MISPSighting(AbstractMISP):
:type: Type of the sighting
:timestamp: Timestamp associated to the sighting
"""
if kwargs.get('Sighting'):
kwargs = kwargs.get('Sighting')
if 'Sighting' in kwargs:
kwargs = kwargs['Sighting']
super(MISPSighting, self).from_dict(**kwargs)
def __repr__(self):
@ -1062,6 +1056,8 @@ class MISPObjectAttribute(MISPAttribute):
def from_dict(self, object_relation, value, **kwargs):
self.object_relation = object_relation
self.value = value
if 'Attribute' in kwargs:
kwargs = kwargs['Attribute']
# Initialize the new MISPAttribute
# Get the misp attribute type from the definition
self.type = kwargs.pop('type', None)
@ -1078,7 +1074,10 @@ class MISPObjectAttribute(MISPAttribute):
self.to_ids = self._definition.get('to_ids')
if not self.type:
raise NewAttributeError("The type of the attribute is required. Is the object template missing?")
super(MISPObjectAttribute, self).from_dict(**dict(self, **kwargs))
if sys.version_info < (3, 5):
super(MISPObjectAttribute, self).from_dict(**dict(self, **kwargs))
else:
super(MISPObjectAttribute, self).from_dict(**{**self, **kwargs})
def __repr__(self):
if hasattr(self, 'value'):
@ -1092,8 +1091,8 @@ class MISPShadowAttribute(AbstractMISP):
super(MISPShadowAttribute, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('ShadowAttribute'):
kwargs = kwargs.get('ShadowAttribute')
if 'ShadowAttribute' in kwargs:
kwargs = kwargs['ShadowAttribute']
super(MISPShadowAttribute, self).from_dict(**kwargs)
def __repr__(self):
@ -1108,8 +1107,8 @@ class MISPCommunity(AbstractMISP):
super(MISPCommunity, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Community'):
kwargs = kwargs.get('Community')
if 'Community' in kwargs:
kwargs = kwargs['Community']
super(MISPCommunity, self).from_dict(**kwargs)
def __repr__(self):
@ -1231,8 +1230,8 @@ class MISPObject(AbstractMISP):
raise PyMISPError('All the attributes have to be of type MISPObjectReference.')
def from_dict(self, **kwargs):
if kwargs.get('Object'):
kwargs = kwargs.get('Object')
if 'Object' in kwargs:
kwargs = kwargs['Object']
if self._known_template:
if kwargs.get('template_uuid') and kwargs['template_uuid'] != self.template_uuid:
if self._strict:
@ -1260,11 +1259,9 @@ class MISPObject(AbstractMISP):
else:
self.timestamp = datetime.datetime.fromtimestamp(int(ts), UTC())
if kwargs.get('Attribute'):
for a in kwargs.pop('Attribute'):
self.add_attribute(**a)
[self.add_attribute(**a) for a in kwargs.pop('Attribute')]
if kwargs.get('ObjectReference'):
for r in kwargs.pop('ObjectReference'):
self.add_reference(**r)
[self.add_reference(**r) for r in kwargs.pop('ObjectReference')]
# Not supported yet - https://github.com/MISP/PyMISP/issues/168
# if kwargs.get('Tag'):
@ -1323,7 +1320,10 @@ class MISPObject(AbstractMISP):
else:
attribute = MISPObjectAttribute({})
# Overwrite the parameters of self._default_attributes_parameters with the ones of value
attribute.from_dict(object_relation=object_relation, **dict(self._default_attributes_parameters, **value))
if sys.version_info < (3, 5):
attribute.from_dict(object_relation=object_relation, **dict(self._default_attributes_parameters, **value))
else:
attribute.from_dict(object_relation=object_relation, **{**self._default_attributes_parameters, **value})
self.__fast_attribute_access[object_relation].append(attribute)
self.Attribute.append(attribute)
self.edited = True

View File

@ -3919,7 +3919,7 @@
"date": "2017-12-14",
"distribution": "3",
"id": "9616",
"info": "OSINT - Attackers Deploy New ICS Attack Framework “TRITON” and Cause Operational Disruption to Critical Infrastructure",
"info": "OSINT - Attackers Deploy New ICS Attack Framework \"TRITON\" and Cause Operational Disruption to Critical Infrastructure",
"org_id": "2",
"orgc_id": "2",
"published": false,
@ -4019,7 +4019,7 @@
"date": "2017-10-23",
"distribution": "3",
"id": "9208",
"info": "Talos: “Cyber Conflict” Decoy Document Used In Real Cyber Conflict",
"info": "Talos: \"Cyber Conflict\" Decoy Document Used In Real Cyber Conflict",
"org_id": "291",
"orgc_id": "291",
"published": true,

View File

@ -3922,7 +3922,7 @@
"date": "2017-12-14",
"distribution": "3",
"id": "9616",
"info": "OSINT - Attackers Deploy New ICS Attack Framework “TRITON” and Cause Operational Disruption to Critical Infrastructure",
"info": "OSINT - Attackers Deploy New ICS Attack Framework \"TRITON\" and Cause Operational Disruption to Critical Infrastructure",
"org_id": "2",
"orgc_id": "2",
"published": false,
@ -4022,7 +4022,7 @@
"date": "2017-10-23",
"distribution": "3",
"id": "9208",
"info": "Talos: “Cyber Conflict” Decoy Document Used In Real Cyber Conflict",
"info": "Talos: \"Cyber Conflict\" Decoy Document Used In Real Cyber Conflict",
"org_id": "291",
"orgc_id": "291",
"published": true,

View File

@ -26,20 +26,20 @@ class TestMISPEvent(unittest.TestCase):
def test_simple(self):
with open('tests/mispevent_testfiles/simple.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_event(self):
self.init_event()
self.mispevent.publish()
with open('tests/mispevent_testfiles/event.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_loadfile(self):
self.mispevent.load_file('tests/mispevent_testfiles/event.json')
with open('tests/mispevent_testfiles/event.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_event_tag(self):
self.init_event()
@ -50,7 +50,7 @@ class TestMISPEvent(unittest.TestCase):
self.mispevent.add_tag(new_tag)
with open('tests/mispevent_testfiles/event_tags.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_attribute(self):
self.init_event()
@ -62,13 +62,13 @@ class TestMISPEvent(unittest.TestCase):
self.assertEqual(attr_tags[0].name, 'osint')
with open('tests/mispevent_testfiles/attribute.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
# Fake setting an attribute ID for testing
self.mispevent.attributes[0].id = 42
self.mispevent.delete_attribute(42)
with open('tests/mispevent_testfiles/attribute_del.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_object_tag(self):
self.mispevent.add_object(name='file', strict=True)
@ -90,7 +90,7 @@ class TestMISPEvent(unittest.TestCase):
self.assertEqual(self.mispevent.objects[0].references[0].relationship_type, 'baz')
with open('tests/mispevent_testfiles/event_obj_attr_tag.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
@unittest.skip("Not supported on MISP: https://github.com/MISP/MISP/issues/2638 - https://github.com/MISP/PyMISP/issues/168")
def test_object_level_tag(self):
@ -100,7 +100,7 @@ class TestMISPEvent(unittest.TestCase):
self.mispevent.objects[0].uuid = 'a'
with open('tests/mispevent_testfiles/event_obj_tag.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_malware(self):
with open('tests/mispevent_testfiles/simple.json', 'rb') as f:
@ -112,7 +112,7 @@ class TestMISPEvent(unittest.TestCase):
self.assertEqual(attribute.malware_binary, pseudofile)
with open('tests/mispevent_testfiles/malware.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_existing_malware(self):
self.mispevent.load_file('tests/mispevent_testfiles/malware_exist.json')
@ -127,19 +127,20 @@ class TestMISPEvent(unittest.TestCase):
sighting.from_dict(value='1', type='bar', timestamp=11111111)
with open('tests/mispevent_testfiles/sighting.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(sighting.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(sighting.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_existing_event(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
with open('tests/mispevent_testfiles/existing_event.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_shadow_attributes_existing(self):
self.mispevent.load_file('tests/mispevent_testfiles/shadow.json')
with open('tests/mispevent_testfiles/shadow.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
@unittest.skip("Not supported on MISP.")
def test_shadow_attributes(self):
@ -152,7 +153,7 @@ class TestMISPEvent(unittest.TestCase):
del p.uuid
with open('tests/mispevent_testfiles/proposals.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_default_attributes(self):
self.mispevent.add_object(name='file', strict=True)
@ -165,7 +166,7 @@ class TestMISPEvent(unittest.TestCase):
self.mispevent.objects[1].uuid = 'b'
with open('tests/mispevent_testfiles/event_obj_def_param.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_obj_default_values(self):
self.init_event()
@ -181,7 +182,7 @@ class TestMISPEvent(unittest.TestCase):
self.mispevent.objects[0].uuid = 'a'
with open('tests/mispevent_testfiles/def_param.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_event_not_edited(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
@ -246,7 +247,7 @@ class TestMISPEvent(unittest.TestCase):
self.assertTrue(self.mispevent.edited)
with open('tests/mispevent_testfiles/existing_event_edited.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_obj_by_id(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
@ -258,7 +259,7 @@ class TestMISPEvent(unittest.TestCase):
self.mispevent.add_object(name='test_object_template', strict=True, misp_objects_path_custom='tests/mispevent_testfiles')
with self.assertRaises(InvalidMISPObject) as e:
# Fail on required
self.mispevent.to_json()
self.mispevent.to_json(sort_keys=True, indent=2)
if sys.version_info >= (3, ):
self.assertEqual(e.exception.message, '{\'member3\'} are required.')
else:
@ -269,7 +270,7 @@ class TestMISPEvent(unittest.TestCase):
del a.uuid
with self.assertRaises(InvalidMISPObject) as e:
# Fail on requiredOneOf
self.mispevent.to_json()
self.mispevent.to_json(sort_keys=True, indent=2)
self.assertEqual(e.exception.message, 'At least one of the following attributes is required: member1, member2')
a = self.mispevent.objects[0].add_attribute('member1', value='bar')
@ -278,14 +279,14 @@ class TestMISPEvent(unittest.TestCase):
del a.uuid
with self.assertRaises(InvalidMISPObject) as e:
# member1 is not a multiple
self.mispevent.to_json()
self.mispevent.to_json(sort_keys=True, indent=2)
self.assertEqual(e.exception.message, 'Multiple occurrences of member1 is not allowed')
self.mispevent.objects[0].attributes = self.mispevent.objects[0].attributes[:2]
self.mispevent.objects[0].uuid = 'a'
with open('tests/mispevent_testfiles/misp_custom_obj.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
if __name__ == '__main__':