add: support extracting sigmf archives into sigmf recordings

pull/628/head
Luciano Righetti 2023-08-03 09:25:46 +02:00
parent 858b4ed1c6
commit 23069a7c5d
1 changed files with 99 additions and 11 deletions

View File

@ -10,7 +10,9 @@ import logging
import sys
from pymisp import MISPObject, MISPEvent
from sigmf import SigMFFile
import pymisp
from sigmf.archive import SIGMF_DATASET_EXT, SIGMF_METADATA_EXT
import tarfile
import codecs
log = logging.getLogger("sigmf-expand")
log.setLevel(logging.DEBUG)
@ -23,10 +25,10 @@ sh.setFormatter(fmt)
log.addHandler(sh)
misperrors = {'error': 'Error'}
mispattributes = {'input': ['sigmf-recording'], 'output': [
mispattributes = {'input': ['sigmf-recording', 'sigmf-archive'], 'output': [
'MISP objects'], 'format': 'misp_standard'}
moduleinfo = {'version': '0.1', 'author': 'Luciano Righetti',
'description': 'Expand a SigMF Recording object into a SigMF Expanded Recording object.',
'description': 'Expands a SigMF Recording object into a SigMF Expanded Recording object, extracts a SigMF archive into a SigMF Recording object.',
'module-type': ['expansion']}
@ -102,14 +104,77 @@ def generate_plots(recording, meta_filename, data_bytes):
return [{'relation': 'waterfall-plot', 'attribute': waterfall_attr}]
def handler(q=False):
request = json.loads(q)
object = request.get("object")
if not object:
return {"error": "No object provided"}
def process_sigmf_archive(object):
if 'Attribute' not in object:
return {"error": "Empty Attribute list"}
event = MISPEvent()
sigmf_data_attr = None
sigmf_meta_attr = None
try:
# get sigmf-archive attribute
for attribute in object['Attribute']:
if attribute['object_relation'] == 'SigMF-archive':
# write temp data file to disk
sigmf_archive_file = tempfile.NamedTemporaryFile(
suffix='.sigmf')
sigmf_archive_bin = base64.b64decode(attribute['data'])
with open(sigmf_archive_file.name, 'wb') as f:
f.write(sigmf_archive_bin)
f.close()
sigmf_tarfile = tarfile.open(
sigmf_archive_file.name, mode="r", format=tarfile.PAX_FORMAT)
files = sigmf_tarfile.getmembers()
for file in files:
if file.name.endswith(SIGMF_METADATA_EXT):
metadata_reader = sigmf_tarfile.extractfile(file)
sigmf_meta_attr = {
'type': 'attachment',
'value': file.name,
'data': base64.b64encode(metadata_reader.read()).decode("utf-8"),
'comment': 'SigMF metadata file',
'object_relation': 'SigMF-meta'
}
if file.name.endswith(SIGMF_DATASET_EXT):
data_reader = sigmf_tarfile.extractfile(file)
sigmf_data_attr = {
'type': 'attachment',
'value': file.name,
'data': base64.b64encode(data_reader.read()).decode("utf-8"),
'comment': 'SigMF data file',
'object_relation': 'SigMF-data'
}
if sigmf_meta_attr is None:
return {"error": "No SigMF metadata file found"}
recording = MISPObject('sigmf-recording')
recording.add_attribute(**sigmf_meta_attr)
recording.add_attribute(**sigmf_data_attr)
# add reference to original SigMF Archive object
recording.add_reference(object['uuid'], "expands")
event.add_object(recording)
event = json.loads(event.to_json())
return {"results": {'Object': event['Object']}}
# no sigmf-archive attribute found
return {"error": "No SigMF-archive attribute found"}
except Exception as e:
logging.exception(e)
return {"error": "An error occured when processing the SigMF archive"}
def process_sigmf_recording(object):
event = MISPEvent()
for attribute in object['Attribute']:
if attribute['object_relation'] == 'SigMF-data':
@ -147,7 +212,6 @@ def handler(q=False):
logging.exception(e)
return {"error": "Provided .sigmf-meta and .sigmf-data is not a valid SigMF file"}
event = MISPEvent()
expanded_sigmf = MISPObject('sigmf-expanded-recording')
if 'core:author' in sigmf_meta['global']:
@ -198,6 +262,30 @@ def handler(q=False):
return {"results": {'Object': event['Object']}}
def handler(q=False):
request = json.loads(q)
object = request.get("object")
event = MISPEvent()
if not object:
return {"error": "No object provided"}
if 'Attribute' not in object:
return {"error": "Empty Attribute list"}
# check if it's a SigMF Archive
if object['name'] == 'sigmf-archive':
return process_sigmf_archive(object)
# check if it's a SigMF Recording
if object['name'] == 'sigmf-recording':
return process_sigmf_recording(object)
# TODO: add support for SigMF Collection
return {"error": "No SigMF object provided"}
def introspection():
return mispattributes