mirror of https://github.com/MISP/PyMISP
798 lines
32 KiB
Python
798 lines
32 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import datetime
|
|
import time
|
|
import json
|
|
from json import JSONEncoder
|
|
import os
|
|
import base64
|
|
from io import BytesIO
|
|
from zipfile import ZipFile
|
|
import hashlib
|
|
import sys
|
|
import uuid
|
|
from collections import Counter
|
|
|
|
from .abstract import AbstractMISP
|
|
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError
|
|
|
|
|
|
import six # Remove that import when discarding python2 support.
|
|
|
|
if six.PY2:
|
|
import warnings
|
|
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.5")
|
|
|
|
try:
|
|
from dateutil.parser import parse
|
|
except ImportError:
|
|
pass
|
|
|
|
try:
|
|
import jsonschema
|
|
except ImportError:
|
|
pass
|
|
|
|
try:
|
|
# pyme renamed to gpg the 2016-10-28
|
|
import gpg
|
|
from gpg.constants.sig import mode
|
|
has_pyme = True
|
|
except ImportError:
|
|
try:
|
|
# pyme renamed to gpg the 2016-10-28
|
|
import pyme as gpg
|
|
from pyme.constants.sig import mode
|
|
has_pyme = True
|
|
except ImportError:
|
|
has_pyme = False
|
|
|
|
# Least dirty way to support python 2 and 3
|
|
try:
|
|
basestring
|
|
unicode
|
|
except NameError:
|
|
basestring = str
|
|
unicode = str
|
|
|
|
|
|
class MISPAttribute(object):
    """A single MISP attribute: type, category, value and their metadata.

    The valid types, categories and per-type defaults come from a
    ``describe_types`` mapping. When none is supplied, the ``result`` entry
    of the ``data/describeTypes.json`` file next to this module is used.
    """

    def __init__(self, describe_types=None):
        """Load the type description and reset every field to its default.

        :param describe_types: mapping providing the keys ``categories``,
            ``types``, ``category_type_mappings`` and ``sane_defaults``.
            When falsy, the bundled ``describeTypes.json`` is read instead.
        """
        if not describe_types:
            self.ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
            with open(os.path.join(self.ressources_path, 'describeTypes.json'), 'r') as f:
                t = json.load(f)
            describe_types = t['result']
        self.describe_types = describe_types
        self.categories = describe_types['categories']
        self.types = describe_types['types']
        self.category_type_mapping = describe_types['category_type_mappings']
        self.sane_default = describe_types['sane_defaults']
        self._reinitialize_attribute()

    def _reinitialize_attribute(self):
        """Reset every attribute field to its initial value."""
        # Default values
        self.category = None
        self.type = None
        self.value = None
        self.to_ids = False
        self.comment = ''
        self.distribution = 5

        # other possible values
        self.data = None
        self.encrypt = False
        self.id = None
        self.uuid = None
        self.timestamp = None
        self.sharing_group_id = None
        self.deleted = None
        self.sig = None
        self.SharingGroup = []
        self.ShadowAttribute = []
        self.disable_correlation = False
        self.RelatedAttribute = []
        self.Tag = []

    @staticmethod
    def _datetime_to_epoch(moment):
        """Return the number of seconds between the Unix epoch and *moment*.

        ``from_dict`` stores timestamps as naive datetimes computed from
        ``datetime(1970, 1, 1)``, so the export has to use the same
        reference. ``time.mktime`` would interpret the value as local time
        and shift it by the UTC offset of the host.

        :param moment: a ``datetime.datetime``; an aware value is converted
            to UTC first.
        :returns: whole seconds since 1970-01-01 as an ``int``.
        """
        offset = moment.utcoffset()
        if offset is not None:
            moment = moment.replace(tzinfo=None) - offset
        return int((moment - datetime.datetime(1970, 1, 1)).total_seconds())

    def _serialize(self):
        """Return the bytes that are signed and verified for this attribute."""
        return '{type}{category}{to_ids}{uuid}{timestamp}{comment}{deleted}{value}'.format(
            type=self.type, category=self.category, to_ids=self.to_ids, uuid=self.uuid, timestamp=self.timestamp,
            comment=self.comment, deleted=self.deleted, value=self.value).encode()

    def sign(self, gpg_uid, passphrase=None):
        """Create a detached signature of the attribute and store it in ``sig``.

        :param gpg_uid: pattern passed to the GPG key listing; the first
            matching key is used as signer.
        :param passphrase: optional passphrase handed to GPG through a callback.
        :raises PyMISPError: when neither ``gpg`` nor ``pyme`` could be imported.
        """
        if not has_pyme:
            raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
        to_sign = self._serialize()
        with gpg.Context() as c:
            keys = list(c.keylist(gpg_uid))
            c.signers = keys[:1]
            if passphrase:
                c.set_passphrase_cb(lambda *args: passphrase)
            signed, _ = c.sign(to_sign, mode=mode.DETACH)
            self.sig = base64.b64encode(signed).decode()

    def delete(self):
        """Flag the attribute as deleted."""
        self.deleted = True

    def add_tag(self, tag):
        """Append a tag, stored as ``{'name': tag}``, to the attribute."""
        self.Tag.append({'name': tag})

    def verify(self, gpg_uid):
        """Check ``sig`` against the serialized attribute.

        :param gpg_uid: pattern passed to the GPG key listing; the first
            matching key is the one the signature must come from.
        :returns: ``{uuid: True}`` when verification succeeds, otherwise
            ``{uuid: False}``.
        :raises PyMISPError: when neither ``gpg`` nor ``pyme`` could be imported.
        """
        if not has_pyme:
            raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
        signed_data = self._serialize()
        with gpg.Context() as c:
            keys = list(c.keylist(gpg_uid))
            try:
                c.verify(signed_data, signature=base64.b64decode(self.sig), verify=keys[:1])
                return {self.uuid: True}
            except Exception:
                # Any failure (bad signature, missing signature, ...) counts
                # as "not verified"; KeyboardInterrupt/SystemExit still propagate.
                return {self.uuid: False}

    def set_all_values(self, **kwargs):
        """Alias of :meth:`from_dict`, kept for existing callers."""
        # to be deprecated
        self.from_dict(**kwargs)

    def from_dict(self, **kwargs):
        """Populate the attribute from keyword arguments.

        ``type`` (unless already set on the instance) and ``value`` are
        required. ``category`` and ``to_ids`` fall back to the defaults
        declared for the type in ``sane_defaults``.

        :raises NewAttributeError: when a required key is missing or a value
            is not allowed by the type description.
        """
        attr_type = kwargs.get('type')
        attr_category = kwargs.get('category')
        if attr_type and attr_category:
            # An unknown category is reported by the dedicated check below
            # instead of surfacing here as a bare KeyError.
            allowed_types = self.category_type_mapping.get(attr_category)
            if allowed_types is not None and attr_type not in allowed_types:
                raise NewAttributeError('{} and {} is an invalid combination, type for this category has to be in {}'.format(attr_type, attr_category, (', '.join(allowed_types))))

        # Required
        if attr_type:
            self.type = attr_type
            if self.type not in self.types:
                raise NewAttributeError('{} is invalid, type has to be in {}'.format(self.type, (', '.join(self.types))))
        elif not self.type:
            raise NewAttributeError('The type of the attribute is required.')

        type_defaults = self.sane_default[self.type]

        self.value = kwargs.get('value')
        if self.value is None:
            raise NewAttributeError('The value of the attribute is required.')

        # Default values
        if attr_category:
            self.category = attr_category
            if self.category not in self.categories:
                raise NewAttributeError('{} is invalid, category has to be in {}'.format(self.category, (', '.join(self.categories))))
        else:
            self.category = type_defaults['default_category']

        self.to_ids = kwargs.get('to_ids')
        if self.to_ids is None:
            self.to_ids = bool(int(type_defaults['to_ids']))
        if not isinstance(self.to_ids, bool):
            raise NewAttributeError('{} is invalid, to_ids has to be True or False'.format(self.to_ids))

        if kwargs.get('comment'):
            self.comment = kwargs['comment']
        if kwargs.get('distribution') is not None:
            self.distribution = int(kwargs['distribution'])
            if self.distribution not in [0, 1, 2, 3, 4, 5]:
                raise NewAttributeError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5'.format(self.distribution))

        # other possible values
        if kwargs.get('data'):
            self.data = kwargs['data']
            self._load_data()
        if kwargs.get('id'):
            self.id = int(kwargs['id'])
        if kwargs.get('uuid'):
            self.uuid = kwargs['uuid']
        if kwargs.get('timestamp'):
            # Stored as a naive datetime counted from the Unix epoch.
            self.timestamp = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=int(kwargs['timestamp']))
        if kwargs.get('sharing_group_id'):
            self.sharing_group_id = int(kwargs['sharing_group_id'])
        if kwargs.get('deleted'):
            self.deleted = kwargs['deleted']
        if kwargs.get('SharingGroup'):
            self.SharingGroup = kwargs['SharingGroup']
        if kwargs.get('ShadowAttribute'):
            self.ShadowAttribute = kwargs['ShadowAttribute']
        if kwargs.get('sig'):
            self.sig = kwargs['sig']
        if kwargs.get('Tag'):
            self.Tag = [t for t in kwargs['Tag'] if t]

        # If the user wants to disable correlation, let them. Defaults to False.
        self.disable_correlation = kwargs.get("disable_correlation", False)
        if self.disable_correlation is None:
            self.disable_correlation = False

    def _prepare_new_malware_sample(self):
        """Set up a sample whose ``data`` is the raw, not yet zipped, binary.

        ``value`` may be ``filename`` or ``filename|md5``; only the filename
        is kept. The ``encrypt`` flag is switched on.
        """
        if '|' in self.value:
            # Keep the filename and drop the trailing md5. Splitting from the
            # right tolerates filenames that themselves contain a '|'.
            self.malware_filename = self.value.rsplit('|', 1)[0]
        else:
            # Assuming the user only passed the filename
            self.malware_filename = self.value
        self.value = self.malware_filename
        self.malware_binary = self.data
        self.encrypt = True

    def _load_data(self):
        """Normalise ``data`` to a ``BytesIO`` and unpack malware samples.

        Anything that is not already a ``BytesIO`` is treated as base64 text.
        For ``malware-sample`` attributes the payload is opened as a zip
        archive with the password ``infected``: a ``.txt`` member provides
        the filename, any other member the binary. When that fails the
        payload is handled as a new sample.
        """
        if not isinstance(self.data, BytesIO):
            self.data = BytesIO(base64.b64decode(self.data))
        if self.type == 'malware-sample':
            try:
                with ZipFile(self.data) as f:
                    for name in f.namelist():
                        with f.open(name, pwd=b'infected') as unpacked:
                            if name.endswith('.txt'):
                                self.malware_filename = unpacked.read().decode()
                            else:
                                self.malware_binary = BytesIO(unpacked.read())
            except Exception:
                # not a encrypted zip file, assuming it is a new malware sample
                self._prepare_new_malware_sample()

    def _json(self):
        """Alias of :meth:`to_dict`, kept for existing callers."""
        # DEPRECATED
        return self.to_dict()

    def to_dict(self):
        """Return the fields used when updating the attribute.

        Optional fields are only included when set, and numeric values are
        converted to strings by ``_int_to_str``.
        """
        to_return = {'type': self.type, 'category': self.category, 'to_ids': self.to_ids,
                     'distribution': self.distribution, 'value': self.value,
                     'comment': self.comment, 'disable_correlation': self.disable_correlation}
        if self.uuid:
            to_return['uuid'] = self.uuid
        if self.sig:
            to_return['sig'] = self.sig
        if self.sharing_group_id:
            to_return['sharing_group_id'] = self.sharing_group_id
        if self.Tag:
            to_return['Tag'] = self.Tag
        if self.data:
            to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
        if self.encrypt:
            to_return['encrypt'] = self.encrypt
        to_return = _int_to_str(to_return)
        return to_return

    def _json_full(self):
        """Return :meth:`to_dict` extended with id, timestamp and related keys."""
        to_return = self._json()
        if self.id:
            to_return['id'] = self.id
        if self.timestamp:
            # Should never be set on an update, MISP will automatically set it to now
            to_return['timestamp'] = self._datetime_to_epoch(self.timestamp)
        if self.deleted is not None:
            to_return['deleted'] = self.deleted
        if self.ShadowAttribute:
            to_return['ShadowAttribute'] = self.ShadowAttribute
        if self.SharingGroup:
            to_return['SharingGroup'] = self.SharingGroup
        to_return = _int_to_str(to_return)
        return to_return
|
|
|
|
|
|
class EncodeUpdate(JSONEncoder):
    """JSON encoder that serialises objects through their ``_json`` method."""

    def default(self, obj):
        """Return ``obj._json()``; defer to ``JSONEncoder`` on AttributeError."""
        try:
            payload = obj._json()
        except AttributeError:
            payload = JSONEncoder.default(self, obj)
        return payload
|
|
|
|
|
|
class EncodeFull(JSONEncoder):
    """JSON encoder that serialises objects through their ``_json_full`` method."""

    def default(self, obj):
        """Return ``obj._json_full()``; defer to ``JSONEncoder`` on AttributeError."""
        try:
            payload = obj._json_full()
        except AttributeError:
            payload = JSONEncoder.default(self, obj)
        return payload
|
|
|
|
|
|
def _int_to_str(d):
|
|
# transform all integer back to string
|
|
for k, v in d.items():
|
|
if isinstance(v, (int, float)) and not isinstance(v, bool):
|
|
d[k] = str(v)
|
|
return d
|
|
|
|
|
|
class MISPEvent(object):
    """A MISP event together with its attributes, tags and signatures.

    Events read with :meth:`load` and events exported with :meth:`to_dict`
    are validated against a JSON schema shipped in the ``data`` directory
    next to this module.
    """

    def __init__(self, describe_types=None, strict_validation=False):
        """Load the JSON schema and type description, then reset the event.

        :param describe_types: mapping providing the keys ``categories``,
            ``types``, ``category_type_mappings`` and ``sane_defaults``.
            When falsy, the bundled ``describeTypes.json`` is read instead.
        :param strict_validation: use ``schema.json`` when true, otherwise
            ``schema-lax.json``.
        """
        self.ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
        schema_name = 'schema.json' if strict_validation else 'schema-lax.json'
        with open(os.path.join(self.ressources_path, schema_name), 'r') as f:
            self.json_schema = json.load(f)
        if not describe_types:
            with open(os.path.join(self.ressources_path, 'describeTypes.json'), 'r') as f:
                t = json.load(f)
            describe_types = t['result']
        self.describe_types = describe_types
        self.categories = describe_types['categories']
        self.types = describe_types['types']
        self.category_type_mapping = describe_types['category_type_mappings']
        self.sane_default = describe_types['sane_defaults']
        self.new = True
        self.dump_full = False

        self._reinitialize_event()

    def _reinitialize_event(self):
        """Reset every event field to its initial value."""
        # Default values for a valid event to send to a MISP instance
        self.distribution = 3
        self.threat_level_id = 2
        self.analysis = 0
        self.info = None
        self.published = False
        self.date = datetime.date.today()
        self.attributes = []

        # All other keys
        self.sig = None
        self.global_sig = None
        self.id = None
        self.orgc_id = None
        self.org_id = None
        self.uuid = None
        self.attribute_count = None
        self.timestamp = None
        self.proposal_email_lock = None
        self.locked = None
        self.publish_timestamp = None
        self.sharing_group_id = None
        self.Org = None
        self.Orgc = None
        self.ShadowAttribute = []
        self.RelatedEvent = []
        self.Tag = []
        self.Galaxy = None
        self.Object = None

    @staticmethod
    def _datetime_to_epoch(moment):
        """Return the number of seconds between the Unix epoch and *moment*.

        ``set_all_values`` stores timestamps as naive datetimes computed
        from ``datetime(1970, 1, 1)``, so the export has to use the same
        reference. ``time.mktime`` would interpret the value as local time
        and shift it by the UTC offset of the host.

        :param moment: a ``datetime.datetime``; an aware value is converted
            to UTC first.
        :returns: whole seconds since 1970-01-01 as an ``int``.
        """
        offset = moment.utcoffset()
        if offset is not None:
            moment = moment.replace(tzinfo=None) - offset
        return int((moment - datetime.datetime(1970, 1, 1)).total_seconds())

    @staticmethod
    def _attribute_matches(attribute, identifier):
        """Tell whether *identifier* is the id, the uuid or part of the value.

        A containment test that is not supported for the two operand types
        (for example an integer id against a string value) is a non-match
        rather than a ``TypeError``.
        """
        if attribute.id == identifier or attribute.uuid == identifier:
            return True
        try:
            return identifier in attribute.value
        except TypeError:
            return False

    def _serialize(self):
        """Return the bytes that are signed and verified for the event itself."""
        return '{date}{threat_level_id}{info}{uuid}{analysis}{timestamp}'.format(
            date=self.date, threat_level_id=self.threat_level_id, info=self.info,
            uuid=self.uuid, analysis=self.analysis, timestamp=self.timestamp).encode()

    def _serialize_sigs(self):
        """Return the event signature followed by every attribute signature.

        Requires the event and all its attributes to carry a signature.
        """
        all_sigs = self.sig
        for a in self.attributes:
            all_sigs += a.sig
        return all_sigs.encode()

    def _detached_signature(self, payload, gpg_uid, passphrase):
        """Sign *payload* with the first key matching *gpg_uid*; return base64 text."""
        with gpg.Context() as c:
            keys = list(c.keylist(gpg_uid))
            c.signers = keys[:1]
            if passphrase:
                c.set_passphrase_cb(lambda *args: passphrase)
            signed, _ = c.sign(payload, mode=mode.DETACH)
            return base64.b64encode(signed).decode()

    def _signature_is_valid(self, payload, signature, gpg_uid):
        """Return True when base64 *signature* verifies *payload*, else False."""
        with gpg.Context() as c:
            keys = list(c.keylist(gpg_uid))
            try:
                c.verify(payload, signature=base64.b64decode(signature), verify=keys[:1])
                return True
            except Exception:
                # Any failure (bad or missing signature, ...) counts as "not
                # verified"; KeyboardInterrupt/SystemExit still propagate.
                return False

    def sign(self, gpg_uid, passphrase=None):
        """Sign the event, each attribute, and then the set of all signatures.

        The results are stored in ``sig``, in each attribute's ``sig`` and
        in ``global_sig``.

        :param gpg_uid: pattern passed to the GPG key listing; the first
            matching key is used as signer.
        :param passphrase: optional passphrase handed to GPG through a callback.
        :raises PyMISPError: when neither ``gpg`` nor ``pyme`` could be imported.
        """
        if not has_pyme:
            raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
        self.sig = self._detached_signature(self._serialize(), gpg_uid, passphrase)
        for a in self.attributes:
            a.sign(gpg_uid, passphrase)
        # The global signature covers the signatures produced just above.
        self.global_sig = self._detached_signature(self._serialize_sigs(), gpg_uid, passphrase)

    def verify(self, gpg_uid):
        """Verify the event, attribute and global signatures.

        :param gpg_uid: pattern passed to the GPG key listing; the first
            matching key is the one the signatures must come from.
        :returns: dict mapping the event uuid and every attribute uuid to a
            boolean, plus the key ``'global'`` for the global signature.
        :raises PyMISPError: when neither ``gpg`` nor ``pyme`` could be imported.
        """
        if not has_pyme:
            raise PyMISPError('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
        to_return = {}
        to_return[self.uuid] = self._signature_is_valid(self._serialize(), self.sig, gpg_uid)
        for a in self.attributes:
            to_return.update(a.verify(gpg_uid))
        to_verify_global = self._serialize_sigs()
        to_return['global'] = self._signature_is_valid(to_verify_global, self.global_sig, gpg_uid)
        return to_return

    def load_file(self, event_path):
        """Load an event from the JSON file at *event_path*.

        :raises PyMISPError: when the path does not exist.
        """
        if not os.path.exists(event_path):
            raise PyMISPError('Invalid path, unable to load the event.')
        with open(event_path, 'r') as f:
            self.load(f)

    def load(self, json_event):
        """Load an event from a file-like object, a JSON string or a dict.

        A dict with a truthy ``response`` key is unwrapped to the first
        element of that list. The event is validated against the JSON
        schema before the instance is reset and populated.

        :raises PyMISPError: when the resulting event is empty.
        """
        self.new = False
        self.dump_full = True
        if hasattr(json_event, 'read'):
            # python2 and python3 compatible to find if we have a file
            json_event = json_event.read()
        if isinstance(json_event, basestring):
            json_event = json.loads(json_event)
        if json_event.get('response'):
            event = json_event.get('response')[0]
        else:
            event = json_event
        if not event:
            raise PyMISPError('Invalid event')
        # Invalid event created by MISP up to 2.4.52 (attribute_count is none instead of '0')
        if event.get('Event') and event.get('Event').get('attribute_count') is None:
            event['Event']['attribute_count'] = '0'
        jsonschema.validate(event, self.json_schema)
        e = event.get('Event')
        self._reinitialize_event()
        self.set_all_values(**e)

    def set_date(self, date, ignore_invalid=False):
        """Set the event date from a string, a ``datetime`` or a ``date``.

        :param date: strings are parsed with ``dateutil.parser.parse``.
        :param ignore_invalid: when true, an unsupported type falls back to
            today's date instead of raising.
        :raises NewEventError: for an unsupported type when *ignore_invalid*
            is false.
        """
        if isinstance(date, (basestring, unicode)):
            self.date = parse(date).date()
        elif isinstance(date, datetime.datetime):
            # Must be tested before ``date``: datetime is a subclass of date.
            self.date = date.date()
        elif isinstance(date, datetime.date):
            self.date = date
        elif ignore_invalid:
            self.date = datetime.date.today()
        else:
            raise NewEventError('Invalid format for the date: {} - {}'.format(date, type(date)))

    def set_all_values(self, **kwargs):
        """Populate the event from keyword arguments.

        ``info`` is required unless already set on the instance. Nested
        ``Attribute``, ``RelatedEvent`` and ``Object`` entries are turned
        into ``MISPAttribute``, ``MISPEvent`` and ``MISPObject`` instances.

        :raises NewAttributeError: when ``info`` is missing.
        :raises NewEventError: when ``distribution``, ``threat_level_id`` or
            ``analysis`` is out of range.
        """
        # Required value
        if kwargs.get('info'):
            self.info = kwargs['info']
        elif not self.info:
            raise NewAttributeError('The info field of the new event is required.')

        # Default values for a valid event to send to a MISP instance
        if kwargs.get('distribution') is not None:
            self.distribution = int(kwargs['distribution'])
            if self.distribution not in [0, 1, 2, 3, 4]:
                raise NewEventError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 4'.format(self.distribution))
        if kwargs.get('threat_level_id') is not None:
            self.threat_level_id = int(kwargs['threat_level_id'])
            if self.threat_level_id not in [1, 2, 3, 4]:
                raise NewEventError('{} is invalid, the threat_level has to be in 1, 2, 3, 4'.format(self.threat_level_id))
        if kwargs.get('analysis') is not None:
            self.analysis = int(kwargs['analysis'])
            if self.analysis not in [0, 1, 2]:
                raise NewEventError('{} is invalid, the analysis has to be in 0, 1, 2'.format(self.analysis))
        # Any non-None value unpublishes first; only a literal True publishes.
        if kwargs.get('published') is not None:
            self.unpublish()
        if kwargs.get("published") is True:
            self.publish()
        if kwargs.get('date'):
            self.set_date(kwargs['date'])
        if kwargs.get('Attribute'):
            for a in kwargs['Attribute']:
                attribute = MISPAttribute(self.describe_types)
                attribute.set_all_values(**a)
                self.attributes.append(attribute)

        # All other keys
        if kwargs.get('id'):
            self.id = int(kwargs['id'])
        if kwargs.get('orgc_id'):
            self.orgc_id = int(kwargs['orgc_id'])
        if kwargs.get('org_id'):
            self.org_id = int(kwargs['org_id'])
        if kwargs.get('uuid'):
            self.uuid = kwargs['uuid']
        if kwargs.get('attribute_count'):
            self.attribute_count = int(kwargs['attribute_count'])
        if kwargs.get('timestamp'):
            # Stored as a naive datetime counted from the Unix epoch.
            self.timestamp = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=int(kwargs['timestamp']))
        if kwargs.get('proposal_email_lock'):
            self.proposal_email_lock = kwargs['proposal_email_lock']
        if kwargs.get('locked'):
            self.locked = kwargs['locked']
        if kwargs.get('publish_timestamp'):
            self.publish_timestamp = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=int(kwargs['publish_timestamp']))
        if kwargs.get('sharing_group_id'):
            self.sharing_group_id = int(kwargs['sharing_group_id'])
        if kwargs.get('Org'):
            self.Org = kwargs['Org']
        if kwargs.get('Orgc'):
            self.Orgc = kwargs['Orgc']
        if kwargs.get('ShadowAttribute'):
            self.ShadowAttribute = kwargs['ShadowAttribute']
        if kwargs.get('RelatedEvent'):
            self.RelatedEvent = []
            for rel_event in kwargs['RelatedEvent']:
                # Reuse the type description already in memory instead of
                # reading describeTypes.json again for every related event.
                sub_event = MISPEvent(self.describe_types)
                sub_event.load(rel_event)
                self.RelatedEvent.append(sub_event)
        if kwargs.get('Galaxy'):
            self.Galaxy = kwargs['Galaxy']
        if kwargs.get('Tag'):
            self.Tag = [t for t in kwargs['Tag'] if t]
        if kwargs.get('sig'):
            self.sig = kwargs['sig']
        if kwargs.get('global_sig'):
            self.global_sig = kwargs['global_sig']
        if kwargs.get('Object'):
            self.Object = []
            for obj in kwargs['Object']:
                tmp_object = MISPObject(obj['name'])
                tmp_object.from_dict(**obj)
                self.Object.append(tmp_object)

    def _json(self):
        """Alias of :meth:`to_dict`, kept for existing callers."""
        # DEPRECATED
        return self.to_dict()

    def to_dict(self):
        """Return ``{'Event': {...}}`` with the fields used for an update.

        Optional fields are only included when set, numeric values are
        converted to strings by ``_int_to_str``, and the result is validated
        against the JSON schema before being returned.
        """
        to_return = {'Event': {'distribution': self.distribution, 'info': self.info,
                               'date': self.date.isoformat(), 'published': self.published,
                               'threat_level_id': self.threat_level_id,
                               'analysis': self.analysis, 'Attribute': []}}
        if self.sig:
            to_return['Event']['sig'] = self.sig
        if self.global_sig:
            to_return['Event']['global_sig'] = self.global_sig
        if self.uuid:
            to_return['Event']['uuid'] = self.uuid
        if self.Tag:
            to_return['Event']['Tag'] = self.Tag
        if self.Orgc:
            to_return['Event']['Orgc'] = self.Orgc
        if self.Galaxy:
            to_return['Event']['Galaxy'] = self.Galaxy
        if self.sharing_group_id:
            to_return['Event']['sharing_group_id'] = self.sharing_group_id
        to_return['Event'] = _int_to_str(to_return['Event'])
        if self.attributes:
            to_return['Event']['Attribute'] = [a._json() for a in self.attributes]
        jsonschema.validate(to_return, self.json_schema)
        return to_return

    def _json_full(self):
        """Return :meth:`to_dict` extended with ids, timestamps and related data.

        Attributes are exported through their own ``_json_full`` and the
        result is validated against the JSON schema again.
        """
        to_return = self._json()
        if self.id:
            to_return['Event']['id'] = self.id
        if self.orgc_id:
            to_return['Event']['orgc_id'] = self.orgc_id
        if self.org_id:
            to_return['Event']['org_id'] = self.org_id
        if self.locked is not None:
            to_return['Event']['locked'] = self.locked
        if self.attribute_count is not None:
            to_return['Event']['attribute_count'] = self.attribute_count
        if self.RelatedEvent:
            to_return['Event']['RelatedEvent'] = [rel_event._json_full() for rel_event in self.RelatedEvent]
        if self.Org:
            to_return['Event']['Org'] = self.Org
        if self.ShadowAttribute:
            to_return['Event']['ShadowAttribute'] = self.ShadowAttribute
        if self.proposal_email_lock is not None:
            to_return['Event']['proposal_email_lock'] = self.proposal_email_lock
        if self.publish_timestamp:
            to_return['Event']['publish_timestamp'] = self._datetime_to_epoch(self.publish_timestamp)
        if self.timestamp:
            # Should never be set on an update, MISP will automatically set it to now
            to_return['Event']['timestamp'] = self._datetime_to_epoch(self.timestamp)
        to_return['Event'] = _int_to_str(to_return['Event'])
        if self.attributes:
            to_return['Event']['Attribute'] = [a._json_full() for a in self.attributes]
        jsonschema.validate(to_return, self.json_schema)
        return to_return

    def add_tag(self, tag):
        """Append a tag, stored as ``{'name': tag}``, to the event."""
        self.Tag.append({'name': tag})

    def add_attribute_tag(self, tag, attribute_identifier):
        """Tag every attribute whose id, uuid or value matches the identifier.

        A value matches when the identifier is contained in it.

        :returns: the last attribute that was tagged.
        :raises Exception: when no attribute matches.
        """
        attribute = None
        for a in self.attributes:
            if self._attribute_matches(a, attribute_identifier):
                a.add_tag(tag)
                attribute = a
        if not attribute:
            raise Exception('No attribute with identifier {} found.'.format(attribute_identifier))
        return attribute

    def publish(self):
        """Mark the event as published."""
        self.published = True

    def unpublish(self):
        """Mark the event as not published."""
        self.published = False

    def delete_attribute(self, attribute_id):
        """Flag the first attribute with the given id or uuid as deleted.

        :raises Exception: when no attribute matches.
        """
        for a in self.attributes:
            if a.id == attribute_id or a.uuid == attribute_id:
                a.delete()
                return
        raise Exception('No attribute with UUID/ID {} found.'.format(attribute_id))

    def add_attribute(self, type, value, **kwargs):
        """Add one attribute, or one per element when *value* is a list.

        :param type: MISP attribute type.
        :param value: attribute value, or a list of values sharing the same
            type and keyword arguments.
        :param kwargs: forwarded to ``MISPAttribute.set_all_values``.
        """
        if isinstance(value, list):
            for a in value:
                self.add_attribute(type, a, **kwargs)
        else:
            attribute = MISPAttribute(self.describe_types)
            attribute.set_all_values(type=type, value=value, **kwargs)
            self.attributes.append(attribute)
|
|
|
|
|
|
class MISPObjectReference(AbstractMISP):
    """A directed, typed link from one MISP object to another element."""

    attributes = ['source_uuid', 'destination_uuid', 'relationship_type', 'comment', 'uuid', 'deleted']

    def __init__(self):
        super(MISPObjectReference, self).__init__()

    def from_dict(self, source_uuid, destination_uuid, relationship_type, comment=None, **kwargs):
        """Populate the reference.

        :param source_uuid: uuid of the object the reference starts from.
        :param destination_uuid: uuid of the element being referenced.
        :param relationship_type: name of the relationship.
        :param comment: optional free text.
        :param kwargs: any other key is set verbatim as an instance attribute.
        """
        self.source_uuid = source_uuid
        self.destination_uuid = destination_uuid
        self.relationship_type = relationship_type
        self.comment = comment
        # Iterate over the (key, value) pairs: iterating the dict itself
        # yields only the keys, which cannot be unpacked into two names.
        for k, v in kwargs.items():
            setattr(self, k, v)
|
|
|
|
|
|
class MISPObjectAttribute(MISPAttribute, AbstractMISP):
    """A MISP attribute that belongs to an object and carries an ``object_relation``."""

    # This list is very limited and hardcoded to fit the current needs (file/pe/pesection creation): MISPAttribute will follow the
    # same spec and just add one attribute: object_relation
    attributes = ['object_relation', 'value', 'type', 'category', 'disable_correlation', 'to_ids',
                  'data', 'encrypt', 'distribution', 'comment', 'uuid', 'event_id']

    def __init__(self, definition):
        """Initialise both parents and keep the template entry for this relation.

        :param definition: mapping read with ``.get`` for the keys
            ``misp-attribute``, ``disable_correlation`` and ``to_ids``.
        """
        MISPAttribute.__init__(self)
        AbstractMISP.__init__(self)
        self.definition = definition

    def from_dict(self, object_relation, value, **kwargs):
        """Populate the attribute, falling back to the template definition.

        ``type``, ``disable_correlation`` and ``to_ids`` are taken from
        *kwargs* when given (and not None), otherwise from the definition.
        Everything is then handed to ``MISPAttribute.from_dict``.
        """
        self.object_relation = object_relation
        self.value = value
        # (instance field, key in the definition), processed in this order.
        fallbacks = (('type', 'misp-attribute'),
                     ('disable_correlation', 'disable_correlation'),
                     ('to_ids', 'to_ids'))
        for field, definition_key in fallbacks:
            setattr(self, field, kwargs.pop(field, None))
            if getattr(self, field) is None:
                # Not overloaded by the caller: use the template's value.
                setattr(self, field, self.definition.get(definition_key))
        # FIXME: dirty hack until all the classes are ported to the new format but we get the default values
        # NOTE(review): relies on the instance being usable as a ``**``
        # mapping, presumably provided by AbstractMISP; confirm.
        kwargs.update(**self)
        MISPAttribute.from_dict(self, **kwargs)
|
|
|
|
|
|
class MISPObject(AbstractMISP):
    """A MISP object: a named group of attributes built from a template.

    Templates are looked up as
    ``data/misp-objects/objects/<name>/definition.json`` inside the
    installed ``pymisp`` package.
    """

    attributes = ['name', 'meta-category', 'uuid', 'description', 'template_version', 'template_uuid', 'Attribute']

    def __init__(self, name, strict=True):
        """Create an empty object and load its template when one exists.

        :param name: template name, i.e. the directory holding ``definition.json``.
        :param strict: when true, an unknown template is an error.
        :raises UnknownMISPObjectTemplate: when *strict* is true and no
            template directory exists for *name*.
        """
        super(MISPObject, self).__init__()
        self.strict = strict
        self.name = name
        self.misp_objects_path = os.path.join(
            os.path.abspath(os.path.dirname(sys.modules['pymisp'].__file__)),
            'data', 'misp-objects', 'objects')
        definition_path = os.path.join(self.misp_objects_path, self.name, 'definition.json')
        if os.path.exists(definition_path):
            self.known_template = True
        elif self.strict:
            raise UnknownMISPObjectTemplate('{} is unknown in the MISP object directory.'.format(self.name))
        else:
            self.known_template = False
        if self.known_template:
            with open(definition_path, 'r') as f:
                self.definition = json.load(f)
            setattr(self, 'meta-category', self.definition['meta-category'])
            self.template_uuid = self.definition['uuid']
            self.description = self.definition['description']
            self.template_version = self.definition['version']
        else:
            # FIXME We need to set something for meta-category, template_uuid, description and template_version
            pass
        self.uuid = str(uuid.uuid4())
        self.Attribute = []
        self.ObjectReference = []

    def from_dict(self, **kwargs):
        """Populate the object from keyword arguments.

        ``Attribute`` and ``ObjectReference`` entries go through
        :meth:`add_attribute` and :meth:`add_reference`; every other key is
        set as an instance attribute. A ``template_uuid`` or
        ``template_version`` differing from the loaded template either
        raises (strict) or marks the template as unknown.

        :raises UnknownMISPObjectTemplate: on a template mismatch in strict mode.
        """
        if self.known_template:
            if kwargs.get('template_uuid') and kwargs['template_uuid'] != self.template_uuid:
                if self.strict:
                    raise UnknownMISPObjectTemplate('UUID of the object is different from the one of the template.')
                else:
                    self.known_template = False
            if kwargs.get('template_version') and int(kwargs['template_version']) != self.template_version:
                if self.strict:
                    raise UnknownMISPObjectTemplate('Version of the object ({}) is different from the one of the template ({}).'.format(kwargs['template_version'], self.template_version))
                else:
                    self.known_template = False

        for key, value in kwargs.items():
            if key == 'Attribute':
                for v in value:
                    self.add_attribute(**v)
            elif key == 'ObjectReference':
                for v in value:
                    self.add_reference(**v)
            else:
                setattr(self, key, value)

    def to_dict(self, strict=True):
        """Validate when requested, then return the parent's dict export.

        :param strict: when true, always validate; otherwise validate only
            if the object is strict and its template is known.
        """
        # NOTE(review): since ``strict`` defaults to True, validation also
        # runs for objects created without a known template, for which
        # ``__init__`` never sets ``self.definition``. Confirm this is intended.
        if strict or (self.strict and self.known_template):
            self._validate()
        return super(MISPObject, self).to_dict()

    def to_json(self, strict=True):
        """Validate when requested, then return the parent's JSON export.

        :param strict: when true, always validate; otherwise validate only
            if the object is strict and its template is known.
        """
        if strict or (self.strict and self.known_template):
            self._validate()
        return super(MISPObject, self).to_json()

    def _validate(self):
        """Make sure the object we're creating has the required fields

        :returns: True when the object satisfies the template.
        :raises InvalidMISPObject: when a relation occurs several times
            without being declared ``multiple``, when none of the
            ``requiredOneOf`` relations is present, or when a ``required``
            relation is missing.
        """
        count_relations = Counter(a.object_relation for a in self.Attribute)
        for key, counter in count_relations.items():
            if counter == 1:
                continue
            if not self.definition['attributes'][key].get('multiple'):
                raise InvalidMISPObject('Multiple occurrences of {} is not allowed'.format(key))
        all_attribute_names = set(count_relations)
        if self.definition.get('requiredOneOf'):
            if not set(self.definition['requiredOneOf']) & all_attribute_names:
                raise InvalidMISPObject('At least one of the following attributes is required: {}'.format(', '.join(self.definition['requiredOneOf'])))
        if self.definition.get('required'):
            for r in self.definition.get('required'):
                if r not in all_attribute_names:
                    raise InvalidMISPObject('{} is required'.format(r))
        return True

    def add_reference(self, destination_uuid, relationship_type, comment=None, **kwargs):
        """Add a link (uuid) to an other object

        :param destination_uuid: uuid of the element being referenced.
        :param relationship_type: name of the relationship.
        :param comment: optional free text.
        :param kwargs: may contain ``source_uuid`` when an existing
            reference is loaded; remaining keys are forwarded to
            ``MISPObjectReference.from_dict``.
        """
        # Remove source_uuid from kwargs: it is passed explicitly below, and
        # supplying it a second time through **kwargs raises TypeError.
        # A missing or falsy value means a new reference from this object.
        source_uuid = kwargs.pop('source_uuid', None) or self.uuid
        reference = MISPObjectReference()
        reference.from_dict(source_uuid=source_uuid, destination_uuid=destination_uuid,
                            relationship_type=relationship_type, comment=comment, **kwargs)
        self.ObjectReference.append(reference)

    def add_attribute(self, object_relation, **value):
        """Create an attribute for *object_relation* and append it.

        :param object_relation: relation name; for a known template it must
            be a key of the template's ``attributes``.
        :param value: keyword arguments for ``MISPObjectAttribute.from_dict``;
            must contain a non-None ``value``.
        :returns: the new attribute, or None when ``value`` is missing or None.
        """
        if value.get('value') is None:
            return None
        if self.known_template:
            attribute = MISPObjectAttribute(self.definition['attributes'][object_relation])
        else:
            attribute = MISPObjectAttribute({})
        attribute.from_dict(object_relation, **value)
        self.Attribute.append(attribute)
        return attribute
|