fix: [stix2 export] Cleaned up MISP objects parsing

- Replaced multiple if statements in a for loop by
  a dictionary mapping
pull/3725/head
chrisr3d 2018-09-17 16:39:13 +02:00
parent 28dac6a7c2
commit 4aa861b227
No known key found for this signature in database
GPG Key ID: 6BBED1B63A6D639F
2 changed files with 71 additions and 56 deletions

View File

@ -108,39 +108,18 @@ class StixBuilder():
self.add_custom(attribute)
if hasattr(self.misp_event, 'objects') and self.misp_event.objects:
self.load_objects_mapping()
objects_to_parse = defaultdict(dict)
self.objects_to_parse = defaultdict(dict)
misp_objects = self.misp_event.objects
self.object_references, self.processes = self.fetch_object_references(misp_objects)
for misp_object in misp_objects:
to_ids = self.fetch_ids_flag(misp_object.attributes)
name = misp_object.name
if name == "vulnerability":
self.add_object_vulnerability(misp_object, to_ids)
elif name == "course-of-action":
self.add_course_of_action(misp_object, from_object=True)
elif name in ('pe', 'pe-section'):
objects_to_parse[name][misp_object.uuid] = to_ids, misp_object
elif name in objectsMapping:
if name == 'file' and misp_object.references:
to_parse = False
for reference in misp_object.references:
if reference.relationship_type == 'included-in' and reference.Object['name'] == "pe":
objects_to_parse[name][misp_object.uuid] = to_ids, misp_object
to_parse = True
break
if to_parse:
continue
try:
if to_ids or name == "stix2-pattern":
self.add_object_indicator(misp_object)
else:
self.add_object_observable(misp_object)
except:
self.add_object_custom(misp_object, to_ids)
else:
try:
getattr(self, objectsMapping[name]['to_call'])(misp_object, to_ids)
except KeyError:
self.add_object_custom(misp_object, to_ids)
if objects_to_parse:
self.resolve_objects2parse(objects_to_parse)
if self.objects_to_parse:
self.resolve_objects2parse()
if hasattr(self.misp_event, 'Galaxy') and self.misp_event.Galaxy:
for galaxy in self.misp_event.Galaxy:
self.parse_galaxy(galaxy, self.report_id)
@ -236,6 +215,21 @@ class StixBuilder():
except:
self.add_custom(attribute)
def handle_usual_object_name(self, misp_object, to_ids):
name = misp_object.name
if name == 'file' and misp_object.references:
for reference in misp_object.references:
if reference.relationship_type == 'included-in' and reference.Object['name'] == "pe":
self.objects_to_parse[name][misp_object.uuid] = to_ids, misp_object
return
try:
if to_ids or name == "stix2-pattern":
self.add_object_indicator(misp_object)
else:
self.add_object_observable(misp_object)
except:
self.add_object_custom(misp_object, to_ids)
def handle_link(self, attribute):
url = attribute.value
source = "url"
@ -247,8 +241,11 @@ class StixBuilder():
link = {'source_name': source, 'url': url}
self.external_refs.append(link)
def resolve_objects2parse(self, objects2parse):
for uuid, misp_object in objects2parse['file'].items():
def populate_objects_to_parse(self, misp_object, to_ids):
self.objects_to_parse[misp_object.name][misp_object.uuid] = to_ids, misp_object
def resolve_objects2parse(self):
for uuid, misp_object in self.objects2parse['file'].items():
to_ids_file, file_object = misp_object
file_id = "file--{}".format(file_object.uuid)
to_ids_list = [to_ids_file]
@ -257,12 +254,12 @@ class StixBuilder():
if reference.relationship_type == "included-in" and reference.Object['name'] == "pe":
pe_uuid = reference.referenced_uuid
break
to_ids_pe, pe_object = objects2parse['pe'][pe_uuid]
to_ids_pe, pe_object = self.objects2parse['pe'][pe_uuid]
to_ids_list.append(to_ids_pe)
sections = []
for reference in pe_object.references:
if reference.Object['name'] == "pe-section":
to_ids_section, section_object = objects2parse['pe-section'][reference.referenced_uuid]
to_ids_section, section_object = self.objects2parse['pe-section'][reference.referenced_uuid]
to_ids_list.append(to_ids_section)
sections.append(section_object)
if True in to_ids_list:
@ -366,21 +363,25 @@ class StixBuilder():
attack_pattern = AttackPattern(**a_p_args)
self.append_object(attack_pattern, a_p_id)
def add_course_of_action(self, misp_object, from_object=False):
if from_object:
coa_id = 'course-of-action--{}'.format(misp_object.uuid)
coa_args = {'id': coa_id, 'type': 'course-of-action'}
for attribute in misp_object.attributes:
self.parse_galaxies(attribute.Galaxy, coa_id)
relation = attribute.object_relation
if relation == 'name':
coa_args['name'] = attribute.value
elif relation == 'description':
coa_args['description'] = attribute.value
if not 'name' in coa_args:
return
else:
coa_args, coa_id = self.generate_galaxy_args(misp_object, False, False, 'course-of-action')
def add_course_of_action(self, misp_object):
coa_args, coa_id = self.generate_galaxy_args(misp_object, False, False, 'course-of-action')
self.add_coa_stix_object(coa_args, coa_id)
def add_course_of_action_from_object(self, misp_object, _):
coa_id = 'course-of-action--{}'.format(misp_object.uuid)
coa_args = {'id': coa_id, 'type': 'course-of-action'}
for attribute in misp_object.attributes:
self.parse_galaxies(attribute.Galaxy, coa_id)
relation = attribute.object_relation
if relation == 'name':
coa_args['name'] = attribute.value
elif relation == 'description':
coa_args['description'] = attribute.value
if not 'name' in coa_args:
return
self.add_coa_stix_object(coa_args, coa_id)
def add_coa_stix_object(self, coa_args):
coa_args['created_by_ref'] = self.identity_id
course_of_action = CourseOfAction(**coa_args)
self.append_object(course_of_action, coa_id)

View File

@ -272,21 +272,35 @@ network_traffic_pattern = "network-traffic:{0} = '{1}' AND "
network_traffic_src_ref = "src_ref.type = '{0}' AND network-traffic:src_ref.value"
network_traffic_dst_ref = "dst_ref.type = '{0}' AND network-traffic:dst_ref.value"
objectsMapping = {'asn': {'observable': {'type': 'autonomous-system'},
objectsMapping = {'asn': {'to_call': 'handle_usual_object_name',
'observable': {'type': 'autonomous-system'},
'pattern': "autonomous-system:{0} = '{1}' AND "},
'domain-ip': {'pattern': "domain-name:{0} = '{1}' AND "},
'email': {'observable': {'0': {'type': 'email-message'}},
'course-of-action': {'to_call': 'add_course_of_action_from_object'},
'domain-ip': {'to_call': 'handle_usual_object_name',
'pattern': "domain-name:{0} = '{1}' AND "},
'email': {'to_call': 'handle_usual_object_name',
'observable': {'0': {'type': 'email-message'}},
'pattern': "email-{0}:{1} = '{2}' AND "},
'file': {'observable': {'0': {'type': 'file', 'hashes': {}}},
'file': {'to_call': 'handle_usual_object_name',
'observable': {'0': {'type': 'file', 'hashes': {}}},
'pattern': "file:{0} = '{1}' AND "},
'ip-port': {'pattern': network_traffic_pattern},
'network-socket': {'pattern': network_traffic_pattern},
'process': {'pattern': "process:{0} = '{1}' AND "},
'registry-key': {'observable': {'0': {'type': 'windows-registry-key'}},
'ip-port': {'to_call': 'handle_usual_object_name',
'pattern': network_traffic_pattern},
'network-socket': {'to_call': 'handle_usual_object_name',
'pattern': network_traffic_pattern},
'pe': {'to_call': 'populate_objects_to_parse'},
'pe-section': {'to_call': 'populate_objects_to_parse'},
'process': {'to_call': 'handle_usual_object_name',
'pattern': "process:{0} = '{1}' AND "},
'registry-key': {'to_call': 'handle_usual_object_name',
'observable': {'0': {'type': 'windows-registry-key'}},
'pattern': "windows-registry-key:{0} = '{1}' AND "},
'url': {'observable': {'0': {'type': 'url'}},
'url': {'to_call': 'handle_usual_object_name',
'observable': {'0': {'type': 'url'}},
'pattern': "url:{0} = '{1}' AND "},
'x509': {'pattern': "x509-certificate:{0} = '{1}' AND "}
'vulnerability': {'to_call': 'add_object_vulnerability'},
'x509': {'to_call': 'handle_usual_object_name',
'pattern': "x509-certificate:{0} = '{1}' AND "}
}
asnObjectMapping = {'asn': 'number', 'description': 'name', 'subnet-announced': 'value'}