mirror of https://github.com/MISP/misp-modules
fix: properly read samples in different datatypes
parent
e26bfef477
commit
df2183ce54
|
@ -30,52 +30,60 @@ moduleinfo = {'version': '0.1', 'author': 'Luciano Righetti',
|
|||
'module-type': ['expansion']}
|
||||
|
||||
|
||||
def generate_plots(recording, meta_filename):
|
||||
def get_samples(data_bytes, data_type) -> np.ndarray:
|
||||
"""
|
||||
Get samples from bytes.
|
||||
|
||||
Source: https://github.com/IQEngine/IQEngine/blob/main/api/rf/samples.py
|
||||
|
||||
Parameters
|
||||
----------
|
||||
data_bytes : bytes
|
||||
The bytes to convert to samples.
|
||||
data_type : str
|
||||
The data type of the bytes.
|
||||
|
||||
Returns
|
||||
-------
|
||||
np.ndarray
|
||||
The samples.
|
||||
"""
|
||||
|
||||
if data_type == "ci16_le" or data_type == "ci16":
|
||||
samples = np.frombuffer(data_bytes, dtype=np.int16)
|
||||
samples = samples[::2] + 1j * samples[1::2]
|
||||
elif data_type == "cf32_le":
|
||||
samples = np.frombuffer(data_bytes, dtype=np.complex64)
|
||||
elif data_type == "ci8" or data_type == "i8":
|
||||
samples = np.frombuffer(data_bytes, dtype=np.int8)
|
||||
samples = samples[::2] + 1j * samples[1::2]
|
||||
else:
|
||||
raise ("Datatype " + data_type + " not implemented")
|
||||
return samples
|
||||
|
||||
|
||||
def generate_plots(recording, meta_filename, data_bytes):
|
||||
# FFT plot
|
||||
filename = meta_filename.replace('.sigmf-data', '')
|
||||
# snippet from https://gist.github.com/daniestevez/0d519fd4044f3b9f44e170fd619fbb40
|
||||
NFFT = 2048
|
||||
N = NFFT * 4096
|
||||
fs = recording.get_global_info()['core:sample_rate']
|
||||
x = np.fromfile(recording.data_file, 'int16', count=2*N)
|
||||
x = x[::2] + 1j * x[1::2]
|
||||
|
||||
# f = np.fft.fftshift(np.average(
|
||||
# np.abs(np.fft.fft(x.reshape(-1, NFFT)))**2, axis=0))
|
||||
# freq = np.fft.fftshift(np.fft.fftfreq(NFFT, 1/fs))
|
||||
|
||||
# plt.figure(figsize=(10, 4))
|
||||
# plt.plot(1e-6 * freq, 10*np.log10(f))
|
||||
# plt.title(filename)
|
||||
# plt.ylabel('PSD (dB)')
|
||||
# plt.xlabel('Baseband frequency (MHz)')
|
||||
# fft_buff = io.BytesIO()
|
||||
# plt.savefig(fft_buff, format='png')
|
||||
# fft_buff.seek(0)
|
||||
# fft_png = base64.b64encode(fft_buff.read()).decode('utf-8')
|
||||
|
||||
# fft_attr = {
|
||||
# 'type': 'attachment',
|
||||
# 'value': filename + '-fft.png',
|
||||
# 'data': fft_png,
|
||||
# 'comment': 'FFT plot of the recording'
|
||||
# }
|
||||
samples = get_samples(
|
||||
data_bytes, recording.get_global_info()['core:datatype'])
|
||||
sample_rate = recording.get_global_info()['core:sample_rate']
|
||||
|
||||
# Waterfall plot
|
||||
# snippet from https://pysdr.org/content/frequency_domain.html#fast-fourier-transform-fft
|
||||
fft_size = 1024
|
||||
# // is an integer division which rounds down
|
||||
num_rows = len(x) // fft_size
|
||||
num_rows = len(samples) // fft_size
|
||||
spectrogram = np.zeros((num_rows, fft_size))
|
||||
for i in range(num_rows):
|
||||
spectrogram[i, :] = 10 * \
|
||||
np.log10(np.abs(np.fft.fftshift(
|
||||
np.fft.fft(x[i*fft_size:(i+1)*fft_size])))**2)
|
||||
np.fft.fft(samples[i*fft_size:(i+1)*fft_size])))**2)
|
||||
|
||||
plt.figure(figsize=(10, 4))
|
||||
plt.title(filename)
|
||||
plt.imshow(spectrogram, aspect='auto', extent=[
|
||||
fs/-2/1e6, fs/2/1e6, 0, len(x)/fs])
|
||||
sample_rate/-2/1e6, sample_rate/2/1e6, 0, len(samples)/sample_rate])
|
||||
plt.xlabel("Frequency [MHz]")
|
||||
plt.ylabel("Time [ms]")
|
||||
plt.savefig(filename + '-spectrogram.png')
|
||||
|
@ -91,7 +99,6 @@ def generate_plots(recording, meta_filename):
|
|||
'comment': 'Waterfall plot of the recording'
|
||||
}
|
||||
|
||||
# return [fft_attr, waterfall_attr]
|
||||
return [{'relation': 'waterfall-plot', 'attribute': waterfall_attr}]
|
||||
|
||||
|
||||
|
@ -176,7 +183,8 @@ def handler(q=False):
|
|||
|
||||
# add FFT and waterfall plot
|
||||
try:
|
||||
plots = generate_plots(recording, sigmf_data_attr['value'])
|
||||
plots = generate_plots(
|
||||
recording, sigmf_data_attr['value'], sigmf_data_bin)
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
return {"error": "Could not generate plots"}
|
||||
|
|
Loading…
Reference in New Issue