mirror of https://github.com/MISP/PyMISP
new: [support for signing] added
- added new class CryptographicKeys - added functions to to_feed calls to include crypto keys - added protected boolean field to misp event - updated feed generator to support signing - if the new setting is set to True signing will be attempted for protected events - protected events are now passed to the /cryptographic_keys/serverSign endpoint of misp for signing - signatures are included as a .asc file in the output directory - TODO: - currently the JSON dumping is moved from a streamed dumping to an in memory dump before saving to disk - add a check for protected events and revert to streamed dumping for non protected events - alternatively use the already saved files to request signing from MISPpull/1312/head
parent
30819e141f
commit
671c9fabf5
|
@ -5,12 +5,17 @@ import sys
|
|||
import json
|
||||
import os
|
||||
from pymisp import ExpandedPyMISP
|
||||
from settings import url, key, ssl, outputdir, filters, valid_attribute_distribution_levels
|
||||
from settings import url, key, ssl, outputdir, filters, valid_attribute_distribution_levels, with_signatures
|
||||
try:
|
||||
from settings import with_distribution
|
||||
except ImportError:
|
||||
with_distribution = False
|
||||
|
||||
try:
|
||||
from settings import with_signatures
|
||||
except ImportError:
|
||||
with_signatures = False
|
||||
|
||||
try:
|
||||
from settings import with_local_tags
|
||||
except ImportError:
|
||||
|
@ -39,14 +44,31 @@ def init():
|
|||
return ExpandedPyMISP(url, key, ssl)
|
||||
|
||||
|
||||
def saveEvent(event):
|
||||
def saveEvent(event, misp):
|
||||
stringified_event = json.dumps(event, indent=2)
|
||||
if with_signatures and event['Event'].get('protected'):
|
||||
signature = getSignature(stringified_event, misp)
|
||||
try:
|
||||
with open(os.path.join(outputdir, f'{event["Event"]["uuid"]}.asc'), 'w') as f:
|
||||
f.write(signature)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
sys.exit('Could not create the event signature dump.')
|
||||
|
||||
try:
|
||||
with open(os.path.join(outputdir, f'{event["Event"]["uuid"]}.json'), 'w') as f:
|
||||
json.dump(event, f, indent=2)
|
||||
f.write(stringified_event)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
sys.exit('Could not create the event dump.')
|
||||
|
||||
def getSignature(stringified_event, misp):
|
||||
try:
|
||||
signature = misp.direct_call('/cryptographicKeys/serverSign', stringified_event)
|
||||
return signature
|
||||
except Exception as e:
|
||||
print(e)
|
||||
sys.exit('Could not get the signature for the event from the MISP instance. Perhaps the user does not have the necessary permissions.')
|
||||
|
||||
def saveHashes(hashes):
|
||||
try:
|
||||
|
@ -79,6 +101,7 @@ if __name__ == '__main__':
|
|||
sys.exit("No events returned.")
|
||||
manifest = {}
|
||||
hashes = []
|
||||
signatures = []
|
||||
counter = 1
|
||||
total = len(events)
|
||||
for event in events:
|
||||
|
@ -97,7 +120,7 @@ if __name__ == '__main__':
|
|||
continue
|
||||
hashes += [[h, e.uuid] for h in e_feed['Event'].pop('_hashes')]
|
||||
manifest.update(e_feed['Event'].pop('_manifest'))
|
||||
saveEvent(e_feed)
|
||||
saveEvent(e_feed, misp)
|
||||
print("Event " + str(counter) + "/" + str(total) + " exported.")
|
||||
counter += 1
|
||||
saveManifest(manifest)
|
||||
|
|
|
@ -54,3 +54,6 @@ with_distribution = False
|
|||
|
||||
# Include the exportable local tags along with the global tags. The default is True.
|
||||
with_local_tags = True
|
||||
|
||||
# Include signatures for protected events. This will allow MISP to ingest and update protected events from the feed. It requires perm_server_sign to be set to true in the user's role on MISP's side.
|
||||
with_signatures = False
|
|
@ -1559,7 +1559,8 @@ class MISPGalaxy(AbstractMISP):
|
|||
class MISPEvent(AnalystDataBehaviorMixin):
|
||||
|
||||
_fields_for_feed: set[str] = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
|
||||
'publish_timestamp', 'published', 'date', 'extends_uuid'}
|
||||
'publish_timestamp', 'published', 'date', 'extends_uuid',
|
||||
'protected'}
|
||||
|
||||
_analyst_data_object_type = 'Event'
|
||||
|
||||
|
@ -1581,6 +1582,7 @@ class MISPEvent(AnalystDataBehaviorMixin):
|
|||
self.EventReport: list[MISPEventReport] = []
|
||||
self.Tag: list[MISPTag] = []
|
||||
self.Galaxy: list[MISPGalaxy] = []
|
||||
self.CryptographicKey: list[MISPCryptographicKey] = []
|
||||
|
||||
self.publish_timestamp: float | int | datetime
|
||||
self.timestamp: float | int | datetime
|
||||
|
@ -1649,13 +1651,14 @@ class MISPEvent(AnalystDataBehaviorMixin):
|
|||
to_return += attribute.hash_values(algorithm)
|
||||
return to_return
|
||||
|
||||
def to_feed(self, valid_distributions: list[int] = [0, 1, 2, 3, 4, 5], with_meta: bool = False, with_distribution: bool=False, with_local_tags: bool = True, with_event_reports: bool = True) -> dict[str, Any]:
|
||||
def to_feed(self, valid_distributions: list[int] = [0, 1, 2, 3, 4, 5], with_meta: bool = False, with_distribution: bool=False, with_local_tags: bool = True, with_event_reports: bool = True, with_cryptographic_keys: bool = True) -> dict[str, Any]:
|
||||
""" Generate a json output for MISP Feed.
|
||||
|
||||
:param valid_distributions: only makes sense if the distribution key is set; i.e., the event is exported from a MISP instance.
|
||||
:param with_distribution: exports distribution and Sharing Group info; otherwise all SharingGroup information is discarded (protecting privacy)
|
||||
:param with_local_tags: tag export includes local exportable tags along with global exportable tags
|
||||
:param with_event_reports: include event reports in the returned MISP event
|
||||
:param with_cryptographic_keys: include the associated cryptographic keys in the returned protected MISP event
|
||||
"""
|
||||
required = ['info', 'Orgc']
|
||||
for r in required:
|
||||
|
@ -1719,6 +1722,13 @@ class MISPEvent(AnalystDataBehaviorMixin):
|
|||
event_report.pop('SharingGroup', None)
|
||||
event_report.pop('sharing_group_id', None)
|
||||
to_return['EventReport'].append(event_report.to_dict())
|
||||
|
||||
if with_cryptographic_keys and self.cryptographic_keys:
|
||||
to_return['CryptographicKey'] = []
|
||||
for cryptographic_key in self.cryptographic_keys:
|
||||
cryptographic_key.pop('parent_id', None)
|
||||
cryptographic_key.pop('id', None)
|
||||
to_return['CryptographicKey'].append(cryptographic_key.to_dict())
|
||||
|
||||
return {'Event': to_return}
|
||||
|
||||
|
@ -1755,6 +1765,10 @@ class MISPEvent(AnalystDataBehaviorMixin):
|
|||
@property
|
||||
def event_reports(self) -> list[MISPEventReport]:
|
||||
return self.EventReport
|
||||
|
||||
@property
|
||||
def cryptographic_keys(self) -> list[MISPCryptographicKey]:
|
||||
return self.CryptographicKey
|
||||
|
||||
@property
|
||||
def shadow_attributes(self) -> list[MISPShadowAttribute]:
|
||||
|
@ -1891,6 +1905,8 @@ class MISPEvent(AnalystDataBehaviorMixin):
|
|||
[self.add_galaxy(**e) for e in kwargs.pop('Galaxy')]
|
||||
if kwargs.get('EventReport'):
|
||||
[self.add_event_report(**e) for e in kwargs.pop('EventReport')]
|
||||
if kwargs.get('CryptographicKey'):
|
||||
[self.add_cryprographic_key(**e) for e in kwargs.pop('CryptographicKey')]
|
||||
|
||||
# All other keys
|
||||
if kwargs.get('id'):
|
||||
|
@ -2041,6 +2057,15 @@ class MISPEvent(AnalystDataBehaviorMixin):
|
|||
self.edited = True
|
||||
return event_report
|
||||
|
||||
def add_cryprographic_key(self, parent_type: str, key_data: str, type: str, uuid: str, fingerprint: str, timestamp: str, **kwargs) -> MISPCryptographicKey: # type: ignore[no-untyped-def]
|
||||
"""Add a Cryptographic Key. parent_type, key_data, type, uuid, fingerprint, timestamp are required but you can pass all
|
||||
other parameters supported by MISPEventReport"""
|
||||
cryptographic_key = MISPCryptographicKey()
|
||||
cryptographic_key.from_dict(parent_type=parent_type, key_data=key_data, type=type, uuid=uuid, fingerprint=fingerprint, timestamp=timestamp, **kwargs)
|
||||
self.cryptographic_keys.append(cryptographic_key)
|
||||
self.edited = True
|
||||
return cryptographic_key
|
||||
|
||||
def add_galaxy(self, galaxy: MISPGalaxy | dict[str, Any] | None = None, **kwargs) -> MISPGalaxy: # type: ignore[no-untyped-def]
|
||||
"""Add a galaxy and sub-clusters into an event, either by passing
|
||||
a MISPGalaxy or a dictionary.
|
||||
|
@ -2225,6 +2250,11 @@ class MISPWarninglist(AbstractMISP):
|
|||
kwargs = kwargs['Warninglist']
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
class MISPCryptographicKey(AbstractMISP):
|
||||
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||
if 'CryptographicKey' in kwargs:
|
||||
kwargs = kwargs['CryptographicKey']
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
class MISPTaxonomy(AbstractMISP):
|
||||
|
||||
|
|
Loading…
Reference in New Issue