mirror of https://github.com/MISP/MISP
fix: [stix2 import] Cleaned up duplicate function & Fixed external STIX files parsing
- External STIX files parsing improvement to comepull/3725/head
parent
81f93d61e1
commit
02fe24b8fd
|
@ -75,6 +75,34 @@ class StixParser():
|
|||
self.buildMISPDict()
|
||||
self.set_distribution()
|
||||
|
||||
def buildMISPDict(self):
|
||||
report_attributes = defaultdict(set)
|
||||
for _, report in self.event['report'].items():
|
||||
report_attributes['orgs'].add(report['created_by_ref'].split('--')[1])
|
||||
report_attributes['name'].add(report['name'])
|
||||
if report.get('published'):
|
||||
report_attributes['published'].add(report['published'])
|
||||
if 'labels' in report:
|
||||
report_attributes['labels'].update([l for l in report['labels']])
|
||||
if 'external_references' in report:
|
||||
self.add_links(report['external_references'])
|
||||
for ref in report['object_refs']:
|
||||
if 'relationship' not in ref:
|
||||
object_type, uuid = ref.split('--')
|
||||
object2parse = self.event[object_type][uuid]
|
||||
self.process_parsing(object2parse, object_type)
|
||||
if len(report_attributes['orgs']) == 1:
|
||||
identity = self.event['identity'][report_attributes['orgs'].pop()]
|
||||
self.misp_event['Org'] = {'name': identity['name']}
|
||||
if len(report_attributes['published']) == 1:
|
||||
self.misp_event.publish_timestamp = self.getTimestampfromDate(report_attributes['published'].pop())
|
||||
if len(report_attributes['name']) == 1:
|
||||
self.misp_event.info = report_attributes['name'].pop()
|
||||
else:
|
||||
self.misp_event.info = "Imported with MISP import script for {}.".format(self.stix_version)
|
||||
for l in report_attributes['labels']:
|
||||
self.misp_event.add_tag(l)
|
||||
|
||||
def set_distribution(self):
|
||||
for attribute in self.misp_event.attributes:
|
||||
attribute.distribution = self._attribute_distribution
|
||||
|
@ -89,6 +117,19 @@ class StixParser():
|
|||
with open(outputfile, 'w') as f:
|
||||
f.write(eventDict)
|
||||
|
||||
def add_links(self, refs):
|
||||
for e in refs:
|
||||
link = {"type": "link"}
|
||||
comment = e.get('source_name')
|
||||
try:
|
||||
comment = comment.split('url - ')[1]
|
||||
except IndexError:
|
||||
pass
|
||||
if comment:
|
||||
link['comment'] = comment
|
||||
link['value'] = e.get('url')
|
||||
self.misp_event.add_attribute(**link)
|
||||
|
||||
@staticmethod
|
||||
def getTimestampfromDate(stix_date):
|
||||
try:
|
||||
|
@ -104,6 +145,18 @@ class StixParser():
|
|||
def get_misp_category(labels):
|
||||
return labels[1].split('=')[1][1:-1]
|
||||
|
||||
def parse_course_of_action(self, o):
|
||||
misp_object = MISPObject('course-of-action')
|
||||
if 'name' in o:
|
||||
attribute = {'type': 'text', 'object_relation': 'name', 'value': o.get('name')}
|
||||
misp_object.add_attribute(**attribute)
|
||||
else:
|
||||
return
|
||||
if 'description' in o:
|
||||
attribute = {'type': 'text', 'object_relation': 'description', 'value': o.get('description')}
|
||||
misp_object.add_attribute(**attribute)
|
||||
self.misp_event.add_object(**misp_object)
|
||||
|
||||
@staticmethod
|
||||
def parse_observable(observable, attribute_type):
|
||||
return misp_types_mapping[attribute_type](observable, attribute_type)
|
||||
|
@ -150,57 +203,14 @@ class StixFromMISPParser(StixParser):
|
|||
'url': {'observable': observable_url, 'pattern': pattern_url},
|
||||
'WindowsPEBinaryFile': {'observable': self.observable_pe, 'pattern': self.pattern_pe},
|
||||
'x509': {'observable': observable_x509, 'pattern': pattern_x509}}
|
||||
self.object_from_refs = {'course-of-action': self.parse_course_of_action, 'vulnerability': self.parse_vulnerability,
|
||||
self.object_from_refs = {'course-of-action': self.parse_MISP_course_of_action, 'vulnerability': self.parse_vulnerability,
|
||||
'x-misp-object': self.parse_custom}
|
||||
self.object_from_refs.update(dict.fromkeys(list(galaxy_types.keys()), self.parse_galaxy))
|
||||
self.object_from_refs.update(dict.fromkeys(['indicator', 'observed-data'], self.parse_usual_object))
|
||||
|
||||
def buildMISPDict(self):
|
||||
report_attributes = defaultdict(list)
|
||||
orgs = []
|
||||
for _, report in self.event['report'].items():
|
||||
org_uuid = report['created_by_ref'].split('--')[1]
|
||||
if org_uuid not in orgs:
|
||||
orgs.append(org_uuid)
|
||||
report_name = report['name']
|
||||
if report_name not in orgs:
|
||||
report_attributes['name'].append(report_name)
|
||||
if report.get('published'):
|
||||
report_attributes['published'].append(report['published'])
|
||||
if 'labels' in report:
|
||||
report_attributes['labels'].extend([l for l in report['labels'] if l not in report_attributes['labels']])
|
||||
if 'external_references' in report:
|
||||
self.add_links(report['external_references'])
|
||||
for ref in report['object_refs']:
|
||||
if 'relationship' not in ref:
|
||||
object_type, uuid = ref.split('--')
|
||||
object2parse = self.event[object_type][uuid]
|
||||
labels = object2parse.get('labels')
|
||||
self.object_from_refs[object_type](object2parse, labels)
|
||||
if len(orgs) == 1:
|
||||
identity = self.event['identity'][orgs[0]]
|
||||
self.misp_event['Org'] = {'name': identity['name']}
|
||||
if len(report_attributes['published']) == 1:
|
||||
self.misp_event.publish_timestamp = self.getTimestampfromDate(report_attributes['published'][0])
|
||||
if len(report_attributes['name']) == 1:
|
||||
self.misp_event.info = report_attributes['name'][0]
|
||||
else:
|
||||
self.misp_event.info = "Imported from MISP import for STIX 2.0 script."
|
||||
for l in report_attributes['labels']:
|
||||
self.misp_event.add_tag(l)
|
||||
|
||||
def add_links(self, refs):
|
||||
for e in refs:
|
||||
link = {"type": "link"}
|
||||
comment = e.get('source_name')
|
||||
try:
|
||||
comment = comment.split('url - ')[1]
|
||||
except IndexError:
|
||||
pass
|
||||
if comment:
|
||||
link['comment'] = comment
|
||||
link['value'] = e.get('url')
|
||||
self.misp_event.add_attribute(**link)
|
||||
def process_parsing(self, object2parse, object_type):
|
||||
labels = object2parse.get('labels')
|
||||
self.object_from_refs[object_type](object2parse, labels)
|
||||
|
||||
def parse_usual_object(self, o, labels):
|
||||
if 'from_object' in labels:
|
||||
|
@ -219,17 +229,8 @@ class StixFromMISPParser(StixParser):
|
|||
'description': cluster_description, 'uuid': uuid}]}
|
||||
self.misp_event['Galaxy'].append(galaxy)
|
||||
|
||||
def parse_course_of_action(self, o, _):
|
||||
misp_object = MISPObject('course-of-action')
|
||||
if 'name' in o:
|
||||
attribute = {'type': 'text', 'object_relation': 'name', 'value': o.get('name')}
|
||||
misp_object.add_attribute(**attribute)
|
||||
else:
|
||||
return
|
||||
if 'description' in o:
|
||||
attribute = {'type': 'text', 'object_relation': 'description', 'value': o.get('description')}
|
||||
misp_object.add_attribute(**attribute)
|
||||
self.misp_event.add_object(**misp_object)
|
||||
def parse_MISP_course_of_action(self, o, _):
|
||||
self.parse_course_of_action(o)
|
||||
|
||||
def parse_custom(self, o, labels):
|
||||
if 'from_object' in labels:
|
||||
|
@ -541,46 +542,22 @@ class ExternalStixParser(StixParser):
|
|||
def __init__(self):
|
||||
super(ExternalStixParser, self).__init__()
|
||||
self.version_attribute = {'type': 'text', 'object_relation': 'version', 'value': self.stix_version}
|
||||
self.object_from_refs = {'course-of-action': self.parse_course_of_action, 'vulnerability': self.parse_external_vulnerability,
|
||||
'indicator': self.parse_external_indicator, 'observed-data': self.parse_external_observable}
|
||||
self.object_from_refs.update(dict.fromkeys(list(galaxy_types.keys()), self.parse_external_galaxy))
|
||||
|
||||
def buildMISPDict(self):
|
||||
self.fetch_report()
|
||||
for o in self.event:
|
||||
object_type = o._type
|
||||
if object_type in ('relationship', 'report'):
|
||||
continue
|
||||
if object_type in galaxy_types:
|
||||
self.parse_external_galaxy(o)
|
||||
elif object_type == 'vulnerability':
|
||||
attribute = {'type': 'vulnerability', 'value': o.get('name')}
|
||||
if 'description' in o:
|
||||
attribute['comment'] = o.get('description')
|
||||
self.misp_event.add_attribute(**attribute)
|
||||
elif object_type == 'course-of-action':
|
||||
self.parse_course_of_action(o)
|
||||
elif object_type == 'indicator':
|
||||
pattern = o.get('pattern')
|
||||
self.parse_external_pattern(pattern)
|
||||
attribute = {'type': 'stix2-pattern', 'object_relation': 'stix2-pattern', 'value': pattern}
|
||||
misp_object = {'name': 'stix2-pattern', 'meta-category': 'stix2-pattern',
|
||||
'Attribute': [self.version_attribute, attribute]}
|
||||
self.misp_event.add_object(**misp_object)
|
||||
|
||||
def fetch_report(self):
|
||||
reports = []
|
||||
for o in self.event:
|
||||
if o._type == 'report':
|
||||
reports.append(o)
|
||||
if len(reports) == 1:
|
||||
self.report = reports[0]
|
||||
self.parse_report()
|
||||
def process_parsing(self, object2parse, object_type):
|
||||
self.object_from_refs[object_type](object2parse)
|
||||
|
||||
def parse_external_galaxy(self, o):
|
||||
galaxy = {'name': galaxy_types[o._type]}
|
||||
if 'kill_chain_phases' in o:
|
||||
galaxy['type'] = o['kill_chain_phases'][0].get('phase_name')
|
||||
galaxy = {'name': galaxy_types[o._type].replace('-', ' ').title()}
|
||||
cluster = defaultdict(dict)
|
||||
cluster['value'] = o.get('name')
|
||||
cluster['description'] = o.get('description')
|
||||
if 'kill_chain_phases' in o:
|
||||
galaxy_type = o['kill_chain_phases'][0].get('phase_name')
|
||||
galaxy['type'] = galaxy_type
|
||||
cluster['type'] = galaxy_type
|
||||
if 'aliases' in o:
|
||||
aliases = []
|
||||
for a in o.get('aliases'):
|
||||
|
@ -589,6 +566,18 @@ class ExternalStixParser(StixParser):
|
|||
galaxy['GalaxyCluster'] = [cluster]
|
||||
self.misp_event['Galaxy'].append(galaxy)
|
||||
|
||||
def parse_external_indicator(self, o):
|
||||
pattern = o.get('pattern')
|
||||
self.parse_external_pattern(pattern)
|
||||
attribute = {'type': 'stix2-pattern', 'object_relation': 'stix2-pattern', 'value': pattern}
|
||||
misp_object = {'name': 'stix2-pattern', 'meta-category': 'stix2-pattern',
|
||||
'Attribute': [self.version_attribute, attribute]}
|
||||
self.misp_event.add_object(**misp_object)
|
||||
|
||||
def parse_external_observable(self, o):
|
||||
observable = o.get('objects')
|
||||
# deeper analyse to come, as well as for indicators
|
||||
|
||||
def parse_external_pattern(self, pattern):
|
||||
if ' OR ' in pattern and ' AND ' not in pattern:
|
||||
pattern = pattern.split('OR')
|
||||
|
@ -601,6 +590,12 @@ class ExternalStixParser(StixParser):
|
|||
attribute = self.attribute_from_external_pattern(pattern[0])
|
||||
self.misp_event.add_attribute(**attribute)
|
||||
|
||||
def parse_external_vulnerability(self, o):
|
||||
attribute = {'type': 'vulnerability', 'value': o.get('name')}
|
||||
if 'description' in o:
|
||||
attribute['comment'] = o.get('description')
|
||||
self.misp_event.add_attribute(**attribute)
|
||||
|
||||
@staticmethod
|
||||
def attribute_from_external_pattern(pattern):
|
||||
pattern_type, pattern_value = pattern.split(' = ')
|
||||
|
|
Loading…
Reference in New Issue