From df2183ce5488a7dc8d42e34dd6f2609a1ac3b5d2 Mon Sep 17 00:00:00 2001 From: Luciano Righetti Date: Thu, 13 Jul 2023 11:06:25 +0200 Subject: [PATCH] fix: properly read samples in different datatypes --- .../modules/expansion/sigmf-expand.py | 74 ++++++++++--------- 1 file changed, 41 insertions(+), 33 deletions(-) diff --git a/misp_modules/modules/expansion/sigmf-expand.py b/misp_modules/modules/expansion/sigmf-expand.py index def1e1b..ec51106 100644 --- a/misp_modules/modules/expansion/sigmf-expand.py +++ b/misp_modules/modules/expansion/sigmf-expand.py @@ -30,52 +30,60 @@ moduleinfo = {'version': '0.1', 'author': 'Luciano Righetti', 'module-type': ['expansion']} -def generate_plots(recording, meta_filename): +def get_samples(data_bytes, data_type) -> np.ndarray: + """ + Get samples from bytes. + + Source: https://github.com/IQEngine/IQEngine/blob/main/api/rf/samples.py + + Parameters + ---------- + data_bytes : bytes + The bytes to convert to samples. + data_type : str + The data type of the bytes. + + Returns + ------- + np.ndarray + The samples. + """ + + if data_type == "ci16_le" or data_type == "ci16": + samples = np.frombuffer(data_bytes, dtype=np.int16) + samples = samples[::2] + 1j * samples[1::2] + elif data_type == "cf32_le": + samples = np.frombuffer(data_bytes, dtype=np.complex64) + elif data_type == "ci8" or data_type == "i8": + samples = np.frombuffer(data_bytes, dtype=np.int8) + samples = samples[::2] + 1j * samples[1::2] + else: + raise ("Datatype " + data_type + " not implemented") + return samples + + +def generate_plots(recording, meta_filename, data_bytes): # FFT plot filename = meta_filename.replace('.sigmf-data', '') - # snippet from https://gist.github.com/daniestevez/0d519fd4044f3b9f44e170fd619fbb40 - NFFT = 2048 - N = NFFT * 4096 - fs = recording.get_global_info()['core:sample_rate'] - x = np.fromfile(recording.data_file, 'int16', count=2*N) - x = x[::2] + 1j * x[1::2] - - # f = np.fft.fftshift(np.average( - # np.abs(np.fft.fft(x.reshape(-1, NFFT)))**2, axis=0)) - # freq = np.fft.fftshift(np.fft.fftfreq(NFFT, 1/fs)) - - # plt.figure(figsize=(10, 4)) - # plt.plot(1e-6 * freq, 10*np.log10(f)) - # plt.title(filename) - # plt.ylabel('PSD (dB)') - # plt.xlabel('Baseband frequency (MHz)') - # fft_buff = io.BytesIO() - # plt.savefig(fft_buff, format='png') - # fft_buff.seek(0) - # fft_png = base64.b64encode(fft_buff.read()).decode('utf-8') - - # fft_attr = { - # 'type': 'attachment', - # 'value': filename + '-fft.png', - # 'data': fft_png, - # 'comment': 'FFT plot of the recording' - # } + samples = get_samples( + data_bytes, recording.get_global_info()['core:datatype']) + sample_rate = recording.get_global_info()['core:sample_rate'] # Waterfall plot # snippet from https://pysdr.org/content/frequency_domain.html#fast-fourier-transform-fft fft_size = 1024 # // is an integer division which rounds down - num_rows = len(x) // fft_size + num_rows = len(samples) // fft_size spectrogram = np.zeros((num_rows, fft_size)) for i in range(num_rows): spectrogram[i, :] = 10 * \ np.log10(np.abs(np.fft.fftshift( - np.fft.fft(x[i*fft_size:(i+1)*fft_size])))**2) + np.fft.fft(samples[i*fft_size:(i+1)*fft_size])))**2) plt.figure(figsize=(10, 4)) plt.title(filename) plt.imshow(spectrogram, aspect='auto', extent=[ - fs/-2/1e6, fs/2/1e6, 0, len(x)/fs]) + sample_rate/-2/1e6, sample_rate/2/1e6, 0, len(samples)/sample_rate]) plt.xlabel("Frequency [MHz]") plt.ylabel("Time [ms]") plt.savefig(filename + '-spectrogram.png') @@ -91,7 +99,6 @@ def generate_plots(recording, meta_filename): 'comment': 'Waterfall plot of the recording' } - # return [fft_attr, waterfall_attr] return [{'relation': 'waterfall-plot', 'attribute': waterfall_attr}] @@ -176,7 +183,8 @@ def handler(q=False): # add FFT and waterfall plot try: - plots = generate_plots(recording, sigmf_data_attr['value']) + plots = generate_plots( + recording, sigmf_data_attr['value'], sigmf_data_bin) except Exception as e: logging.exception(e) return {"error": "Could not generate plots"}