From e32ef1b0c68d1e47478f5f6e845de4823087b024 Mon Sep 17 00:00:00 2001 From: chrisr3d Date: Thu, 4 Jul 2019 14:30:54 +0200 Subject: [PATCH] fix: [stix2 import] Reusing function & avoiding duplicates --- app/files/scripts/stix2/stix2misp.py | 83 +++++++++++----------------- 1 file changed, 33 insertions(+), 50 deletions(-) diff --git a/app/files/scripts/stix2/stix2misp.py b/app/files/scripts/stix2/stix2misp.py index 46b13e0ed..dd4930f50 100644 --- a/app/files/scripts/stix2/stix2misp.py +++ b/app/files/scripts/stix2/stix2misp.py @@ -244,6 +244,22 @@ class StixParser(): attributes.extend(self.fill_observable_attributes(traffic, network_traffic_mapping)) return attributes + def attributes_from_network_traffic(self, objects, name=None): + network_traffic, references = self.fetch_network_traffic_objects_and_references(objects) + attributes = self.fill_observable_attributes(network_traffic, network_traffic_mapping) + if hasattr(network_traffic, 'extensions') and network_traffic.extensions: + extension_type, extension_value = list(network_traffic.extensions.items())[0] + name = network_traffic_extensions[extension_type] + attributes.extend(self.fill_observable_attributes(extension_value, network_traffic_mapping)) + mapping = network_traffic_references_mapping['with_extensions'] + else: + name = 'ip-port' + mapping = network_traffic_references_mapping['without_extensions'] + attributes.extend(self.parse_network_traffic_references(references, network_traffic, mapping)) + if references: + attributes.extend(self.parse_remaining_references(references, mapping)) + return attributes, name + def attributes_from_process_observable(self, objects): main_process = None for _object in objects.values(): @@ -306,13 +322,6 @@ class StixParser(): file = value return file, data - @staticmethod - def fetch_network_traffic_objects(objects): - network_traffics = [] - for value in objects.values(): - if isinstance(value, stix2.NetworkTraffic): - return value - @staticmethod def fetch_network_traffic_objects_and_references(objects): references = defaultdict(dict) @@ -376,15 +385,23 @@ class StixParser(): self.misp_event.add_object(**misp_object) @staticmethod - def parse_network_traffic_references(objects, network_traffic, mapping, attr='get'): + def __parse_network_traffic_reference(ref_object, ref, mapping): + origin = ref.split('_')[0] + misp_type, relation = mapping[ref_object._type] + return {'type': misp_type.format(origin), 'object_relation': relation.format(origin), + 'to_ids': False, 'value': ref_object.value} + + def parse_network_traffic_references(self, objects, network_traffic, mapping): attributes= [] for ref in ('src_ref', 'dst_ref'): if hasattr(network_traffic, ref): - ref_object = getattr(objects, attr)(getattr(network_traffic, ref)) - origin = ref.split('_')[0] - misp_type, relation = mapping[ref_object._type] - attributes.append({'type': misp_type.format(origin), 'object_relation': relation.format(origin), - 'to_ids': False, 'value': ref_object.value}) + ref_object = objects.pop(getattr(network_traffic, ref)) + attributes.append(self.__parse_network_traffic_reference(ref_object, ref, mapping)) + for refs in ('src_refs', 'dst_refs'): + if hasattr(network_traffic, refs): + for ref in getattr(network_traffic, refs): + ref_object = objects.pop(ref) + attributes.append(self.__parse_network_traffic_reference(ref_object, refs, mapping)) return attributes def parse_pe(self, extension): @@ -1213,17 +1230,7 @@ class ExternalStixParser(StixParser): self.add_attributes_from_pattern('ip-dst', pattern, marking, uuid) def parse_ip_network_traffic_observable(self, objects, marking, uuid): - network_traffic = self.fetch_network_traffic_objects(objects) - attributes = self.fill_observable_attributes(network_traffic, network_traffic_mapping) - if hasattr(network_traffic, 'extensions') and network_traffic.extensions: - extension_type, extension_value = list(network_traffic.extensions.items())[0] - name = network_traffic_extensions[extension_type] - attributes.extend(self.fill_observable_attributes(extension_value, network_traffic_mapping)) - mapping = network_traffic_references_mapping['with_extensions'] - else: - name = 'ip-port' - mapping = network_traffic_references_mapping['without_extensions'] - attributes.extend(self.parse_network_traffic_references(objects, network_traffic, mapping)) + attributes, name = self.attributes_from_network_traffic(objects) self.handle_import_case(attributes, name, marking, uuid) def parse_ip_port_observable(self, objects, marking, uuid): @@ -1231,19 +1238,7 @@ class ExternalStixParser(StixParser): self.handle_import_case(attributes, 'ip-port', marking, uuid) def parse_ip_port_or_network_socket_observable(self, objects, marking, uuid): - network_traffic, references = self.fetch_network_traffic_objects_and_references(objects) - attributes = self.fill_observable_attributes(network_traffic, network_traffic_mapping) - if hasattr(network_traffic, 'extensions') and network_traffic.extensions: - extension_type, extension_value = list(network_traffic.extensions.items())[0] - name = network_traffic_extensions[extension_type] - attributes.extend(self.fill_observable_attributes(extension_value, network_traffic_mapping)) - mapping = network_traffic_references_mapping['with_extensions'] - else: - name = 'ip-port' - mapping = network_traffic_references_mapping['without_extensions'] - attributes.extend(self.parse_network_traffic_references(references, network_traffic, mapping, attr='pop')) - if references: - attributes.extend(self.parse_remaining_references(references, mapping)) + attributes, name = self.attributes_from_network_traffic(objects) self.handle_import_case(attributes, name, marking, uuid) def parse_mac_address_observable(self, objects, marking, uuid): @@ -1253,19 +1248,7 @@ class ExternalStixParser(StixParser): self.add_attributes_from_observable(objects, 'mutex', 'name', marking, uuid) def parse_network_socket_observable(self, objects, marking, uuid): - network_traffic, references = self.fetch_network_traffic_objects_and_references(objects) - attributes = self.fill_observable_attributes(network_traffic, network_traffic_mapping) - if hasattr(network_traffic, 'extensions') and network_traffic.extensions: - extension_type, extension_value = list(network_traffic.extensions.items())[0] - name = network_traffic_extensions[extension_type] - attributes.extend(self.fill_observable_attributes(extension_value, network_traffic_mapping)) - mapping = network_traffic_references_mapping['with_extensions'] - else: - name = 'ip-port' - mapping = network_traffic_references_mapping['without_extensions'] - attributes.extend(self.parse_network_traffic_references(objects, network_traffic, mapping, attr='pop')) - if references: - attributes.extend(self.parse_remaining_references(references, mapping)) + attributes, name = self.attributes_from_network_traffic(objects) self.handle_import_case(attributes, name, marking, uuid) def parse_network_traffic_pattern(self, pattern, marking=None, uuid=None):