chg: [internal] Optimise csvimport

pull/648/head
Jakub Onderka 2024-01-06 22:27:36 +01:00
parent 193d7fd0bc
commit 4596d76887
1 changed files with 9 additions and 17 deletions

View File

@ -1,10 +1,6 @@
# -*- coding: utf-8 -*-
from pymisp import MISPEvent, MISPObject
from pymisp import __path__ as pymisp_path
import csv
import io
import json
import os
import base64
misperrors = {'error': 'Error'}
@ -33,7 +29,7 @@ misp_context_additional_fields = ['event_info', 'event_member_org', 'event_sourc
misp_extended_csv_header = misp_standard_csv_header + misp_context_additional_fields
class CsvParser():
class CsvParser:
def __init__(self, header, has_header, delimiter, data, from_misp, MISPtypes, categories):
self.misp_event = MISPEvent()
self.header = header
@ -77,7 +73,7 @@ class CsvParser():
return {'error': 'In order to import MISP objects, an object relation for each attribute contained in an object is required.'}
self.__build_misp_event(attribute_indexes, object_indexes)
else:
attribute_fields = attribute_fields = misp_standard_csv_header[:1] + misp_standard_csv_header[2:9]
attribute_fields = misp_standard_csv_header[:1] + misp_standard_csv_header[2:9]
attribute_indexes = []
types_indexes = []
for i in range(len(self.header)):
@ -236,7 +232,7 @@ class CsvParser():
return score
def __finalize_results(self):
event = json.loads(self.misp_event.to_json())
event = self.misp_event.to_dict()
self.results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
@ -252,10 +248,7 @@ def __standard_parsing(data):
return list(tuple(part.strip() for part in line) for line in csv.reader(io.TextIOWrapper(io.BytesIO(data.encode()), encoding='utf-8')) if line and not line[0].startswith('#'))
def handler(q=False):
if q is False:
return False
request = json.loads(q)
def dict_handler(request: dict):
if request.get('data'):
try:
data = base64.b64decode(request['data']).decode('utf-8')
@ -282,12 +275,11 @@ def handler(q=False):
del data[0]
if header == misp_standard_csv_header or header == misp_extended_csv_header:
header = misp_standard_csv_header
descFilename = os.path.join(pymisp_path[0], 'data/describeTypes.json')
with open(descFilename, 'r') as f:
description = json.loads(f.read())['result']
MISPtypes = description['types']
description = MISPEvent().describe_types
misp_types = description['types']
for h in header:
if not any((h in MISPtypes, h in misp_extended_csv_header, h in ('', ' ', '_', 'object_id'))):
if not any((h in misp_types, h in misp_extended_csv_header, h in ('', ' ', '_', 'object_id'))):
misperrors['error'] = 'Wrong header field: {}. Please use a header value that can be recognized by MISP (or alternatively skip it using a whitespace).'.format(h)
return misperrors
from_misp = all((h in misp_extended_csv_header or h in ('', ' ', '_', 'object_id') for h in header))
@ -300,7 +292,7 @@ def handler(q=False):
wrong_types = tuple(wrong_type for wrong_type in ('type', 'value') if wrong_type in header)
misperrors['error'] = 'Error with the following header: {}. It contains the following field(s): {}, which is(are) already provided by the usage of at least on MISP attribute type in the header.'.format(header, 'and'.join(wrong_types))
return misperrors
csv_parser = CsvParser(header, has_header, delimiter, data, from_misp, MISPtypes, description['categories'])
csv_parser = CsvParser(header, has_header, delimiter, data, from_misp, misp_types, description['categories'])
# build the attributes
result = csv_parser.parse_csv()
if 'error' in result: