mirror of https://github.com/MISP/PyMISP
fix: Properly use the edited flag
parent
9e9bad731d
commit
efb6ca974c
|
@ -2,6 +2,8 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
|
import sys
|
||||||
|
import datetime
|
||||||
import json
|
import json
|
||||||
from json import JSONEncoder
|
from json import JSONEncoder
|
||||||
import collections
|
import collections
|
||||||
|
@ -29,8 +31,9 @@ class AbstractMISP(collections.MutableMapping):
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
super(AbstractMISP, self).__init__()
|
super(AbstractMISP, self).__init__()
|
||||||
self.edited = True
|
self.__edited = True
|
||||||
|
|
||||||
|
@property
|
||||||
def properties(self):
|
def properties(self):
|
||||||
to_return = []
|
to_return = []
|
||||||
for prop, value in vars(self).items():
|
for prop, value in vars(self).items():
|
||||||
|
@ -45,7 +48,7 @@ class AbstractMISP(collections.MutableMapping):
|
||||||
continue
|
continue
|
||||||
setattr(self, prop, value)
|
setattr(self, prop, value)
|
||||||
# We load an existing dictionary, marking it an not-edited
|
# We load an existing dictionary, marking it an not-edited
|
||||||
self.edited = False
|
self.__edited = False
|
||||||
|
|
||||||
def update_not_jsonable(self, *args):
|
def update_not_jsonable(self, *args):
|
||||||
self.__not_jsonable += args
|
self.__not_jsonable += args
|
||||||
|
@ -59,10 +62,19 @@ class AbstractMISP(collections.MutableMapping):
|
||||||
|
|
||||||
def to_dict(self):
|
def to_dict(self):
|
||||||
to_return = {}
|
to_return = {}
|
||||||
for attribute in self.properties():
|
for attribute in self.properties:
|
||||||
val = getattr(self, attribute, None)
|
val = getattr(self, attribute, None)
|
||||||
if val is None:
|
if val is None:
|
||||||
continue
|
continue
|
||||||
|
if attribute == 'timestamp':
|
||||||
|
if self.edited:
|
||||||
|
# In order to be accepted by MISP, the timestamp of an object
|
||||||
|
# needs to be either newer, or None.
|
||||||
|
# If the current object is marked as edited, the easiest is to
|
||||||
|
# skip the timestamp and let MISP deal with it
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
val = self._datetime_to_timestamp(val)
|
||||||
to_return[attribute] = val
|
to_return[attribute] = val
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
|
@ -93,6 +105,16 @@ class AbstractMISP(collections.MutableMapping):
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def edited(self):
|
def edited(self):
|
||||||
|
if self.__edited:
|
||||||
|
return self.__edited
|
||||||
|
for p in self.properties:
|
||||||
|
if self.__edited:
|
||||||
|
break
|
||||||
|
if isinstance(p, AbstractMISP) and p.edited:
|
||||||
|
self.__edited = True
|
||||||
|
elif isinstance(p, list) and all(isinstance(a, AbstractMISP) for a in p):
|
||||||
|
if any(a.edited for a in p):
|
||||||
|
self.__edited = True
|
||||||
return self.__edited
|
return self.__edited
|
||||||
|
|
||||||
@edited.setter
|
@edited.setter
|
||||||
|
@ -103,6 +125,15 @@ class AbstractMISP(collections.MutableMapping):
|
||||||
raise Exception('edited can only be True or False')
|
raise Exception('edited can only be True or False')
|
||||||
|
|
||||||
def __setattr__(self, name, value):
|
def __setattr__(self, name, value):
|
||||||
if name in self.properties():
|
if name in self.properties:
|
||||||
self.__edited = True
|
self.__edited = True
|
||||||
super(AbstractMISP, self).__setattr__(name, value)
|
super(AbstractMISP, self).__setattr__(name, value)
|
||||||
|
|
||||||
|
def _datetime_to_timestamp(self, d):
|
||||||
|
if isinstance(d, (int, str)):
|
||||||
|
# Assume we already have a timestamp
|
||||||
|
return d
|
||||||
|
if sys.version_info >= (3, 3):
|
||||||
|
return d.timestamp()
|
||||||
|
else:
|
||||||
|
return (d - datetime.datetime.utcfromtimestamp(0)).total_seconds()
|
||||||
|
|
|
@ -65,16 +65,6 @@ def _int_to_str(d):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
def _datetime_to_timestamp(d):
|
|
||||||
if isinstance(d, (int, str)):
|
|
||||||
# Assume we already have a timestamp
|
|
||||||
return d
|
|
||||||
if sys.version_info >= (3, 3):
|
|
||||||
return d.timestamp()
|
|
||||||
else:
|
|
||||||
return (d - datetime.datetime.utcfromtimestamp(0)).total_seconds()
|
|
||||||
|
|
||||||
|
|
||||||
class MISPAttribute(AbstractMISP):
|
class MISPAttribute(AbstractMISP):
|
||||||
|
|
||||||
def __init__(self, describe_types=None):
|
def __init__(self, describe_types=None):
|
||||||
|
@ -90,25 +80,26 @@ class MISPAttribute(AbstractMISP):
|
||||||
self.__sane_default = describe_types['sane_defaults']
|
self.__sane_default = describe_types['sane_defaults']
|
||||||
self.Tag = []
|
self.Tag = []
|
||||||
|
|
||||||
def get_known_types(self):
|
@property
|
||||||
|
def known_types(self):
|
||||||
return self._types
|
return self._types
|
||||||
|
|
||||||
def _serialize(self):
|
@property
|
||||||
return '{type}{category}{to_ids}{uuid}{timestamp}{comment}{deleted}{value}'.format(
|
def malware_binary(self):
|
||||||
type=self.type, category=self.category, to_ids=self.to_ids, uuid=self.uuid, timestamp=self.timestamp,
|
if hasattr(self, '_malware_binary'):
|
||||||
comment=self.comment, deleted=self.deleted, value=self.value).encode()
|
return self._malware_binary
|
||||||
|
return None
|
||||||
|
|
||||||
def sign(self, gpg_uid, passphrase=None):
|
@property
|
||||||
if not has_pyme:
|
def tags(self):
|
||||||
raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
|
return self.Tag
|
||||||
to_sign = self._serialize()
|
|
||||||
with gpg.Context() as c:
|
@tags.setter
|
||||||
keys = list(c.keylist(gpg_uid))
|
def tags(self, tags):
|
||||||
c.signers = keys[:1]
|
if all(isinstance(x, MISPTag) for x in tags):
|
||||||
if passphrase:
|
self.Tag = tags
|
||||||
c.set_passphrase_cb(lambda *args: passphrase)
|
else:
|
||||||
signed, _ = c.sign(to_sign, mode=mode.DETACH)
|
raise PyMISPError('All the attributes have to be of type MISPAttribute.')
|
||||||
self.sig = base64.b64encode(signed).decode()
|
|
||||||
|
|
||||||
def delete(self):
|
def delete(self):
|
||||||
self.deleted = True
|
self.deleted = True
|
||||||
|
@ -119,23 +110,10 @@ class MISPAttribute(AbstractMISP):
|
||||||
misp_tag.from_dict(name=tag)
|
misp_tag.from_dict(name=tag)
|
||||||
elif isinstance(tag, dict):
|
elif isinstance(tag, dict):
|
||||||
misp_tag.from_dict(**tag)
|
misp_tag.from_dict(**tag)
|
||||||
self.Tag.append(misp_tag)
|
else:
|
||||||
|
raise PyMISPError("The tag is in an invalid format (can be either string, or list): {}".format(tag))
|
||||||
def verify(self, gpg_uid):
|
self.tags.append(misp_tag)
|
||||||
if not has_pyme:
|
self.edited = True
|
||||||
raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
|
|
||||||
signed_data = self._serialize()
|
|
||||||
with gpg.Context() as c:
|
|
||||||
keys = list(c.keylist(gpg_uid))
|
|
||||||
try:
|
|
||||||
c.verify(signed_data, signature=base64.b64decode(self.sig), verify=keys[:1])
|
|
||||||
return {self.uuid: True}
|
|
||||||
except Exception:
|
|
||||||
return {self.uuid: False}
|
|
||||||
|
|
||||||
def set_all_values(self, **kwargs):
|
|
||||||
# to be deprecated
|
|
||||||
self.from_dict(**kwargs)
|
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
if hasattr(self, 'value'):
|
if hasattr(self, 'value'):
|
||||||
|
@ -147,8 +125,8 @@ class MISPAttribute(AbstractMISP):
|
||||||
if kwargs['type'] not in self.__category_type_mapping[kwargs['category']]:
|
if kwargs['type'] not in self.__category_type_mapping[kwargs['category']]:
|
||||||
raise NewAttributeError('{} and {} is an invalid combination, type for this category has to be in {}'.format(
|
raise NewAttributeError('{} and {} is an invalid combination, type for this category has to be in {}'.format(
|
||||||
kwargs.get('type'), kwargs.get('category'), (', '.join(self.__category_type_mapping[kwargs['category']]))))
|
kwargs.get('type'), kwargs.get('category'), (', '.join(self.__category_type_mapping[kwargs['category']]))))
|
||||||
# Required
|
|
||||||
self.type = kwargs.pop('type', None)
|
self.type = kwargs.pop('type', None) # Required
|
||||||
if self.type is None:
|
if self.type is None:
|
||||||
raise NewAttributeError('The type of the attribute is required.')
|
raise NewAttributeError('The type of the attribute is required.')
|
||||||
if self.type not in self.get_known_types():
|
if self.type not in self.get_known_types():
|
||||||
|
@ -255,31 +233,64 @@ class MISPAttribute(AbstractMISP):
|
||||||
# not a encrypted zip file, assuming it is a new malware sample
|
# not a encrypted zip file, assuming it is a new malware sample
|
||||||
self._prepare_new_malware_sample()
|
self._prepare_new_malware_sample()
|
||||||
|
|
||||||
|
def to_dict(self):
|
||||||
|
to_return = super(MISPAttribute, self).to_dict()
|
||||||
|
if to_return.get('data'):
|
||||||
|
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
|
||||||
|
to_return = _int_to_str(to_return)
|
||||||
|
return to_return
|
||||||
|
|
||||||
|
def verify(self, gpg_uid):
|
||||||
|
# Not used
|
||||||
|
if not has_pyme:
|
||||||
|
raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
|
||||||
|
signed_data = self._serialize()
|
||||||
|
with gpg.Context() as c:
|
||||||
|
keys = list(c.keylist(gpg_uid))
|
||||||
|
try:
|
||||||
|
c.verify(signed_data, signature=base64.b64decode(self.sig), verify=keys[:1])
|
||||||
|
return {self.uuid: True}
|
||||||
|
except Exception:
|
||||||
|
return {self.uuid: False}
|
||||||
|
|
||||||
|
def _serialize(self):
|
||||||
|
# Not used
|
||||||
|
return '{type}{category}{to_ids}{uuid}{timestamp}{comment}{deleted}{value}'.format(
|
||||||
|
type=self.type, category=self.category, to_ids=self.to_ids, uuid=self.uuid, timestamp=self.timestamp,
|
||||||
|
comment=self.comment, deleted=self.deleted, value=self.value).encode()
|
||||||
|
|
||||||
|
def sign(self, gpg_uid, passphrase=None):
|
||||||
|
# Not used
|
||||||
|
if not has_pyme:
|
||||||
|
raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
|
||||||
|
to_sign = self._serialize()
|
||||||
|
with gpg.Context() as c:
|
||||||
|
keys = list(c.keylist(gpg_uid))
|
||||||
|
c.signers = keys[:1]
|
||||||
|
if passphrase:
|
||||||
|
c.set_passphrase_cb(lambda *args: passphrase)
|
||||||
|
signed, _ = c.sign(to_sign, mode=mode.DETACH)
|
||||||
|
self.sig = base64.b64encode(signed).decode()
|
||||||
|
|
||||||
|
def get_known_types(self):
|
||||||
|
# Deprecated
|
||||||
|
return self.known_types
|
||||||
|
|
||||||
def get_malware_binary(self):
|
def get_malware_binary(self):
|
||||||
if hasattr(self, '_malware_binary'):
|
# Deprecated
|
||||||
return self._malware_binary
|
return self.malware_binary
|
||||||
return None
|
|
||||||
|
|
||||||
def _json(self):
|
def _json(self):
|
||||||
# DEPRECATED
|
# DEPRECATED
|
||||||
return self.to_dict()
|
return self.to_dict()
|
||||||
|
|
||||||
def _json_full(self):
|
def _json_full(self):
|
||||||
# DEPRECATED
|
# Deprecated
|
||||||
return self.to_dict()
|
return self.to_dict()
|
||||||
|
|
||||||
def to_dict(self):
|
def set_all_values(self, **kwargs):
|
||||||
to_return = super(MISPAttribute, self).to_dict()
|
# Deprecated
|
||||||
if to_return.get('data'):
|
self.from_dict(**kwargs)
|
||||||
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
|
|
||||||
|
|
||||||
if self.edited:
|
|
||||||
to_return.pop('timestamp', None)
|
|
||||||
elif to_return.get('timestamp'):
|
|
||||||
to_return['timestamp'] = _datetime_to_timestamp(self.timestamp)
|
|
||||||
|
|
||||||
to_return = _int_to_str(to_return)
|
|
||||||
return to_return
|
|
||||||
|
|
||||||
|
|
||||||
class MISPEvent(AbstractMISP):
|
class MISPEvent(AbstractMISP):
|
||||||
|
@ -304,66 +315,10 @@ class MISPEvent(AbstractMISP):
|
||||||
self.Object = []
|
self.Object = []
|
||||||
self.RelatedEvent = []
|
self.RelatedEvent = []
|
||||||
|
|
||||||
def get_known_types(self):
|
@property
|
||||||
|
def known_types(self):
|
||||||
return self._types
|
return self._types
|
||||||
|
|
||||||
def _serialize(self):
|
|
||||||
return '{date}{threat_level_id}{info}{uuid}{analysis}{timestamp}'.format(
|
|
||||||
date=self.date, threat_level_id=self.threat_level_id, info=self.info,
|
|
||||||
uuid=self.uuid, analysis=self.analysis, timestamp=self.timestamp).encode()
|
|
||||||
|
|
||||||
def _serialize_sigs(self):
|
|
||||||
all_sigs = self.sig
|
|
||||||
for a in self.attributes:
|
|
||||||
all_sigs += a.sig
|
|
||||||
return all_sigs.encode()
|
|
||||||
|
|
||||||
def sign(self, gpg_uid, passphrase=None):
|
|
||||||
if not has_pyme:
|
|
||||||
raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
|
|
||||||
to_sign = self._serialize()
|
|
||||||
with gpg.Context() as c:
|
|
||||||
keys = list(c.keylist(gpg_uid))
|
|
||||||
c.signers = keys[:1]
|
|
||||||
if passphrase:
|
|
||||||
c.set_passphrase_cb(lambda *args: passphrase)
|
|
||||||
signed, _ = c.sign(to_sign, mode=mode.DETACH)
|
|
||||||
self.sig = base64.b64encode(signed).decode()
|
|
||||||
for a in self.attributes:
|
|
||||||
a.sign(gpg_uid, passphrase)
|
|
||||||
to_sign_global = self._serialize_sigs()
|
|
||||||
with gpg.Context() as c:
|
|
||||||
keys = list(c.keylist(gpg_uid))
|
|
||||||
c.signers = keys[:1]
|
|
||||||
if passphrase:
|
|
||||||
c.set_passphrase_cb(lambda *args: passphrase)
|
|
||||||
signed, _ = c.sign(to_sign_global, mode=mode.DETACH)
|
|
||||||
self.global_sig = base64.b64encode(signed).decode()
|
|
||||||
|
|
||||||
def verify(self, gpg_uid):
|
|
||||||
if not has_pyme:
|
|
||||||
raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
|
|
||||||
to_return = {}
|
|
||||||
signed_data = self._serialize()
|
|
||||||
with gpg.Context() as c:
|
|
||||||
keys = list(c.keylist(gpg_uid))
|
|
||||||
try:
|
|
||||||
c.verify(signed_data, signature=base64.b64decode(self.sig), verify=keys[:1])
|
|
||||||
to_return[self.uuid] = True
|
|
||||||
except Exception:
|
|
||||||
to_return[self.uuid] = False
|
|
||||||
for a in self.attributes:
|
|
||||||
to_return.update(a.verify(gpg_uid))
|
|
||||||
to_verify_global = self._serialize_sigs()
|
|
||||||
with gpg.Context() as c:
|
|
||||||
keys = list(c.keylist(gpg_uid))
|
|
||||||
try:
|
|
||||||
c.verify(to_verify_global, signature=base64.b64decode(self.global_sig), verify=keys[:1])
|
|
||||||
to_return['global'] = True
|
|
||||||
except Exception:
|
|
||||||
to_return['global'] = False
|
|
||||||
return to_return
|
|
||||||
|
|
||||||
def load_file(self, event_path):
|
def load_file(self, event_path):
|
||||||
if not os.path.exists(event_path):
|
if not os.path.exists(event_path):
|
||||||
raise PyMISPError('Invalid path, unable to load the event.')
|
raise PyMISPError('Invalid path, unable to load the event.')
|
||||||
|
@ -402,10 +357,6 @@ class MISPEvent(AbstractMISP):
|
||||||
else:
|
else:
|
||||||
raise NewEventError('Invalid format for the date: {} - {}'.format(date, type(date)))
|
raise NewEventError('Invalid format for the date: {} - {}'.format(date, type(date)))
|
||||||
|
|
||||||
def set_all_values(self, **kwargs):
|
|
||||||
# to be deprecated
|
|
||||||
self.from_dict(**kwargs)
|
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
if hasattr(self, 'info'):
|
if hasattr(self, 'info'):
|
||||||
return '<{self.__class__.__name__}(info={self.info})'.format(self=self)
|
return '<{self.__class__.__name__}(info={self.info})'.format(self=self)
|
||||||
|
@ -484,31 +435,13 @@ class MISPEvent(AbstractMISP):
|
||||||
|
|
||||||
super(MISPEvent, self).from_dict(**kwargs)
|
super(MISPEvent, self).from_dict(**kwargs)
|
||||||
|
|
||||||
def _json(self):
|
|
||||||
# DEPTECATED
|
|
||||||
return self.to_dict()
|
|
||||||
|
|
||||||
def to_dict(self):
|
def to_dict(self):
|
||||||
for o in self.objects:
|
|
||||||
if o.edited:
|
|
||||||
self.edited = True
|
|
||||||
break
|
|
||||||
for a in self.attributes:
|
|
||||||
if a.edited:
|
|
||||||
self.edited = True
|
|
||||||
break
|
|
||||||
|
|
||||||
to_return = super(MISPEvent, self).to_dict()
|
to_return = super(MISPEvent, self).to_dict()
|
||||||
|
|
||||||
if to_return.get('date'):
|
if to_return.get('date'):
|
||||||
to_return['date'] = self.date.isoformat()
|
to_return['date'] = self.date.isoformat()
|
||||||
if self.edited:
|
|
||||||
to_return.pop('timestamp', None)
|
|
||||||
elif to_return.get('timestamp'):
|
|
||||||
to_return['timestamp'] = _datetime_to_timestamp(self.timestamp)
|
|
||||||
|
|
||||||
if to_return.get('publish_timestamp'):
|
if to_return.get('publish_timestamp'):
|
||||||
to_return['publish_timestamp'] = _datetime_to_timestamp(self.publish_timestamp)
|
to_return['publish_timestamp'] = self._datetime_to_timestamp(self.publish_timestamp)
|
||||||
|
|
||||||
to_return = _int_to_str(to_return)
|
to_return = _int_to_str(to_return)
|
||||||
to_return = {'Event': to_return}
|
to_return = {'Event': to_return}
|
||||||
|
@ -520,7 +453,10 @@ class MISPEvent(AbstractMISP):
|
||||||
misp_tag.from_dict(name=tag)
|
misp_tag.from_dict(name=tag)
|
||||||
elif isinstance(tag, dict):
|
elif isinstance(tag, dict):
|
||||||
misp_tag.from_dict(**tag)
|
misp_tag.from_dict(**tag)
|
||||||
self.Tag.append(misp_tag)
|
else:
|
||||||
|
raise PyMISPError("The tag is in an invalid format (can be either string, or list): {}".format(tag))
|
||||||
|
self.tags.append(misp_tag)
|
||||||
|
self.edited = True
|
||||||
|
|
||||||
def get_attribute_tag(self, attribute_identifier):
|
def get_attribute_tag(self, attribute_identifier):
|
||||||
'''Return the tags associated to an attribute or an object attribute.
|
'''Return the tags associated to an attribute or an object attribute.
|
||||||
|
@ -550,6 +486,7 @@ class MISPEvent(AbstractMISP):
|
||||||
attributes.append(a)
|
attributes.append(a)
|
||||||
if not attributes:
|
if not attributes:
|
||||||
raise Exception('No attribute with identifier {} found.'.format(attribute_identifier))
|
raise Exception('No attribute with identifier {} found.'.format(attribute_identifier))
|
||||||
|
self.edited = True
|
||||||
return attributes
|
return attributes
|
||||||
|
|
||||||
def publish(self):
|
def publish(self):
|
||||||
|
@ -570,15 +507,16 @@ class MISPEvent(AbstractMISP):
|
||||||
raise Exception('No attribute with UUID/ID {} found.'.format(attribute_id))
|
raise Exception('No attribute with UUID/ID {} found.'.format(attribute_id))
|
||||||
|
|
||||||
def add_attribute(self, type, value, **kwargs):
|
def add_attribute(self, type, value, **kwargs):
|
||||||
attribute = MISPAttribute()
|
|
||||||
if isinstance(value, list):
|
if isinstance(value, list):
|
||||||
for a in value:
|
for a in value:
|
||||||
self.add_attribute(type, a, **kwargs)
|
self.add_attribute(type=type, value=a, **kwargs)
|
||||||
else:
|
else:
|
||||||
|
attribute = MISPAttribute()
|
||||||
attribute.set_all_values(type=type, value=value, **kwargs)
|
attribute.set_all_values(type=type, value=value, **kwargs)
|
||||||
if not hasattr(self, 'attributes'):
|
if not hasattr(self, 'attributes'):
|
||||||
self.attributes = []
|
self.attributes = []
|
||||||
self.attributes.append(attribute)
|
self.attributes.append(attribute)
|
||||||
|
self.edited = True
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def attributes(self):
|
def attributes(self):
|
||||||
|
@ -618,6 +556,76 @@ class MISPEvent(AbstractMISP):
|
||||||
self.Object.append(tmp_object)
|
self.Object.append(tmp_object)
|
||||||
else:
|
else:
|
||||||
raise InvalidMISPObject("An object to add to an existing Event needs to be either a MISPObject, or a plain python dictionary")
|
raise InvalidMISPObject("An object to add to an existing Event needs to be either a MISPObject, or a plain python dictionary")
|
||||||
|
self.edited = True
|
||||||
|
|
||||||
|
def _serialize(self):
|
||||||
|
return '{date}{threat_level_id}{info}{uuid}{analysis}{timestamp}'.format(
|
||||||
|
date=self.date, threat_level_id=self.threat_level_id, info=self.info,
|
||||||
|
uuid=self.uuid, analysis=self.analysis, timestamp=self.timestamp).encode()
|
||||||
|
|
||||||
|
def _serialize_sigs(self):
|
||||||
|
all_sigs = self.sig
|
||||||
|
for a in self.attributes:
|
||||||
|
all_sigs += a.sig
|
||||||
|
return all_sigs.encode()
|
||||||
|
|
||||||
|
def sign(self, gpg_uid, passphrase=None):
|
||||||
|
if not has_pyme:
|
||||||
|
raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
|
||||||
|
to_sign = self._serialize()
|
||||||
|
with gpg.Context() as c:
|
||||||
|
keys = list(c.keylist(gpg_uid))
|
||||||
|
c.signers = keys[:1]
|
||||||
|
if passphrase:
|
||||||
|
c.set_passphrase_cb(lambda *args: passphrase)
|
||||||
|
signed, _ = c.sign(to_sign, mode=mode.DETACH)
|
||||||
|
self.sig = base64.b64encode(signed).decode()
|
||||||
|
for a in self.attributes:
|
||||||
|
a.sign(gpg_uid, passphrase)
|
||||||
|
to_sign_global = self._serialize_sigs()
|
||||||
|
with gpg.Context() as c:
|
||||||
|
keys = list(c.keylist(gpg_uid))
|
||||||
|
c.signers = keys[:1]
|
||||||
|
if passphrase:
|
||||||
|
c.set_passphrase_cb(lambda *args: passphrase)
|
||||||
|
signed, _ = c.sign(to_sign_global, mode=mode.DETACH)
|
||||||
|
self.global_sig = base64.b64encode(signed).decode()
|
||||||
|
|
||||||
|
def verify(self, gpg_uid):
|
||||||
|
if not has_pyme:
|
||||||
|
raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
|
||||||
|
to_return = {}
|
||||||
|
signed_data = self._serialize()
|
||||||
|
with gpg.Context() as c:
|
||||||
|
keys = list(c.keylist(gpg_uid))
|
||||||
|
try:
|
||||||
|
c.verify(signed_data, signature=base64.b64decode(self.sig), verify=keys[:1])
|
||||||
|
to_return[self.uuid] = True
|
||||||
|
except Exception:
|
||||||
|
to_return[self.uuid] = False
|
||||||
|
for a in self.attributes:
|
||||||
|
to_return.update(a.verify(gpg_uid))
|
||||||
|
to_verify_global = self._serialize_sigs()
|
||||||
|
with gpg.Context() as c:
|
||||||
|
keys = list(c.keylist(gpg_uid))
|
||||||
|
try:
|
||||||
|
c.verify(to_verify_global, signature=base64.b64decode(self.global_sig), verify=keys[:1])
|
||||||
|
to_return['global'] = True
|
||||||
|
except Exception:
|
||||||
|
to_return['global'] = False
|
||||||
|
return to_return
|
||||||
|
|
||||||
|
def get_known_types(self):
|
||||||
|
# Deprecated
|
||||||
|
return self.known_types
|
||||||
|
|
||||||
|
def set_all_values(self, **kwargs):
|
||||||
|
# Deprecated
|
||||||
|
self.from_dict(**kwargs)
|
||||||
|
|
||||||
|
def _json(self):
|
||||||
|
# DEPTECATED
|
||||||
|
return self.to_dict()
|
||||||
|
|
||||||
|
|
||||||
class MISPTag(AbstractMISP):
|
class MISPTag(AbstractMISP):
|
||||||
|
@ -734,6 +742,7 @@ class MISPObject(AbstractMISP):
|
||||||
pass
|
pass
|
||||||
self.uuid = str(uuid.uuid4())
|
self.uuid = str(uuid.uuid4())
|
||||||
self.__fast_attribute_access = {} # Hashtable object_relation: [attributes]
|
self.__fast_attribute_access = {} # Hashtable object_relation: [attributes]
|
||||||
|
self.Attribute = []
|
||||||
self._default_attributes_parameters = default_attributes_parameters
|
self._default_attributes_parameters = default_attributes_parameters
|
||||||
if self._default_attributes_parameters:
|
if self._default_attributes_parameters:
|
||||||
# Let's clean that up
|
# Let's clean that up
|
||||||
|
@ -787,20 +796,9 @@ class MISPObject(AbstractMISP):
|
||||||
super(MISPObject, self).from_dict(**kwargs)
|
super(MISPObject, self).from_dict(**kwargs)
|
||||||
|
|
||||||
def to_dict(self, strict=False):
|
def to_dict(self, strict=False):
|
||||||
for a in self.attributes:
|
|
||||||
if a.edited:
|
|
||||||
self.edited = True
|
|
||||||
break
|
|
||||||
if strict or self.__strict and self.__known_template:
|
if strict or self.__strict and self.__known_template:
|
||||||
self._validate()
|
self._validate()
|
||||||
# Set the expected key (Attributes)
|
return super(MISPObject, self).to_dict()
|
||||||
self.Attribute = self.attributes
|
|
||||||
to_return = super(MISPObject, self).to_dict()
|
|
||||||
if self.edited:
|
|
||||||
to_return.pop('timestamp', None)
|
|
||||||
elif to_return.get('timestamp'):
|
|
||||||
to_return['timestamp'] = _datetime_to_timestamp(self.timestamp)
|
|
||||||
return to_return
|
|
||||||
|
|
||||||
def to_json(self, strict=False):
|
def to_json(self, strict=False):
|
||||||
if strict or self.__strict and self.__known_template:
|
if strict or self.__strict and self.__known_template:
|
||||||
|
@ -840,6 +838,7 @@ class MISPObject(AbstractMISP):
|
||||||
reference.from_dict(object_uuid=object_uuid, referenced_uuid=referenced_uuid,
|
reference.from_dict(object_uuid=object_uuid, referenced_uuid=referenced_uuid,
|
||||||
relationship_type=relationship_type, comment=comment, **kwargs)
|
relationship_type=relationship_type, comment=comment, **kwargs)
|
||||||
self.ObjectReference.append(reference)
|
self.ObjectReference.append(reference)
|
||||||
|
self.edited = True
|
||||||
|
|
||||||
def get_attributes_by_relation(self, object_relation):
|
def get_attributes_by_relation(self, object_relation):
|
||||||
'''Returns the list of attributes with the given object relation in the object'''
|
'''Returns the list of attributes with the given object relation in the object'''
|
||||||
|
@ -851,7 +850,7 @@ class MISPObject(AbstractMISP):
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def attributes(self):
|
def attributes(self):
|
||||||
return [a for sublist in self.__fast_attribute_access.values() for a in sublist]
|
return self.Attribute
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def references(self):
|
def references(self):
|
||||||
|
@ -873,5 +872,7 @@ class MISPObject(AbstractMISP):
|
||||||
attribute.from_dict(object_relation=object_relation, **dict(self._default_attributes_parameters, **value))
|
attribute.from_dict(object_relation=object_relation, **dict(self._default_attributes_parameters, **value))
|
||||||
if not self.__fast_attribute_access.get(object_relation):
|
if not self.__fast_attribute_access.get(object_relation):
|
||||||
self.__fast_attribute_access[object_relation] = []
|
self.__fast_attribute_access[object_relation] = []
|
||||||
|
self.Attribute.append(attribute)
|
||||||
self.__fast_attribute_access[object_relation].append(attribute)
|
self.__fast_attribute_access[object_relation].append(attribute)
|
||||||
|
self.edited = True
|
||||||
return attribute
|
return attribute
|
||||||
|
|
Loading…
Reference in New Issue