mirror of https://github.com/MISP/MISP
fix: [stix2 import] Checking if attack-pattern, course-of-action and vulnerability names are known galaxies before importing them as MISP object
parent
f7d5bc272b
commit
7692e7f8ae
|
@ -74,7 +74,9 @@ class StixParser():
|
|||
attribute_distribution = int(attribute_distribution) if attribute_distribution.isdigit() else 5
|
||||
except IndexError:
|
||||
attribute_distribution = 5
|
||||
self._synonyms_to_tag_names = args[2] if len(args) > 2 else '/var/www/MISP/app/files/scripts/synonymsToTagNames.json'
|
||||
synonyms_to_tag_names = args[2] if len(args) > 2 else '/var/www/MISP/app/files/scripts/synonymsToTagNames.json'
|
||||
with open(synonyms_to_tag_names, 'rt', encoding='utf-8') as f:
|
||||
self._synonyms_to_tag_names = json.loads(f.read())
|
||||
self.parse_event(event)
|
||||
|
||||
def _load_galaxy(self, galaxy):
|
||||
|
@ -102,11 +104,6 @@ class StixParser():
|
|||
except AttributeError:
|
||||
self.report = {report['id'].split('--')[1]: report}
|
||||
|
||||
def _load_synonyms_to_tag_names(self):
|
||||
with open(self._synonyms_to_tag_names, 'rt', encoding='utf-8') as f:
|
||||
synonyms_to_tag_names = json.loads(f.read())
|
||||
self._synonyms_to_tag_names = synonyms_to_tag_names
|
||||
|
||||
def save_file(self):
|
||||
event = self.misp_event.to_json()
|
||||
with open(f'{self.filename}.stix2', 'wt', encoding='utf-8') as f:
|
||||
|
@ -474,12 +471,12 @@ class StixFromMISPParser(StixParser):
|
|||
|
||||
def parse_galaxy(self, galaxy):
|
||||
if hasattr(galaxy, 'labels'):
|
||||
return tuple(label for label in galaxy.labels if label.startswith('misp-galaxy:'))
|
||||
return [label for label in galaxy.labels if label.startswith('misp-galaxy:')]
|
||||
try:
|
||||
return tuple(self._get_tag_names_from_synonym(galaxy.name))
|
||||
return self._synonyms_to_tag_names[name]
|
||||
except KeyError:
|
||||
print(f'Unknown {galaxy._type} name: {galaxy.name}', file=sys.stderr)
|
||||
return tuple()
|
||||
return [f'misp-galaxy:{galaxy._type}="{galaxy.name}"']
|
||||
|
||||
def parse_indicator_attribute(self, indicator):
|
||||
attribute = self.create_attribute_dict(indicator)
|
||||
|
@ -1132,13 +1129,6 @@ class StixFromMISPParser(StixParser):
|
|||
def get_misp_type(labels):
|
||||
return labels[0].split('=')[1].strip('"')
|
||||
|
||||
def _get_tag_names_from_synonym(self, name):
|
||||
try:
|
||||
return self._synonyms_to_tag_names[name]
|
||||
except TypeError:
|
||||
self._load_synonyms_to_tag_names()
|
||||
return self._synonyms_to_tag_names[name]
|
||||
|
||||
@staticmethod
|
||||
def parse_attribute_pattern(pattern):
|
||||
if ' AND ' in pattern:
|
||||
|
@ -1203,13 +1193,9 @@ class ExternalStixParser(StixParser):
|
|||
self.handle_markings()
|
||||
|
||||
def parse_galaxy(self, galaxy):
|
||||
try:
|
||||
galaxy_name = self._check_existing_galaxy_name(galaxy.name)
|
||||
except AttributeError:
|
||||
self._load_synonyms_to_tag_names()
|
||||
galaxy_name = self._check_existing_galaxy_name(galaxy.name)
|
||||
if galaxy_name:
|
||||
return galaxy_name
|
||||
galaxy_names = self._check_existing_galaxy_name(galaxy.name)
|
||||
if galaxy_names is not None:
|
||||
return galaxy_names
|
||||
return [f'misp-galaxy:{galaxy._type}="{galaxy.name}"']
|
||||
|
||||
def _parse_indicator(self, indicator):
|
||||
|
@ -1259,21 +1245,29 @@ class ExternalStixParser(StixParser):
|
|||
misp_object.add_attribute(**attribute)
|
||||
|
||||
def parse_attack_pattern(self, attack_pattern):
|
||||
misp_object = self.create_misp_object(attack_pattern)
|
||||
if hasattr(attack_pattern, 'external_references'):
|
||||
for reference in attack_pattern.external_references:
|
||||
source_name = reference['source_name']
|
||||
value = reference['external_id'].split('-')[1] if source_name == 'capec' else reference['url']
|
||||
attribute = deepcopy(stix2misp_mapping.attack_pattern_references_mapping[source_name]) if source_name in stix2misp_mapping.attack_pattern_references_mapping else stix2misp_mapping.references_attribute_mapping
|
||||
attribute['value'] = value
|
||||
misp_object.add_attribute(**attribute)
|
||||
self.fill_misp_object(misp_object, attack_pattern, 'attack_pattern_mapping')
|
||||
self.misp_event.add_object(**misp_object)
|
||||
galaxy_names = self._check_existing_galaxy_name(attack_pattern.name)
|
||||
if galaxy_names is not None:
|
||||
self.galaxy[attack_pattern['id'].split('--')[1]] = {'tag_names': galaxy_names, 'used': False}
|
||||
else:
|
||||
misp_object = self.create_misp_object(attack_pattern)
|
||||
if hasattr(attack_pattern, 'external_references'):
|
||||
for reference in attack_pattern.external_references:
|
||||
source_name = reference['source_name']
|
||||
value = reference['external_id'].split('-')[1] if source_name == 'capec' else reference['url']
|
||||
attribute = deepcopy(stix2misp_mapping.attack_pattern_references_mapping[source_name]) if source_name in stix2misp_mapping.attack_pattern_references_mapping else stix2misp_mapping.references_attribute_mapping
|
||||
attribute['value'] = value
|
||||
misp_object.add_attribute(**attribute)
|
||||
self.fill_misp_object(misp_object, attack_pattern, 'attack_pattern_mapping')
|
||||
self.misp_event.add_object(**misp_object)
|
||||
|
||||
def parse_course_of_action(self, course_of_action):
|
||||
misp_object = self.create_misp_object(course_of_action)
|
||||
self.fill_misp_object(misp_object, course_of_action, 'course_of_action_mapping')
|
||||
self.misp_event.add_object(**misp_object)
|
||||
galaxy_names = self._check_existing_galaxy_name(course_of_action.name)
|
||||
if galaxy_names is not None:
|
||||
self.galaxy[course_of_action['id'].split('--')[1]] = {'tag_names': galaxy_names, 'used': False}
|
||||
else:
|
||||
misp_object = self.create_misp_object(course_of_action)
|
||||
self.fill_misp_object(misp_object, course_of_action, 'course_of_action_mapping')
|
||||
self.misp_event.add_object(**misp_object)
|
||||
|
||||
def parse_usual_indicator(self, indicator, separator):
|
||||
pattern = tuple(part.strip() for part in indicator.pattern.strip('[]').split(separator))
|
||||
|
@ -1285,16 +1279,20 @@ class ExternalStixParser(StixParser):
|
|||
self.add_stix2_pattern_object(indicator)
|
||||
|
||||
def parse_vulnerability(self, vulnerability):
|
||||
attributes = self._get_attributes_from_observable(vulnerability, 'vulnerability_mapping')
|
||||
if hasattr(vulnerability, 'external_references'):
|
||||
for reference in vulnerability.external_references:
|
||||
if reference['source_name'] == 'url':
|
||||
attribute = deepcopy(stix2misp_mapping.references_attribute_mapping)
|
||||
attribute['value'] = reference['url']
|
||||
attributes.append(attribute)
|
||||
if len(attributes) == 1 and attributes[0]['object_relation'] == 'id':
|
||||
attributes[0]['type'] = 'vulnerability'
|
||||
self.handle_import_case(vulnerability, attributes, 'vulnerability')
|
||||
galaxy_names = self._check_existing_galaxy_name(vulnerability.name)
|
||||
if galaxy_names is not None:
|
||||
self.galaxy[vulnerability['id'].split('--')[1]] = {'tag_names': galaxy_names, 'used': False}
|
||||
else:
|
||||
attributes = self._get_attributes_from_observable(vulnerability, 'vulnerability_mapping')
|
||||
if hasattr(vulnerability, 'external_references'):
|
||||
for reference in vulnerability.external_references:
|
||||
if reference['source_name'] == 'url':
|
||||
attribute = deepcopy(stix2misp_mapping.references_attribute_mapping)
|
||||
attribute['value'] = reference['url']
|
||||
attributes.append(attribute)
|
||||
if len(attributes) == 1 and attributes[0]['object_relation'] == 'id':
|
||||
attributes[0]['type'] = 'vulnerability'
|
||||
self.handle_import_case(vulnerability, attributes, 'vulnerability')
|
||||
|
||||
################################################################################
|
||||
## OBSERVABLE PARSING FUNCTIONS ##
|
||||
|
|
Loading…
Reference in New Issue