Browse Source

chg: Checking attributes category

- We check the category before adding the
  attribute to the event
- Checking if the category is correct and if not,
  doing a case insensitive check
- If the category is not correct after the 2 first
  tests, we simply delete it from the attribute
  and pymisp will give the attribute a default
  category value based on the atttribute type, at
  the creation of the attribute
pull/360/head
chrisr3d 2 years ago
parent
commit
cf5ad29f27
No known key found for this signature in database GPG Key ID: 6BBED1B63A6D639F
  1. 67
      misp_modules/modules/import_mod/csvimport.py

67
misp_modules/modules/import_mod/csvimport.py

@ -34,7 +34,7 @@ misp_extended_csv_header = misp_standard_csv_header + misp_context_additional_fi
class CsvParser():
def __init__(self, header, has_header, delimiter, data, from_misp, MISPtypes):
def __init__(self, header, has_header, delimiter, data, from_misp, MISPtypes, categories):
self.misp_event = MISPEvent()
self.header = header
self.has_header = has_header
@ -42,11 +42,16 @@ class CsvParser():
self.data = data
self.from_misp = from_misp
self.MISPtypes = MISPtypes
self.categories = categories
self.fields_number = len(self.header)
self.__score_mapping = {0: self.__create_standard_misp,
self.__score_mapping = {0: self.__create_standard_attribute,
1: self.__create_attribute_with_ids,
2: self.__create_attribute_with_tags,
3: self.__create_attribute_with_ids_and_tags}
3: self.__create_attribute_with_ids_and_tags,
4: self.__create_attribute_check_category,
5: self.__create_attribute_check_category_and_ids,
6: self.__create_attribute_check_category_and_tags,
7: self.__create_attribute_check_category_with_ids_and_tags}
def parse_csv(self):
if self.from_misp:
@ -165,35 +170,68 @@ class CsvParser():
# Utility functions #
################################################################################
def __create_attribute_check_category(self, line, indexes):
attribute = self.__create_standard_attribute(line, indexes)
self.__check_category(attribute)
return attribute
def __create_attribute_check_category_and_ids(self, line, indexes):
attribute = self.__create_attribute_with_ids(line, indexes)
self.__check_category(attribute)
return attribute
def __create_attribute_check_category_and_tags(self, line, indexes):
attribute = self.__create_attribute_with_tags(line, indexes)
self.__check_category(attribute)
return attribute
def __create_attribute_check_category_with_ids_and_tags(self, line, indexes):
attribute = self.__create_attribute_with_ids_and_tags(line, indexes)
self.__check_category(attribute)
return attribute
def __create_attribute_with_ids(self, line, indexes):
attribute = self.__create_standard_misp(line, indexes)
return self.__deal_with_ids(attribute)
attribute = self.__create_standard_attribute(line, indexes)
self.__deal_with_ids(attribute)
return attribute
def __create_attribute_with_ids_and_tags(self, line, indexes):
attribute = self.__deal_with_ids(self.__create_standard_misp(line, indexes))
return self.__deal_with_tags(attribute)
attribute = self.__create_standard_attribute(line, indexes)
self.__deal_with_ids(attribute)
self.__deal_with_tags(attribute)
return attribute
def __create_attribute_with_tags(self, line, indexes):
attribute = self.__create_standard_misp(line, indexes)
return self.__deal_with_tags(attribute)
attribute = self.__create_standard_attribute(line, indexes)
self.__deal_with_tags(attribute)
return attribute
def __create_standard_misp(self, line, indexes):
def __create_standard_attribute(self, line, indexes):
return {self.header[index]: line[index] for index in indexes if line[index]}
def __check_category(self, attribute):
category = attribute['category']
if category in self.categories:
return
if category.capitalize() in self.categories:
attribute['category'] = category.capitalize()
return
del attribute['category']
@staticmethod
def __deal_with_ids(attribute):
attribute['to_ids'] = True if attribute['to_ids'] == '1' else False
return attribute
@staticmethod
def __deal_with_tags(attribute):
attribute['Tag'] = [{'name': tag.strip()} for tag in attribute['Tag'].split(',')]
return attribute
def __get_score(self):
score = 1 if 'to_ids' in self.header else 0
if 'attribute_tag' in self.header:
score += 2
if 'category' in self.header:
score += 4
return score
def __finalize_results(self):
@ -241,7 +279,8 @@ def handler(q=False):
header = misp_standard_csv_header
descFilename = os.path.join(pymisp_path[0], 'data/describeTypes.json')
with open(descFilename, 'r') as f:
MISPtypes = json.loads(f.read())['result'].get('types')
description = json.loads(f.read())['result']
MISPtypes = description['types']
for h in header:
if not any((h in MISPtypes, h in misp_extended_csv_header, h in ('', ' ', '_', 'object_id'))):
misperrors['error'] = 'Wrong header field: {}. Please use a header value that can be recognized by MISP (or alternatively skip it using a whitespace).'.format(h)
@ -256,7 +295,7 @@ def handler(q=False):
wrong_types = tuple(wrong_type for wrong_type in ('type', 'value') if wrong_type in header)
misperrors['error'] = 'Error with the following header: {}. It contains the following field(s): {}, which is(are) already provided by the usage of at least on MISP attribute type in the header.'.format(header, 'and'.join(wrong_types))
return misperrors
csv_parser = CsvParser(header, has_header, delimiter, data, from_misp, MISPtypes)
csv_parser = CsvParser(header, has_header, delimiter, data, from_misp, MISPtypes, description['categories'])
# build the attributes
result = csv_parser.parse_csv()
if 'error' in result:

Loading…
Cancel
Save