chg: Multiple changes

* Fix timestamp dump (properly enforce UTC)
* Properly handle proposals
* Add many getter/setter
* Add dedicated test cases for MISPEvent and other objects
pull/164/head
Raphaël Vinot 2018-01-03 14:36:10 +01:00
parent c68b69b422
commit bb1aac5720
16 changed files with 5367 additions and 54 deletions

View File

@ -26,7 +26,7 @@ install:
- popd
script:
- nosetests --with-coverage --cover-package=pymisp tests/test_offline.py
- nosetests --with-coverage --cover-package=pymisp tests/test_*.py
after_success:
- codecov

View File

@ -15,6 +15,21 @@ logger = logging.getLogger('pymisp')
if six.PY2:
logger.warning("You're using python 2, it is strongly recommended to use python >=3.5")
# This is required because Python 2 is a pain.
from datetime import tzinfo, timedelta
class UTC(tzinfo):
"""UTC"""
def utcoffset(self, dt):
return timedelta(0)
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return timedelta(0)
class MISPEncode(JSONEncoder):
@ -80,6 +95,8 @@ class AbstractMISP(collections.MutableMapping):
val = getattr(self, attribute, None)
if val is None:
continue
elif isinstance(val, list) and len(val) == 0:
continue
if attribute == 'timestamp':
if self.edited:
# In order to be accepted by MISP, the timestamp of an object
@ -98,7 +115,7 @@ class AbstractMISP(collections.MutableMapping):
def to_json(self):
"""Dump recursively any class of type MISPAbstract to a json string"""
return json.dumps(self, cls=MISPEncode)
return json.dumps(self, cls=MISPEncode, sort_keys=True, indent=2)
def __getitem__(self, key):
try:
@ -150,10 +167,10 @@ class AbstractMISP(collections.MutableMapping):
def _datetime_to_timestamp(self, d):
"""Convert a datetime.datetime object to a timestamp (int)"""
if isinstance(d, (int, str)):
if isinstance(d, (int, str)) or (sys.version_info < (3, 0) and isinstance(d, unicode)):
# Assume we already have a timestamp
return d
if sys.version_info >= (3, 3):
return d.timestamp()
return int(d.timestamp())
else:
return (d - datetime.datetime.utcfromtimestamp(0)).total_seconds()
return int((d - datetime.datetime.fromtimestamp(0, UTC())).total_seconds())

View File

@ -25,6 +25,21 @@ logger = logging.getLogger('pymisp')
if six.PY2:
logger.warning("You're using python 2, it is strongly recommended to use python >=3.5")
# This is required because Python 2 is a pain.
from datetime import tzinfo, timedelta
class UTC(tzinfo):
"""UTC"""
def utcoffset(self, dt):
return timedelta(0)
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return timedelta(0)
try:
from dateutil.parser import parse
except ImportError:
@ -80,44 +95,86 @@ class MISPAttribute(AbstractMISP):
self.__category_type_mapping = describe_types['category_type_mappings']
self.__sane_default = describe_types['sane_defaults']
self.Tag = []
self.ShadowAttribute = []
@property
def known_types(self):
"""Returns a list of all the known MISP attributes types"""
return self._types
@property
def malware_binary(self):
"""Returns a BytesIO of the malware (if the attribute has one, obvs)."""
if hasattr(self, '_malware_binary'):
return self._malware_binary
return None
@property
def tags(self):
"""Returns a lost of tags associated to this Attribute"""
return self.Tag
@tags.setter
def tags(self, tags):
"""Set a list of prepared MISPTag."""
if all(isinstance(x, MISPTag) for x in tags):
self.Tag = tags
else:
raise PyMISPError('All the attributes have to be of type MISPAttribute.')
raise PyMISPError('All the attributes have to be of type MISPTag.')
@property
def shadow_attributes(self):
return self.ShadowAttribute
@shadow_attributes.setter
def shadow_attributes(self, shadow_attributes):
"""Set a list of prepared MISPShadowAttribute."""
if all(isinstance(x, MISPShadowAttribute) for x in shadow_attributes):
self.ShadowAttribute = shadow_attributes
else:
raise PyMISPError('All the attributes have to be of type MISPShadowAttribute.')
def delete(self):
"""Mark the attribute as deleted (soft delete)"""
self.deleted = True
def add_tag(self, **tag):
def add_tag(self, tag=None, **kwargs):
"""Add a tag to the attribute (by name or a MISPTag object)"""
misp_tag = MISPTag()
if isinstance(tag, str):
misp_tag = MISPTag()
misp_tag.from_dict(name=tag)
elif isinstance(tag, MISPTag):
misp_tag = tag
elif isinstance(tag, dict):
misp_tag = MISPTag()
misp_tag.from_dict(**tag)
elif kwargs:
misp_tag = MISPTag()
misp_tag.from_dict(**kwargs)
else:
raise PyMISPError("The tag is in an invalid format (can be either string, or list): {}".format(tag))
raise PyMISPError("The tag is in an invalid format (can be either string, MISPTag, or an expanded dict): {}".format(tag))
self.tags.append(misp_tag)
self.edited = True
def add_proposal(self, shadow_attribute=None, **kwargs):
"""Alias for add_shadow_attribute"""
self.add_shadow_attribute(shadow_attribute, **kwargs)
def add_shadow_attribute(self, shadow_attribute=None, **kwargs):
"""Add a tag to the attribute (by name or a MISPTag object)"""
if isinstance(shadow_attribute, MISPShadowAttribute):
misp_shadow_attribute = shadow_attribute
elif isinstance(shadow_attribute, dict):
misp_shadow_attribute = MISPShadowAttribute()
misp_shadow_attribute.from_dict(**shadow_attribute)
elif kwargs:
misp_shadow_attribute = MISPShadowAttribute()
misp_shadow_attribute.from_dict(**kwargs)
else:
raise PyMISPError("The shadow_attribute is in an invalid format (can be either string, MISPShadowAttribute, or an expanded dict): {}".format(shadow_attribute))
self.shadow_attributes.append(misp_shadow_attribute)
self.edited = True
def from_dict(self, **kwargs):
if kwargs.get('type') and kwargs.get('category'):
if kwargs['type'] not in self.__category_type_mapping[kwargs['category']]:
@ -165,12 +222,18 @@ class MISPAttribute(AbstractMISP):
if kwargs.get('event_id'):
self.event_id = int(kwargs.pop('event_id'))
if kwargs.get('timestamp'):
self.timestamp = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=int(kwargs.pop('timestamp')))
if sys.version_info >= (3, 3):
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('timestamp')), datetime.timezone.utc)
else:
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('timestamp')), UTC())
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if kwargs.get('Tag'):
for tag in kwargs.pop('Tag'):
self.add_tag(**tag)
self.add_tag(tag)
if kwargs.get('ShadowAttribute'):
for s_attr in kwargs.pop('ShadowAttribute'):
self.add_shadow_attribute(s_attr)
# If the user wants to disable correlation, let them. Defaults to False.
self.disable_correlation = kwargs.pop("disable_correlation", False)
@ -240,7 +303,7 @@ class MISPAttribute(AbstractMISP):
return '<{self.__class__.__name__}(type={self.type}, value={self.value})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
def verify(self, gpg_uid):
def verify(self, gpg_uid): # pragma: no cover
# Not used
if not has_pyme:
raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
@ -253,13 +316,13 @@ class MISPAttribute(AbstractMISP):
except Exception:
return {self.uuid: False}
def _serialize(self):
def _serialize(self): # pragma: no cover
# Not used
return '{type}{category}{to_ids}{uuid}{timestamp}{comment}{deleted}{value}'.format(
type=self.type, category=self.category, to_ids=self.to_ids, uuid=self.uuid, timestamp=self.timestamp,
comment=self.comment, deleted=self.deleted, value=self.value).encode()
def sign(self, gpg_uid, passphrase=None):
def sign(self, gpg_uid, passphrase=None): # pragma: no cover
# Not used
if not has_pyme:
raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
@ -273,23 +336,23 @@ class MISPAttribute(AbstractMISP):
self.sig = base64.b64encode(signed).decode()
@deprecated
def get_known_types(self):
def get_known_types(self): # pragma: no cover
return self.known_types
@deprecated
def get_malware_binary(self):
def get_malware_binary(self): # pragma: no cover
return self.malware_binary
@deprecated
def _json(self):
def _json(self): # pragma: no cover
return self.to_dict()
@deprecated
def _json_full(self):
def _json_full(self): # pragma: no cover
return self.to_dict()
@deprecated
def set_all_values(self, **kwargs):
def set_all_values(self, **kwargs): # pragma: no cover
self.from_dict(**kwargs)
@ -314,6 +377,7 @@ class MISPEvent(AbstractMISP):
self.Attribute = []
self.Object = []
self.RelatedEvent = []
self.ShadowAttribute = []
@property
def known_types(self):
@ -330,6 +394,17 @@ class MISPEvent(AbstractMISP):
else:
raise PyMISPError('All the attributes have to be of type MISPAttribute.')
@property
def shadow_attributes(self):
return self.ShadowAttribute
@shadow_attributes.setter
def shadow_attributes(self, shadow_attributes):
if all(isinstance(x, MISPShadowAttribute) for x in shadow_attributes):
self.ShadowAttribute = shadow_attributes
else:
raise PyMISPError('All the attributes have to be of type MISPShadowAttribute.')
@property
def related_events(self):
return self.RelatedEvent
@ -338,10 +413,24 @@ class MISPEvent(AbstractMISP):
def objects(self):
return self.Object
@objects.setter
def objects(self, objects):
if all(isinstance(x, MISPObject) for x in objects):
self.Object = objects
else:
raise PyMISPError('All the attributes have to be of type MISPObject.')
@property
def tags(self):
return self.Tag
@tags.setter
def tags(self, tags):
if all(isinstance(x, MISPTag) for x in tags):
self.Tag = tags
else:
raise PyMISPError('All the attributes have to be of type MISPTag.')
def load_file(self, event_path):
"""Load a JSON dump from a file on the disk"""
if not os.path.exists(event_path):
@ -363,7 +452,9 @@ class MISPEvent(AbstractMISP):
if not event:
raise PyMISPError('Invalid event')
# Invalid event created by MISP up to 2.4.52 (attribute_count is none instead of '0')
if event.get('Event') and event.get('Event').get('attribute_count') is None:
if (event.get('Event') and
'attribute_count' in event.get('Event') and
event.get('Event').get('attribute_count') is None):
event['Event']['attribute_count'] = '0'
jsonschema.validate(event, self.__json_schema)
e = event.get('Event')
@ -426,9 +517,15 @@ class MISPEvent(AbstractMISP):
if kwargs.get('org_id'):
self.org_id = int(kwargs.pop('org_id'))
if kwargs.get('timestamp'):
self.timestamp = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=int(kwargs.pop('timestamp')))
if sys.version_info >= (3, 3):
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('timestamp')), datetime.timezone.utc)
else:
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('timestamp')), UTC())
if kwargs.get('publish_timestamp'):
self.publish_timestamp = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=int(kwargs.pop('publish_timestamp')))
if sys.version_info >= (3, 3):
self.publish_timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('publish_timestamp')), datetime.timezone.utc)
else:
self.publish_timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('publish_timestamp')), UTC())
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if kwargs.get('RelatedEvent'):
@ -438,10 +535,10 @@ class MISPEvent(AbstractMISP):
self.RelatedEvent.append(sub_event)
if kwargs.get('Tag'):
for tag in kwargs.pop('Tag'):
self.add_tag(**tag)
self.add_tag(tag)
if kwargs.get('Object'):
for obj in kwargs.pop('Object'):
self.add_object(**obj)
self.add_object(obj)
super(MISPEvent, self).from_dict(**kwargs)
@ -449,6 +546,8 @@ class MISPEvent(AbstractMISP):
to_return = super(MISPEvent, self).to_dict()
if to_return.get('date'):
if isinstance(self.date, datetime.datetime):
self.date = self.date.date()
to_return['date'] = self.date.isoformat()
if to_return.get('publish_timestamp'):
to_return['publish_timestamp'] = self._datetime_to_timestamp(self.publish_timestamp)
@ -457,15 +556,40 @@ class MISPEvent(AbstractMISP):
to_return = {'Event': to_return}
return to_return
def add_tag(self, **tag):
def add_proposal(self, shadow_attribute=None, **kwargs):
"""Alias for add_shadow_attribute"""
self.add_shadow_attribute(shadow_attribute, **kwargs)
def add_shadow_attribute(self, shadow_attribute=None, **kwargs):
"""Add a tag to the attribute (by name or a MISPTag object)"""
misp_tag = MISPTag()
if isinstance(tag, str):
misp_tag.from_dict(name=tag)
elif isinstance(tag, dict):
misp_tag.from_dict(**tag)
if isinstance(shadow_attribute, MISPShadowAttribute):
misp_shadow_attribute = shadow_attribute
elif isinstance(shadow_attribute, dict):
misp_shadow_attribute = MISPShadowAttribute()
misp_shadow_attribute.from_dict(**shadow_attribute)
elif kwargs:
misp_shadow_attribute = MISPShadowAttribute()
misp_shadow_attribute.from_dict(**kwargs)
else:
raise PyMISPError("The tag is in an invalid format (can be either string, or list): {}".format(tag))
raise PyMISPError("The shadow_attribute is in an invalid format (can be either string, MISPShadowAttribute, or an expanded dict): {}".format(shadow_attribute))
self.shadow_attributes.append(misp_shadow_attribute)
self.edited = True
def add_tag(self, tag=None, **kwargs):
"""Add a tag to the attribute (by name or a MISPTag object)"""
if isinstance(tag, str):
misp_tag = MISPTag()
misp_tag.from_dict(name=tag)
elif isinstance(tag, MISPTag):
misp_tag = tag
elif isinstance(tag, dict):
misp_tag = MISPTag()
misp_tag.from_dict(**tag)
elif kwargs:
misp_tag = MISPTag()
misp_tag.from_dict(**kwargs)
else:
raise PyMISPError("The tag is in an invalid format (can be either string, MISPTag, or an expanded dict): {}".format(tag))
self.tags.append(misp_tag)
self.edited = True
@ -484,7 +608,7 @@ class MISPEvent(AbstractMISP):
def add_attribute_tag(self, tag, attribute_identifier):
'''Add a tag to an existing attribute, raise an Exception if the attribute doesn't exists.
:tag: Tag name
:tag: Tag name as a string, MISPTag instance, or dictionary
:attribute_identifier: can be an ID, UUID, or the value.
'''
attributes = []
@ -539,16 +663,23 @@ class MISPEvent(AbstractMISP):
return obj
raise InvalidMISPObject('Object with {} does not exists in ths event'.format(object_id))
def add_object(self, **obj):
def add_object(self, obj=None, **kwargs):
"""Add an object to the Event, either by passing a MISPObject, or a dictionary"""
if isinstance(obj, MISPObject):
self.Object.append(obj)
misp_obj = obj
elif isinstance(obj, dict):
tmp_object = MISPObject(obj['name'])
tmp_object.from_dict(**obj)
self.Object.append(tmp_object)
misp_obj = MISPObject(name=obj.pop('name'), strict=obj.pop('strict', False),
default_attributes_parameters=obj.pop('default_attributes_parameters', {}),
**obj)
misp_obj.from_dict(**obj)
elif kwargs:
misp_obj = MISPObject(name=kwargs.pop('name'), strict=kwargs.pop('strict', False),
default_attributes_parameters=kwargs.pop('default_attributes_parameters', {}),
**kwargs)
misp_obj.from_dict(**kwargs)
else:
raise InvalidMISPObject("An object to add to an existing Event needs to be either a MISPObject, or a plain python dictionary")
self.Object.append(misp_obj)
self.edited = True
def __repr__(self):
@ -561,14 +692,14 @@ class MISPEvent(AbstractMISP):
date=self.date, threat_level_id=self.threat_level_id, info=self.info,
uuid=self.uuid, analysis=self.analysis, timestamp=self.timestamp).encode()
def _serialize_sigs(self):
def _serialize_sigs(self): # pragma: no cover
# Not used
all_sigs = self.sig
for a in self.attributes:
all_sigs += a.sig
return all_sigs.encode()
def sign(self, gpg_uid, passphrase=None):
def sign(self, gpg_uid, passphrase=None): # pragma: no cover
# Not used
if not has_pyme:
raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
@ -591,7 +722,7 @@ class MISPEvent(AbstractMISP):
signed, _ = c.sign(to_sign_global, mode=mode.DETACH)
self.global_sig = base64.b64encode(signed).decode()
def verify(self, gpg_uid):
def verify(self, gpg_uid): # pragma: no cover
# Not used
if not has_pyme:
raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
@ -617,15 +748,15 @@ class MISPEvent(AbstractMISP):
return to_return
@deprecated
def get_known_types(self):
def get_known_types(self): # pragma: no cover
return self.known_types
@deprecated
def set_all_values(self, **kwargs):
def set_all_values(self, **kwargs): # pragma: no cover
self.from_dict(**kwargs)
@deprecated
def _json(self):
def _json(self): # pragma: no cover
return self.to_dict()
@ -712,7 +843,7 @@ class MISPObjectAttribute(MISPAttribute):
self.type = kwargs.pop('type', None)
if self.type is None:
self.type = self.__definition.get('misp-attribute')
self.disable_correlation = kwargs.pop('disable_correlation', None)
self.disable_correlation = kwargs.pop('disable_correlation', False)
if self.disable_correlation is None:
# The correlation can be disabled by default in the object definition.
# Use this value if it isn't overloaded by the object
@ -729,6 +860,12 @@ class MISPObjectAttribute(MISPAttribute):
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPShadowAttribute(MISPAttribute):
def __init__(self):
super(MISPShadowAttribute, self).__init__()
class MISPObject(AbstractMISP):
def __init__(self, name, strict=False, standalone=False, default_attributes_parameters={}, **kwargs):
@ -769,8 +906,12 @@ class MISPObject(AbstractMISP):
self.__fast_attribute_access = {} # Hashtable object_relation: [attributes]
self.ObjectReference = []
self.Attribute = []
self.Tag = []
self._default_attributes_parameters = default_attributes_parameters
# self.Tag = [] See https://github.com/MISP/PyMISP/issues/168
if isinstance(default_attributes_parameters, MISPAttribute):
# Just make sure we're not modifying an existing MISPAttribute
self._default_attributes_parameters = default_attributes_parameters.to_dict()
else:
self._default_attributes_parameters = default_attributes_parameters
if self._default_attributes_parameters:
# Let's clean that up
self._default_attributes_parameters.pop('value', None) # duh
@ -821,9 +962,11 @@ class MISPObject(AbstractMISP):
if kwargs.get('ObjectReference'):
for r in kwargs.pop('ObjectReference'):
self.add_reference(**r)
if kwargs.get('Tag'):
for tag in kwargs.pop('Tag'):
self.add_tag(**tag)
# Not supported yet - https://github.com/MISP/PyMISP/issues/168
# if kwargs.get('Tag'):
# for tag in kwargs.pop('Tag'):
# self.add_tag(tag)
super(MISPObject, self).from_dict(**kwargs)
@ -841,11 +984,35 @@ class MISPObject(AbstractMISP):
self.ObjectReference.append(reference)
self.edited = True
def add_tag(self, name, **kwargs):
tag = MISPTag()
tag.from_dict(name=name, **kwargs)
self.Tag.append(tag)
self.edited = True
# Not supported yet - https://github.com/MISP/PyMISP/issues/168
# @property
# def tags(self):
# return self.Tag
# @tags.setter
# def tags(self, tags):
# if all(isinstance(x, MISPTag) for x in tags):
# self.Tag = tags
# else:
# raise PyMISPError('All the attributes have to be of type MISPTag.')
# def add_tag(self, tag=None, **kwargs):
# """Add a tag to the attribute (by name or a MISPTag object)"""
# if isinstance(tag, str):
# misp_tag = MISPTag()
# misp_tag.from_dict(name=tag)
# elif isinstance(tag, MISPTag):
# misp_tag = tag
# elif isinstance(tag, dict):
# misp_tag = MISPTag()
# misp_tag.from_dict(**tag)
# elif kwargs:
# misp_tag = MISPTag()
# misp_tag.from_dict(**kwargs)
# else:
# raise PyMISPError("The tag is in an invalid format (can be either string, MISPTag, or an expanded dict): {}".format(tag))
# self.tags.append(misp_tag)
# self.edited = True
def get_attributes_by_relation(self, object_relation):
'''Returns the list of attributes with the given object relation in the object'''

View File

@ -0,0 +1,23 @@
{
"Event": {
"Attribute": [
{
"Tag": [
{
"name": "osint"
}
],
"category": "Payload delivery",
"disable_correlation": false,
"to_ids": true,
"type": "filename",
"value": "bar.exe"
}
],
"analysis": "1",
"date": "2017-12-31",
"distribution": "1",
"info": "This is a test",
"threat_level_id": "1"
}
}

View File

@ -0,0 +1,25 @@
{
"Event": {
"Attribute": [
{
"Tag": [
{
"name": "osint"
}
],
"category": "Payload delivery",
"deleted": true,
"disable_correlation": false,
"id": "42",
"to_ids": true,
"type": "filename",
"value": "bar.exe"
}
],
"analysis": "1",
"date": "2017-12-31",
"distribution": "1",
"info": "This is a test",
"threat_level_id": "1"
}
}

View File

@ -0,0 +1,10 @@
{
"Event": {
"analysis": "1",
"date": "2017-12-31",
"distribution": "1",
"info": "This is a test",
"published": true,
"threat_level_id": "1"
}
}

View File

@ -0,0 +1,59 @@
{
"Event": {
"Object": [
{
"Attribute": [
{
"Tag": [
{
"name": "blah"
}
],
"category": "Payload delivery",
"disable_correlation": false,
"object_relation": "filename",
"to_ids": true,
"type": "filename",
"value": "bar"
}
],
"ObjectReference": [
{
"comment": "foo",
"object_uuid": "a",
"referenced_uuid": "b",
"relationship_type": "baz"
}
],
"description": "File object describing a file with meta-information",
"distribution": 5,
"meta-category": "file",
"name": "file",
"sharing_group_id": 0,
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": 8,
"uuid": "a"
},
{
"Attribute": [
{
"category": "External analysis",
"disable_correlation": false,
"object_relation": "url",
"to_ids": true,
"type": "url",
"value": "https://www.circl.lu"
}
],
"description": "url object describes an url along with its normalized field (like extracted using faup parsing library) and its metadata.",
"distribution": 5,
"meta-category": "network",
"name": "url",
"sharing_group_id": 0,
"template_uuid": "60efb77b-40b5-4c46-871b-ed1ed999fce5",
"template_version": 5,
"uuid": "b"
}
]
}
}

View File

@ -0,0 +1,56 @@
{
"Event": {
"Object": [
{
"Attribute": [
{
"Tag": [
{
"name": "blah"
}
],
"category": "Payload delivery",
"disable_correlation": false,
"object_relation": "filename",
"to_ids": true,
"type": "filename",
"value": "bar"
}
],
"description": "File object describing a file with meta-information",
"distribution": 5,
"meta-category": "file",
"name": "file",
"sharing_group_id": 0,
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": 8,
"uuid": "a"
},
{
"Attribute": [
{
"Tag": [
{
"name": "blah"
}
],
"category": "Payload delivery",
"disable_correlation": false,
"object_relation": "filename",
"to_ids": true,
"type": "filename",
"value": "baz"
}
],
"description": "File object describing a file with meta-information",
"distribution": 5,
"meta-category": "file",
"name": "file",
"sharing_group_id": 0,
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": 8,
"uuid": "b"
}
]
}
}

View File

@ -0,0 +1,31 @@
{
"Event": {
"Object": [
{
"Attribute": [
{
"category": "Payload delivery",
"disable_correlation": false,
"object_relation": "filename",
"to_ids": true,
"type": "filename",
"value": "bar"
}
],
"Tag": [
{
"name": "osint"
}
],
"description": "File object describing a file with meta-information",
"distribution": 5,
"meta-category": "file",
"name": "file",
"sharing_group_id": 0,
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": 8,
"uuid": "a"
}
]
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,21 @@
{
"Event": {
"Attribute": [
{
"category": "Payload delivery",
"data": "ewogICJFdmVudCI6IHsKICB9Cn0K",
"disable_correlation": false,
"encrypt": true,
"malware_filename": "bar.exe",
"to_ids": true,
"type": "malware-sample",
"value": "bar.exe|7637beddacbeac59d44469b2b120b9e6"
}
],
"analysis": "1",
"date": "2017-12-31",
"distribution": "1",
"info": "This is a test",
"threat_level_id": "1"
}
}

View File

@ -0,0 +1,36 @@
{
"Event": {
"Attribute": [
{
"ShadowAttribute": [
{
"category": "Payload delivery",
"disable_correlation": false,
"to_ids": true,
"type": "filename",
"value": "bar.pdf"
}
],
"category": "Payload delivery",
"disable_correlation": false,
"to_ids": true,
"type": "filename",
"value": "bar.exe"
}
],
"ShadowAttribute": [
{
"category": "Payload delivery",
"disable_correlation": false,
"to_ids": true,
"type": "filename",
"value": "baz.jpg"
}
],
"analysis": "1",
"date": "2017-12-31",
"distribution": "1",
"info": "This is a test",
"threat_level_id": "1"
}
}

View File

@ -0,0 +1,149 @@
{
"Event": {
"Attribute": [
{
"ShadowAttribute": [
{
"Org": {
"id": "1",
"name": "CIRCL",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
},
"category": "Artifacts dropped",
"comment": "",
"disable_correlation": false,
"event_id": "6676",
"event_uuid": "5a4cb19a-f550-437f-bd29-48ed950d210f",
"id": "3770",
"old_id": "811578",
"org_id": "1",
"proposal_to_delete": false,
"timestamp": "1514975846",
"to_ids": true,
"type": "filename",
"uuid": "5a4cb1c7-fa84-45fa-8d27-4822950d210f",
"value": "blah.exe.jpg"
}
],
"category": "Artifacts dropped",
"comment": "",
"deleted": false,
"disable_correlation": false,
"distribution": "5",
"event_id": "6676",
"id": "811578",
"object_id": "0",
"sharing_group_id": "0",
"timestamp": "1514975687",
"to_ids": false,
"type": "filename",
"uuid": "5a4cb1c7-fa84-45fa-8d27-4822950d210f",
"value": "blah.exe"
}
],
"Object": [
{
"Attribute": [
{
"ShadowAttribute": [
{
"Org": {
"id": "1",
"name": "CIRCL",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
},
"category": "Payload delivery",
"comment": "",
"disable_correlation": false,
"event_id": "6676",
"event_uuid": "5a4cb19a-f550-437f-bd29-48ed950d210f",
"id": "3771",
"old_id": "811579",
"org_id": "1",
"proposal_to_delete": false,
"timestamp": "1514976196",
"to_ids": true,
"type": "filename",
"uuid": "5a4cb2b8-4748-4c72-96e6-4588950d210f",
"value": "baz.png.exe"
}
],
"category": "Payload delivery",
"comment": "",
"deleted": false,
"disable_correlation": false,
"distribution": "5",
"event_id": "6676",
"id": "811579",
"object_id": "2278",
"object_relation": "filename",
"sharing_group_id": "0",
"timestamp": "1514975928",
"to_ids": true,
"type": "filename",
"uuid": "5a4cb2b8-4748-4c72-96e6-4588950d210f",
"value": "baz.png"
},
{
"category": "Other",
"comment": "",
"deleted": false,
"disable_correlation": true,
"distribution": "5",
"event_id": "6676",
"id": "811580",
"object_id": "2278",
"object_relation": "state",
"sharing_group_id": "0",
"timestamp": "1514975928",
"to_ids": false,
"type": "text",
"uuid": "5a4cb2b9-92b4-4d3a-82df-4e86950d210f",
"value": "Malicious"
}
],
"comment": "",
"deleted": false,
"description": "File object describing a file with meta-information",
"distribution": "5",
"event_id": "6676",
"id": "2278",
"meta-category": "file",
"name": "file",
"sharing_group_id": "0",
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": "7",
"timestamp": "1514975928",
"uuid": "5a4cb2b8-7958-4323-852c-4d2a950d210f"
}
],
"Org": {
"id": "1",
"name": "CIRCL",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
},
"Orgc": {
"id": "1",
"name": "CIRCL",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
},
"analysis": "2",
"attribute_count": "3",
"date": "2018-01-03",
"disable_correlation": false,
"distribution": "0",
"event_creator_email": "raphael.vinot@circl.lu",
"id": "6676",
"info": "Test proposals / ShadowAttributes",
"locked": false,
"org_id": "1",
"orgc_id": "1",
"proposal_email_lock": true,
"publish_timestamp": "0",
"published": false,
"sharing_group_id": "0",
"threat_level_id": "1",
"timestamp": "1514975929",
"uuid": "5a4cb19a-f550-437f-bd29-48ed950d210f"
}
}

View File

@ -0,0 +1,5 @@
{
"timestamp": 11111111,
"type": "bar",
"value": "1"
}

View File

@ -0,0 +1,4 @@
{
"Event": {
}
}

137
tests/test_mispevent.py Normal file
View File

@ -0,0 +1,137 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import unittest
import json
from io import BytesIO
from pymisp import MISPEvent, MISPSighting
class TestMISPEvent(unittest.TestCase):
def setUp(self):
self.maxDiff = None
self.mispevent = MISPEvent()
def init_event(self):
self.mispevent.info = 'This is a test'
self.mispevent.distribution = 1
self.mispevent.threat_level_id = 1
self.mispevent.analysis = 1
self.mispevent.set_date("2017-12-31") # test the set date method
def test_simple(self):
with open('tests/mispevent_testfiles/simple.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_event(self):
self.init_event()
self.mispevent.publish()
with open('tests/mispevent_testfiles/event.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_loadfile(self):
self.mispevent.load_file('tests/mispevent_testfiles/event.json')
with open('tests/mispevent_testfiles/event.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_attribute(self):
self.init_event()
self.mispevent.add_attribute('filename', 'bar.exe')
self.mispevent.add_attribute_tag('osint', 'bar.exe')
attr_tags = self.mispevent.get_attribute_tag('bar.exe')
self.assertEqual(self.mispevent.attributes[0].tags[0].name, 'osint')
self.assertEqual(attr_tags[0].name, 'osint')
with open('tests/mispevent_testfiles/attribute.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
# Fake setting an attribute ID for testing
self.mispevent.attributes[0].id = 42
self.mispevent.delete_attribute(42)
with open('tests/mispevent_testfiles/attribute_del.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_object_tag(self):
self.mispevent.add_object(name='file', strict=True)
self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}])
self.assertEqual(self.mispevent.objects[0].attributes[0].tags[0].name, 'blah')
self.assertTrue(self.mispevent.objects[0].has_attributes_by_relation(['filename']))
self.assertEqual(len(self.mispevent.objects[0].get_attributes_by_relation('filename')), 1)
self.mispevent.add_object(name='url', strict=True)
self.mispevent.objects[1].add_attribute('url', value='https://www.circl.lu')
self.mispevent.objects[0].uuid = 'a'
self.mispevent.objects[1].uuid = 'b'
self.mispevent.objects[0].add_reference('b', 'baz', comment='foo')
self.assertEqual(self.mispevent.objects[0].references[0].relationship_type, 'baz')
with open('tests/mispevent_testfiles/event_obj_attr_tag.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
@unittest.skip("Not supported on MISP: https://github.com/MISP/MISP/issues/2638 - https://github.com/MISP/PyMISP/issues/168")
def test_object_level_tag(self):
self.mispevent.add_object(name='file', strict=True)
self.mispevent.objects[0].add_attribute('filename', value='bar')
self.mispevent.objects[0].add_tag('osint')
self.mispevent.objects[0].uuid = 'a'
with open('tests/mispevent_testfiles/event_obj_tag.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_malware(self):
with open('tests/mispevent_testfiles/simple.json', 'rb') as f:
pseudofile = BytesIO(f.read())
self.init_event()
self.mispevent.add_attribute('malware-sample', 'bar.exe', data=pseudofile)
attribute = self.mispevent.attributes[0]
self.assertEqual(attribute.malware_binary, pseudofile)
with open('tests/mispevent_testfiles/malware.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_sighting(self):
sighting = MISPSighting()
sighting.from_dict(value='1', type='bar', timestamp=11111111)
with open('tests/mispevent_testfiles/sighting.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(sighting.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_existing_event(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
with open('tests/mispevent_testfiles/existing_event.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_shadow_attributes_existing(self):
self.mispevent.load_file('tests/mispevent_testfiles/shadow.json')
with open('tests/mispevent_testfiles/shadow.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_shadow_attributes(self):
self.init_event()
self.mispevent.add_proposal(type='filename', value='baz.jpg')
self.mispevent.add_attribute('filename', 'bar.exe')
self.mispevent.attributes[0].add_proposal(type='filename', value='bar.pdf')
with open('tests/mispevent_testfiles/proposals.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_default_attributes(self):
self.mispevent.add_object(name='file', strict=True)
self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}])
self.mispevent.add_object(name='file', strict=False, default_attributes_parameters=self.mispevent.objects[0].attributes[0])
self.mispevent.objects[1].add_attribute('filename', value='baz')
self.mispevent.objects[0].uuid = 'a'
self.mispevent.objects[1].uuid = 'b'
with open('tests/mispevent_testfiles/event_obj_def_param.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
if __name__ == '__main__':
unittest.main()