mirror of https://github.com/MISP/misp-modules
fix: Using pymisp classes & methods to parse the module results
parent
ae5bd8d06a
commit
28eb92da53
|
@ -1,5 +1,7 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from collections import defaultdict
|
||||
from pymisp import MISPEvent, MISPObject
|
||||
from pymisp import __path__ as pymisp_path
|
||||
import csv
|
||||
import io
|
||||
import json
|
||||
|
@ -35,6 +37,7 @@ delimiters = [',', ';', '|', '/', '\t', ' ']
|
|||
class CsvParser():
|
||||
def __init__(self, header, has_header, data):
|
||||
data_header = data[0]
|
||||
self.misp_event = MISPEvent()
|
||||
if data_header == misp_standard_csv_header or data_header == misp_extended_csv_header:
|
||||
self.header = misp_standard_csv_header if data_header == misp_standard_csv_header else misp_extended_csv_header[:13]
|
||||
self.from_misp = True
|
||||
|
@ -50,7 +53,6 @@ class CsvParser():
|
|||
self.has_delimiter = True
|
||||
self.fields_number, self.delimiter, self.header = self.get_delimiter_from_header(data[0])
|
||||
self.data = data
|
||||
self.result = []
|
||||
|
||||
def get_delimiter_from_header(self, data):
|
||||
delimiters_count = {}
|
||||
|
@ -104,38 +106,30 @@ class CsvParser():
|
|||
self.buildAttributes()
|
||||
|
||||
def build_misp_event(self):
|
||||
l_attributes = []
|
||||
l_objects = []
|
||||
objects = defaultdict(list)
|
||||
objects = {}
|
||||
header_length = len(self.header)
|
||||
attribute_fields = self.header[:1] + self.header[2:6]
|
||||
attribute_fields = self.header[:1] + self.header[2:6] + self.header[7:8]
|
||||
for line in self.data:
|
||||
attribute = {}
|
||||
try:
|
||||
try:
|
||||
a_uuid, _, a_category, a_type, value, comment, to_ids, _, relation, o_uuid, o_name, o_category = line[:header_length]
|
||||
except ValueError:
|
||||
a_uuid, _, a_category, a_type, value, comment, to_ids, _, relation, tag, o_uuid, o_name, o_category = line[:header_length]
|
||||
if tag:
|
||||
attribute['tags'] = tag
|
||||
a_uuid, _, a_category, a_type, value, comment, to_ids, timestamp, relation, tag, o_uuid, o_name, o_category = line[:header_length]
|
||||
except ValueError:
|
||||
continue
|
||||
for t, v in zip(attribute_fields, [a_uuid, a_category, a_type, value, comment]):
|
||||
attribute[t] = v.replace('"', '')
|
||||
for t, v in zip(attribute_fields, (a_uuid, a_category, a_type, value, comment, timestamp)):
|
||||
attribute[t] = v.strip('"')
|
||||
attribute['to_ids'] = True if to_ids == '1' else False
|
||||
if tag:
|
||||
attribute['Tag'] = [{'name': t.strip()} for t in tag.split(',')]
|
||||
if relation:
|
||||
attribute["object_relation"] = relation.replace('"', '')
|
||||
object_index = tuple(o.replace('"', '') for o in (o_uuid, o_name, o_category))
|
||||
objects[object_index].append(attribute)
|
||||
if o_uuid not in objects:
|
||||
objects[o_uuid] = MISPObject(o_name)
|
||||
objects[o_uuid].add_attribute(relation, **attribute)
|
||||
else:
|
||||
l_attributes.append(attribute)
|
||||
for keys, attributes in objects.items():
|
||||
misp_object = {}
|
||||
for t, v in zip(['uuid', 'name', 'meta-category'], keys):
|
||||
misp_object[t] = v
|
||||
misp_object['Attribute'] = attributes
|
||||
l_objects.append(misp_object)
|
||||
self.result = {"Attribute": l_attributes, "Object": l_objects}
|
||||
self.misp_event.add_attribute(**attribute)
|
||||
for uuid, misp_object in objects.items():
|
||||
misp_object.uuid = uuid
|
||||
self.misp_event.add_object(**misp_object)
|
||||
self.finalize_results()
|
||||
|
||||
def buildAttributes(self):
|
||||
# if there is only 1 field of data
|
||||
|
@ -144,7 +138,7 @@ class CsvParser():
|
|||
for data in self.data:
|
||||
d = data.strip()
|
||||
if d:
|
||||
self.result.append({'types': mispType, 'values': d})
|
||||
self.misp_event.add_attribute(**{'type': mispType, 'value': d})
|
||||
else:
|
||||
# split fields that should be recognized as misp attribute types from the others
|
||||
list2pop, misp, head = self.findMispTypes()
|
||||
|
@ -160,14 +154,15 @@ class CsvParser():
|
|||
datamisp.append(datasplit.pop(l).strip())
|
||||
# for each misp type, we create an attribute
|
||||
for m, dm in zip(misp, datamisp):
|
||||
attribute = {'types': m, 'values': dm}
|
||||
attribute = {'type': m, 'value': dm}
|
||||
for h, ds in zip(head, datasplit):
|
||||
if h:
|
||||
attribute[h] = ds.strip()
|
||||
self.result.append(attribute)
|
||||
self.misp_event.add_attribute(**attribute)
|
||||
self.finalize_results()
|
||||
|
||||
def findMispTypes(self):
|
||||
descFilename = os.path.join(pymisp.__path__[0], 'data/describeTypes.json')
|
||||
descFilename = os.path.join(pymisp_path[0], 'data/describeTypes.json')
|
||||
with open(descFilename, 'r') as f:
|
||||
MispTypes = json.loads(f.read())['result'].get('types')
|
||||
list2pop = []
|
||||
|
@ -196,6 +191,10 @@ class CsvParser():
|
|||
# return list of indexes of the misp types, list of the misp types, remaining fields that will be attribute fields
|
||||
return list2pop, misp, list(reversed(head))
|
||||
|
||||
def finalize_results(self):
|
||||
event=json.loads(self.misp_event.to_json())['Event']
|
||||
self.results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
|
||||
|
||||
|
||||
def handler(q=False):
|
||||
if q is False:
|
||||
|
@ -221,8 +220,7 @@ def handler(q=False):
|
|||
csv_parser = CsvParser(header, has_header, data)
|
||||
# build the attributes
|
||||
csv_parser.parse_csv()
|
||||
r = {'results': csv_parser.result}
|
||||
return r
|
||||
return {'results': csv_parser.results}
|
||||
|
||||
|
||||
def introspection():
|
||||
|
|
Loading…
Reference in New Issue