Add signing support for MISP events

pull/30/head
Raphaël Vinot 2016-11-17 17:07:29 +01:00
parent fba21ac051
commit 35a4dd52bc
2 changed files with 117 additions and 3 deletions

19
examples/test_sign.py Executable file
View File

@ -0,0 +1,19 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
from pymisp import mispevent
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Sign & verify a MISP event.')
parser.add_argument("-i", "--input", required=True, help="Json file")
parser.add_argument("-u", "--uid", required=True, help="GPG UID")
args = parser.parse_args()
me = mispevent.MISPEvent()
me.load(args.input)
me.sign(args.uid)
me.verify(args.uid)

View File

@ -7,6 +7,7 @@ import json
from json import JSONEncoder
import os
import warnings
import base64
try:
from dateutil.parser import parse
except ImportError:
@ -17,6 +18,19 @@ try:
except ImportError:
pass
try:
# pyme renamed to gpg the 2016-10-28
import gpg
from gpg.constants.sig import mode
has_pyme = True
except ImportError:
# pyme renamed to gpg the 2016-10-28
import pyme as gpg
from pyme.constants.sig import mode
has_pyme = True
except ImportError:
has_pyme = False
from .exceptions import PyMISPError, NewEventError, NewAttributeError
# Least dirty way to support python 2 and 3
@ -53,9 +67,33 @@ class MISPAttribute(object):
self.timestamp = None
self.sharing_group_id = None
self.deleted = None
self.sig = None
self.SharingGroup = []
self.ShadowAttribute = []
def _serialize(self):
return '{type}{category}{to_ids}{uuid}{timestamp}{comment}{deleted}{value}'.format(
type=self.type, category=self.category, to_ids=self.to_ids, uuid=self.uuid, timestamp=self.timestamp,
comment=self.comment, deleted=self.deleted, value=self.value).encode()
def sign(self, gpg_uid):
if not has_pyme:
raise Exception('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
to_sign = self._serialize()
with gpg.Context() as c:
keys = list(c.keylist(gpg_uid))
c.signers = keys[:1]
signed, _ = c.sign(to_sign, mode=mode.DETACH)
self.sig = base64.b64encode(signed).decode()
def verify(self, gpg_uid):
if not has_pyme:
raise Exception('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
signed_data = self._serialize()
with gpg.Context() as c:
keys = list(c.keylist(gpg_uid))
c.verify(signed_data, signature=base64.b64decode(self.sig), verify=keys[:1])
def set_all_values(self, **kwargs):
if kwargs.get('type') and kwargs.get('category'):
if kwargs['type'] not in self.category_type_mapping[kwargs['category']]:
@ -65,14 +103,14 @@ class MISPAttribute(object):
self.type = kwargs['type']
if self.type not in self.types:
raise NewAttributeError('{} is invalid, type has to be in {}'.format(self.type, (', '.join(self.types))))
else:
elif not self.type:
raise NewAttributeError('The type of the attribute is required.')
type_defaults = self.sane_default[self.type]
if kwargs.get('value'):
self.value = kwargs['value']
else:
elif not self.value:
raise NewAttributeError('The value of the attribute is required.')
# Default values
@ -111,6 +149,8 @@ class MISPAttribute(object):
self.SharingGroup = kwargs['SharingGroup']
if kwargs.get('ShadowAttribute'):
self.ShadowAttribute = kwargs['ShadowAttribute']
if kwargs.get('sig'):
self.sig = kwargs['sig']
def _json(self):
to_return = {'type': self.type, 'category': self.category, 'to_ids': self.to_ids,
@ -118,6 +158,8 @@ class MISPAttribute(object):
'comment': self.comment}
if self.sharing_group_id:
to_return['sharing_group_id'] = self.sharing_group_id
if self.sig:
to_return['sig'] = self.sig
to_return = _int_to_str(to_return)
return to_return
@ -193,6 +235,8 @@ class MISPEvent(object):
self.attributes = []
# All other keys
self.sig = None
self.global_sig = None
self.id = None
self.orgc_id = None
self.org_id = None
@ -209,6 +253,49 @@ class MISPEvent(object):
self.RelatedEvent = []
self.Tag = []
def _serialize(self):
return '{date}{threat_level_id}{info}{uuid}{analysis}{timestamp}'.format(
date=self.date, threat_level_id=self.threat_level_id, info=self.info,
uuid=self.uuid, analysis=self.analysis, timestamp=self.timestamp).encode()
def _serialize_sigs(self):
all_sigs = self.sig
for a in self.attributes:
all_sigs += a.sig
return all_sigs.encode()
def sign(self, gpg_uid):
if not has_pyme:
raise Exception('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
to_sign = self._serialize()
with gpg.Context() as c:
keys = list(c.keylist(gpg_uid))
c.signers = keys[:1]
signed, _ = c.sign(to_sign, mode=mode.DETACH)
self.sig = base64.b64encode(signed).decode()
for a in self.attributes:
a.sign(gpg_uid)
to_sign_global = self._serialize_sigs()
with gpg.Context() as c:
keys = list(c.keylist(gpg_uid))
c.signers = keys[:1]
signed, _ = c.sign(to_sign_global, mode=mode.DETACH)
self.global_sig = base64.b64encode(signed).decode()
def verify(self, gpg_uid):
if not has_pyme:
raise Exception('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
signed_data = self._serialize()
with gpg.Context() as c:
keys = list(c.keylist(gpg_uid))
c.verify(signed_data, signature=base64.b64decode(self.sig), verify=keys[:1])
for a in self.attributes:
a.verify(gpg_uid)
to_verify_global = self._serialize_sigs()
with gpg.Context() as c:
keys = list(c.keylist(gpg_uid))
c.verify(to_verify_global, signature=base64.b64decode(self.global_sig), verify=keys[:1])
def load(self, json_event):
self.new = False
self.dump_full = True
@ -251,7 +338,7 @@ class MISPEvent(object):
# Required value
if kwargs.get('info'):
self.info = kwargs['info']
else:
elif not self.info:
raise NewAttributeError('The info field of the new event is required.')
# Default values for a valid event to send to a MISP instance
@ -308,6 +395,10 @@ class MISPEvent(object):
self.RelatedEvent = kwargs['RelatedEvent']
if kwargs.get('Tag'):
self.Tag = kwargs['Tag']
if kwargs.get('sig'):
self.sig = kwargs['sig']
if kwargs.get('global_sig'):
self.global_sig = kwargs['global_sig']
def _json(self):
to_return = {'Event': {}}
@ -315,6 +406,10 @@ class MISPEvent(object):
'date': self.date.isoformat(), 'published': self.published,
'threat_level_id': self.threat_level_id,
'analysis': self.analysis, 'Attribute': []}
if self.sig:
to_return['sig'] = self.sig
if self.global_sig:
to_return['global_sig'] = self.global_sig
if self.id:
to_return['Event']['id'] = self.id
if self.orgc_id: