new: add waterfall plot to the expanded object

pull/628/head
Luciano Righetti 2023-07-12 15:34:44 +02:00
parent 5e2957b13f
commit 3f0fa14545
1 changed files with 82 additions and 3 deletions

View File

@ -1,12 +1,16 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import base64 import base64
import numpy as np
import matplotlib.pyplot as plt
import io
import json import json
import tempfile import tempfile
import logging import logging
import sys import sys
from pymisp import MISPObject, MISPEvent from pymisp import MISPObject, MISPEvent
from sigmf import SigMFFile from sigmf import SigMFFile
import pymisp
log = logging.getLogger("sigmf-expand") log = logging.getLogger("sigmf-expand")
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
@ -26,6 +30,71 @@ moduleinfo = {'version': '0.1', 'author': 'Luciano Righetti',
'module-type': ['expansion']} 'module-type': ['expansion']}
def generate_plots(recording, meta_filename):
# FFT plot
filename = meta_filename.replace('.sigmf-data', '')
# snippet from https://gist.github.com/daniestevez/0d519fd4044f3b9f44e170fd619fbb40
NFFT = 2048
N = NFFT * 4096
fs = recording.get_global_info()['core:sample_rate']
x = np.fromfile(recording.data_file, 'int16', count=2*N)
x = x[::2] + 1j * x[1::2]
# f = np.fft.fftshift(np.average(
# np.abs(np.fft.fft(x.reshape(-1, NFFT)))**2, axis=0))
# freq = np.fft.fftshift(np.fft.fftfreq(NFFT, 1/fs))
# plt.figure(figsize=(10, 4))
# plt.plot(1e-6 * freq, 10*np.log10(f))
# plt.title(filename)
# plt.ylabel('PSD (dB)')
# plt.xlabel('Baseband frequency (MHz)')
# fft_buff = io.BytesIO()
# plt.savefig(fft_buff, format='png')
# fft_buff.seek(0)
# fft_png = base64.b64encode(fft_buff.read()).decode('utf-8')
# fft_attr = {
# 'type': 'attachment',
# 'value': filename + '-fft.png',
# 'data': fft_png,
# 'comment': 'FFT plot of the recording'
# }
# Waterfall plot
# snippet from https://pysdr.org/content/frequency_domain.html#fast-fourier-transform-fft
fft_size = 1024
# // is an integer division which rounds down
num_rows = len(x) // fft_size
spectrogram = np.zeros((num_rows, fft_size))
for i in range(num_rows):
spectrogram[i, :] = 10 * \
np.log10(np.abs(np.fft.fftshift(
np.fft.fft(x[i*fft_size:(i+1)*fft_size])))**2)
plt.figure(figsize=(10, 4))
plt.title(filename)
plt.imshow(spectrogram, aspect='auto', extent=[
fs/-2/1e6, fs/2/1e6, 0, len(x)/fs])
plt.xlabel("Frequency [MHz]")
plt.ylabel("Time [ms]")
plt.savefig(filename + '-spectrogram.png')
waterfall_buff = io.BytesIO()
plt.savefig(waterfall_buff, format='png')
waterfall_buff.seek(0)
waterfall_png = base64.b64encode(waterfall_buff.read()).decode('utf-8')
waterfall_attr = {
'type': 'attachment',
'value': filename + '-waterfall.png',
'data': waterfall_png,
'comment': 'Waterfall plot of the recording'
}
# return [fft_attr, waterfall_attr]
return [{'relation': 'waterfall-plot', 'attribute': waterfall_attr}]
def handler(q=False): def handler(q=False):
request = json.loads(q) request = json.loads(q)
object = request.get("object") object = request.get("object")
@ -73,6 +142,8 @@ def handler(q=False):
event = MISPEvent() event = MISPEvent()
expanded_sigmf = MISPObject('sigmf-expanded-recording') expanded_sigmf = MISPObject('sigmf-expanded-recording')
logging.error(expanded_sigmf.to_json())
logging.error(pymisp.__file__)
if 'core:author' in sigmf_meta['global']: if 'core:author' in sigmf_meta['global']:
expanded_sigmf.add_attribute( expanded_sigmf.add_attribute(
@ -102,11 +173,19 @@ def handler(q=False):
expanded_sigmf.add_attribute( expanded_sigmf.add_attribute(
'version', **{'type': 'text', 'value': sigmf_meta['global']['core:version']}) 'version', **{'type': 'text', 'value': sigmf_meta['global']['core:version']})
# TODO: geolocation (GeoJSON)
# add reference to original SigMF Recording object # add reference to original SigMF Recording object
expanded_sigmf.add_reference(object['uuid'], "expands") expanded_sigmf.add_reference(object['uuid'], "expands")
# add FFT and waterfall plot
try:
plots = generate_plots(recording, sigmf_data_attr['value'])
except Exception as e:
logging.exception(e)
return {"error": "Could not generate plots"}
for plot in plots:
expanded_sigmf.add_attribute(plot['relation'], **plot['attribute'])
event.add_object(expanded_sigmf) event.add_object(expanded_sigmf)
event = json.loads(event.to_json()) event = json.loads(event.to_json())