From 4596d7688710e5b5ba8008aac23cc21a4a19bde1 Mon Sep 17 00:00:00 2001 From: Jakub Onderka Date: Sat, 6 Jan 2024 22:27:36 +0100 Subject: [PATCH] chg: [internal] Optimise csvimport --- misp_modules/modules/import_mod/csvimport.py | 26 +++++++------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/misp_modules/modules/import_mod/csvimport.py b/misp_modules/modules/import_mod/csvimport.py index 6bd79b7..8f4a643 100644 --- a/misp_modules/modules/import_mod/csvimport.py +++ b/misp_modules/modules/import_mod/csvimport.py @@ -1,10 +1,6 @@ -# -*- coding: utf-8 -*- from pymisp import MISPEvent, MISPObject -from pymisp import __path__ as pymisp_path import csv import io -import json -import os import base64 misperrors = {'error': 'Error'} @@ -33,7 +29,7 @@ misp_context_additional_fields = ['event_info', 'event_member_org', 'event_sourc misp_extended_csv_header = misp_standard_csv_header + misp_context_additional_fields -class CsvParser(): +class CsvParser: def __init__(self, header, has_header, delimiter, data, from_misp, MISPtypes, categories): self.misp_event = MISPEvent() self.header = header @@ -77,7 +73,7 @@ class CsvParser(): return {'error': 'In order to import MISP objects, an object relation for each attribute contained in an object is required.'} self.__build_misp_event(attribute_indexes, object_indexes) else: - attribute_fields = attribute_fields = misp_standard_csv_header[:1] + misp_standard_csv_header[2:9] + attribute_fields = misp_standard_csv_header[:1] + misp_standard_csv_header[2:9] attribute_indexes = [] types_indexes = [] for i in range(len(self.header)): @@ -236,7 +232,7 @@ class CsvParser(): return score def __finalize_results(self): - event = json.loads(self.misp_event.to_json()) + event = self.misp_event.to_dict() self.results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])} @@ -252,10 +248,7 @@ def __standard_parsing(data): return list(tuple(part.strip() for part in line) for line in csv.reader(io.TextIOWrapper(io.BytesIO(data.encode()), encoding='utf-8')) if line and not line[0].startswith('#')) -def handler(q=False): - if q is False: - return False - request = json.loads(q) +def dict_handler(request: dict): if request.get('data'): try: data = base64.b64decode(request['data']).decode('utf-8') @@ -282,12 +275,11 @@ def handler(q=False): del data[0] if header == misp_standard_csv_header or header == misp_extended_csv_header: header = misp_standard_csv_header - descFilename = os.path.join(pymisp_path[0], 'data/describeTypes.json') - with open(descFilename, 'r') as f: - description = json.loads(f.read())['result'] - MISPtypes = description['types'] + + description = MISPEvent().describe_types + misp_types = description['types'] for h in header: - if not any((h in MISPtypes, h in misp_extended_csv_header, h in ('', ' ', '_', 'object_id'))): + if not any((h in misp_types, h in misp_extended_csv_header, h in ('', ' ', '_', 'object_id'))): misperrors['error'] = 'Wrong header field: {}. Please use a header value that can be recognized by MISP (or alternatively skip it using a whitespace).'.format(h) return misperrors from_misp = all((h in misp_extended_csv_header or h in ('', ' ', '_', 'object_id') for h in header)) @@ -300,7 +292,7 @@ def handler(q=False): wrong_types = tuple(wrong_type for wrong_type in ('type', 'value') if wrong_type in header) misperrors['error'] = 'Error with the following header: {}. It contains the following field(s): {}, which is(are) already provided by the usage of at least on MISP attribute type in the header.'.format(header, 'and'.join(wrong_types)) return misperrors - csv_parser = CsvParser(header, has_header, delimiter, data, from_misp, MISPtypes, description['categories']) + csv_parser = CsvParser(header, has_header, delimiter, data, from_misp, misp_types, description['categories']) # build the attributes result = csv_parser.parse_csv() if 'error' in result: