From 671c9fabf5304f9d79ddde103d873a4a2ba320eb Mon Sep 17 00:00:00 2001 From: iglocska Date: Sun, 8 Dec 2024 18:48:07 +0100 Subject: [PATCH] new: [support for signing] added - added new class CryptographicKeys - added functions to to_feed calls to include crypto keys - added protected boolean field to misp event - updated feed generator to support signing - if the new setting is set to True signing will be attempted for protected events - protected events are now passed to the /cryptographic_keys/serverSign endpoint of misp for signing - signatures are included as a .asc file in the output directory - TODO: - currently the JSON dumping is moved from a streamed dumping to an in memory dump before saving to disk - add a check for protected events and revert to streamed dumping for non protected events - alternatively use the already saved files to request signing from MISP --- examples/feed-generator/generate.py | 31 ++++++++++++++++--- examples/feed-generator/settings.default.py | 3 ++ pymisp/mispevent.py | 34 +++++++++++++++++++-- 3 files changed, 62 insertions(+), 6 deletions(-) diff --git a/examples/feed-generator/generate.py b/examples/feed-generator/generate.py index 48c28e8..f217cb0 100755 --- a/examples/feed-generator/generate.py +++ b/examples/feed-generator/generate.py @@ -5,12 +5,17 @@ import sys import json import os from pymisp import ExpandedPyMISP -from settings import url, key, ssl, outputdir, filters, valid_attribute_distribution_levels +from settings import url, key, ssl, outputdir, filters, valid_attribute_distribution_levels, with_signatures try: from settings import with_distribution except ImportError: with_distribution = False +try: + from settings import with_signatures +except ImportError: + with_signatures = False + try: from settings import with_local_tags except ImportError: @@ -39,14 +44,31 @@ def init(): return ExpandedPyMISP(url, key, ssl) -def saveEvent(event): +def saveEvent(event, misp): + stringified_event = json.dumps(event, indent=2) + if with_signatures and event['Event'].get('protected'): + signature = getSignature(stringified_event, misp) + try: + with open(os.path.join(outputdir, f'{event["Event"]["uuid"]}.asc'), 'w') as f: + f.write(signature) + except Exception as e: + print(e) + sys.exit('Could not create the event signature dump.') + try: with open(os.path.join(outputdir, f'{event["Event"]["uuid"]}.json'), 'w') as f: - json.dump(event, f, indent=2) + f.write(stringified_event) except Exception as e: print(e) sys.exit('Could not create the event dump.') +def getSignature(stringified_event, misp): + try: + signature = misp.direct_call('/cryptographicKeys/serverSign', stringified_event) + return signature + except Exception as e: + print(e) + sys.exit('Could not get the signature for the event from the MISP instance. Perhaps the user does not have the necessary permissions.') def saveHashes(hashes): try: @@ -79,6 +101,7 @@ if __name__ == '__main__': sys.exit("No events returned.") manifest = {} hashes = [] + signatures = [] counter = 1 total = len(events) for event in events: @@ -97,7 +120,7 @@ if __name__ == '__main__': continue hashes += [[h, e.uuid] for h in e_feed['Event'].pop('_hashes')] manifest.update(e_feed['Event'].pop('_manifest')) - saveEvent(e_feed) + saveEvent(e_feed, misp) print("Event " + str(counter) + "/" + str(total) + " exported.") counter += 1 saveManifest(manifest) diff --git a/examples/feed-generator/settings.default.py b/examples/feed-generator/settings.default.py index 3b994e8..6befb60 100755 --- a/examples/feed-generator/settings.default.py +++ b/examples/feed-generator/settings.default.py @@ -54,3 +54,6 @@ with_distribution = False # Include the exportable local tags along with the global tags. The default is True. with_local_tags = True + +# Include signatures for protected events. This will allow MISP to ingest and update protected events from the feed. It requires perm_server_sign to be set to true in the user's role on MISP's side. +with_signatures = False \ No newline at end of file diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 362fd1e..8ae7380 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -1559,7 +1559,8 @@ class MISPGalaxy(AbstractMISP): class MISPEvent(AnalystDataBehaviorMixin): _fields_for_feed: set[str] = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp', - 'publish_timestamp', 'published', 'date', 'extends_uuid'} + 'publish_timestamp', 'published', 'date', 'extends_uuid', + 'protected'} _analyst_data_object_type = 'Event' @@ -1581,6 +1582,7 @@ class MISPEvent(AnalystDataBehaviorMixin): self.EventReport: list[MISPEventReport] = [] self.Tag: list[MISPTag] = [] self.Galaxy: list[MISPGalaxy] = [] + self.CryptographicKey: list[MISPCryptographicKey] = [] self.publish_timestamp: float | int | datetime self.timestamp: float | int | datetime @@ -1649,13 +1651,14 @@ class MISPEvent(AnalystDataBehaviorMixin): to_return += attribute.hash_values(algorithm) return to_return - def to_feed(self, valid_distributions: list[int] = [0, 1, 2, 3, 4, 5], with_meta: bool = False, with_distribution: bool=False, with_local_tags: bool = True, with_event_reports: bool = True) -> dict[str, Any]: + def to_feed(self, valid_distributions: list[int] = [0, 1, 2, 3, 4, 5], with_meta: bool = False, with_distribution: bool=False, with_local_tags: bool = True, with_event_reports: bool = True, with_cryptographic_keys: bool = True) -> dict[str, Any]: """ Generate a json output for MISP Feed. :param valid_distributions: only makes sense if the distribution key is set; i.e., the event is exported from a MISP instance. :param with_distribution: exports distribution and Sharing Group info; otherwise all SharingGroup information is discarded (protecting privacy) :param with_local_tags: tag export includes local exportable tags along with global exportable tags :param with_event_reports: include event reports in the returned MISP event + :param with_cryptographic_keys: include the associated cryptographic keys in the returned protected MISP event """ required = ['info', 'Orgc'] for r in required: @@ -1719,6 +1722,13 @@ class MISPEvent(AnalystDataBehaviorMixin): event_report.pop('SharingGroup', None) event_report.pop('sharing_group_id', None) to_return['EventReport'].append(event_report.to_dict()) + + if with_cryptographic_keys and self.cryptographic_keys: + to_return['CryptographicKey'] = [] + for cryptographic_key in self.cryptographic_keys: + cryptographic_key.pop('parent_id', None) + cryptographic_key.pop('id', None) + to_return['CryptographicKey'].append(cryptographic_key.to_dict()) return {'Event': to_return} @@ -1755,6 +1765,10 @@ class MISPEvent(AnalystDataBehaviorMixin): @property def event_reports(self) -> list[MISPEventReport]: return self.EventReport + + @property + def cryptographic_keys(self) -> list[MISPCryptographicKey]: + return self.CryptographicKey @property def shadow_attributes(self) -> list[MISPShadowAttribute]: @@ -1891,6 +1905,8 @@ class MISPEvent(AnalystDataBehaviorMixin): [self.add_galaxy(**e) for e in kwargs.pop('Galaxy')] if kwargs.get('EventReport'): [self.add_event_report(**e) for e in kwargs.pop('EventReport')] + if kwargs.get('CryptographicKey'): + [self.add_cryprographic_key(**e) for e in kwargs.pop('CryptographicKey')] # All other keys if kwargs.get('id'): @@ -2041,6 +2057,15 @@ class MISPEvent(AnalystDataBehaviorMixin): self.edited = True return event_report + def add_cryprographic_key(self, parent_type: str, key_data: str, type: str, uuid: str, fingerprint: str, timestamp: str, **kwargs) -> MISPCryptographicKey: # type: ignore[no-untyped-def] + """Add a Cryptographic Key. parent_type, key_data, type, uuid, fingerprint, timestamp are required but you can pass all + other parameters supported by MISPEventReport""" + cryptographic_key = MISPCryptographicKey() + cryptographic_key.from_dict(parent_type=parent_type, key_data=key_data, type=type, uuid=uuid, fingerprint=fingerprint, timestamp=timestamp, **kwargs) + self.cryptographic_keys.append(cryptographic_key) + self.edited = True + return cryptographic_key + def add_galaxy(self, galaxy: MISPGalaxy | dict[str, Any] | None = None, **kwargs) -> MISPGalaxy: # type: ignore[no-untyped-def] """Add a galaxy and sub-clusters into an event, either by passing a MISPGalaxy or a dictionary. @@ -2225,6 +2250,11 @@ class MISPWarninglist(AbstractMISP): kwargs = kwargs['Warninglist'] super().from_dict(**kwargs) +class MISPCryptographicKey(AbstractMISP): + def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + if 'CryptographicKey' in kwargs: + kwargs = kwargs['CryptographicKey'] + super().from_dict(**kwargs) class MISPTaxonomy(AbstractMISP):