diff --git a/misp_modules/modules/expansion/sigmf-expand.py b/misp_modules/modules/expansion/sigmf-expand.py index ec51106..366ec18 100644 --- a/misp_modules/modules/expansion/sigmf-expand.py +++ b/misp_modules/modules/expansion/sigmf-expand.py @@ -10,7 +10,9 @@ import logging import sys from pymisp import MISPObject, MISPEvent from sigmf import SigMFFile -import pymisp +from sigmf.archive import SIGMF_DATASET_EXT, SIGMF_METADATA_EXT +import tarfile +import codecs log = logging.getLogger("sigmf-expand") log.setLevel(logging.DEBUG) @@ -23,10 +25,10 @@ sh.setFormatter(fmt) log.addHandler(sh) misperrors = {'error': 'Error'} -mispattributes = {'input': ['sigmf-recording'], 'output': [ +mispattributes = {'input': ['sigmf-recording', 'sigmf-archive'], 'output': [ 'MISP objects'], 'format': 'misp_standard'} moduleinfo = {'version': '0.1', 'author': 'Luciano Righetti', - 'description': 'Expand a SigMF Recording object into a SigMF Expanded Recording object.', + 'description': 'Expands a SigMF Recording object into a SigMF Expanded Recording object, extracts a SigMF archive into a SigMF Recording object.', 'module-type': ['expansion']} @@ -102,14 +104,77 @@ def generate_plots(recording, meta_filename, data_bytes): return [{'relation': 'waterfall-plot', 'attribute': waterfall_attr}] -def handler(q=False): - request = json.loads(q) - object = request.get("object") - if not object: - return {"error": "No object provided"} +def process_sigmf_archive(object): - if 'Attribute' not in object: - return {"error": "Empty Attribute list"} + event = MISPEvent() + sigmf_data_attr = None + sigmf_meta_attr = None + + try: + # get sigmf-archive attribute + for attribute in object['Attribute']: + if attribute['object_relation'] == 'SigMF-archive': + + # write temp data file to disk + sigmf_archive_file = tempfile.NamedTemporaryFile( + suffix='.sigmf') + sigmf_archive_bin = base64.b64decode(attribute['data']) + with open(sigmf_archive_file.name, 'wb') as f: + f.write(sigmf_archive_bin) + f.close() + + sigmf_tarfile = tarfile.open( + sigmf_archive_file.name, mode="r", format=tarfile.PAX_FORMAT) + + files = sigmf_tarfile.getmembers() + + for file in files: + if file.name.endswith(SIGMF_METADATA_EXT): + metadata_reader = sigmf_tarfile.extractfile(file) + sigmf_meta_attr = { + 'type': 'attachment', + 'value': file.name, + 'data': base64.b64encode(metadata_reader.read()).decode("utf-8"), + 'comment': 'SigMF metadata file', + 'object_relation': 'SigMF-meta' + } + + if file.name.endswith(SIGMF_DATASET_EXT): + data_reader = sigmf_tarfile.extractfile(file) + sigmf_data_attr = { + 'type': 'attachment', + 'value': file.name, + 'data': base64.b64encode(data_reader.read()).decode("utf-8"), + 'comment': 'SigMF data file', + 'object_relation': 'SigMF-data' + } + + if sigmf_meta_attr is None: + return {"error": "No SigMF metadata file found"} + + recording = MISPObject('sigmf-recording') + recording.add_attribute(**sigmf_meta_attr) + recording.add_attribute(**sigmf_data_attr) + + # add reference to original SigMF Archive object + recording.add_reference(object['uuid'], "expands") + + event.add_object(recording) + event = json.loads(event.to_json()) + + return {"results": {'Object': event['Object']}} + + # no sigmf-archive attribute found + return {"error": "No SigMF-archive attribute found"} + + except Exception as e: + logging.exception(e) + return {"error": "An error occured when processing the SigMF archive"} + + +def process_sigmf_recording(object): + + event = MISPEvent() for attribute in object['Attribute']: if attribute['object_relation'] == 'SigMF-data': @@ -147,7 +212,6 @@ def handler(q=False): logging.exception(e) return {"error": "Provided .sigmf-meta and .sigmf-data is not a valid SigMF file"} - event = MISPEvent() expanded_sigmf = MISPObject('sigmf-expanded-recording') if 'core:author' in sigmf_meta['global']: @@ -198,6 +262,30 @@ def handler(q=False): return {"results": {'Object': event['Object']}} +def handler(q=False): + request = json.loads(q) + object = request.get("object") + event = MISPEvent() + + if not object: + return {"error": "No object provided"} + + if 'Attribute' not in object: + return {"error": "Empty Attribute list"} + + # check if it's a SigMF Archive + if object['name'] == 'sigmf-archive': + return process_sigmf_archive(object) + + # check if it's a SigMF Recording + if object['name'] == 'sigmf-recording': + return process_sigmf_recording(object) + + # TODO: add support for SigMF Collection + + return {"error": "No SigMF object provided"} + + def introspection(): return mispattributes