mirror of https://github.com/MISP/MISP
fix: [stix2 import] Reusing function & avoiding duplicates
parent
271d6ecc60
commit
e32ef1b0c6
|
@ -244,6 +244,22 @@ class StixParser():
|
|||
attributes.extend(self.fill_observable_attributes(traffic, network_traffic_mapping))
|
||||
return attributes
|
||||
|
||||
def attributes_from_network_traffic(self, objects, name=None):
|
||||
network_traffic, references = self.fetch_network_traffic_objects_and_references(objects)
|
||||
attributes = self.fill_observable_attributes(network_traffic, network_traffic_mapping)
|
||||
if hasattr(network_traffic, 'extensions') and network_traffic.extensions:
|
||||
extension_type, extension_value = list(network_traffic.extensions.items())[0]
|
||||
name = network_traffic_extensions[extension_type]
|
||||
attributes.extend(self.fill_observable_attributes(extension_value, network_traffic_mapping))
|
||||
mapping = network_traffic_references_mapping['with_extensions']
|
||||
else:
|
||||
name = 'ip-port'
|
||||
mapping = network_traffic_references_mapping['without_extensions']
|
||||
attributes.extend(self.parse_network_traffic_references(references, network_traffic, mapping))
|
||||
if references:
|
||||
attributes.extend(self.parse_remaining_references(references, mapping))
|
||||
return attributes, name
|
||||
|
||||
def attributes_from_process_observable(self, objects):
|
||||
main_process = None
|
||||
for _object in objects.values():
|
||||
|
@ -306,13 +322,6 @@ class StixParser():
|
|||
file = value
|
||||
return file, data
|
||||
|
||||
@staticmethod
|
||||
def fetch_network_traffic_objects(objects):
|
||||
network_traffics = []
|
||||
for value in objects.values():
|
||||
if isinstance(value, stix2.NetworkTraffic):
|
||||
return value
|
||||
|
||||
@staticmethod
|
||||
def fetch_network_traffic_objects_and_references(objects):
|
||||
references = defaultdict(dict)
|
||||
|
@ -376,15 +385,23 @@ class StixParser():
|
|||
self.misp_event.add_object(**misp_object)
|
||||
|
||||
@staticmethod
|
||||
def parse_network_traffic_references(objects, network_traffic, mapping, attr='get'):
|
||||
def __parse_network_traffic_reference(ref_object, ref, mapping):
|
||||
origin = ref.split('_')[0]
|
||||
misp_type, relation = mapping[ref_object._type]
|
||||
return {'type': misp_type.format(origin), 'object_relation': relation.format(origin),
|
||||
'to_ids': False, 'value': ref_object.value}
|
||||
|
||||
def parse_network_traffic_references(self, objects, network_traffic, mapping):
|
||||
attributes= []
|
||||
for ref in ('src_ref', 'dst_ref'):
|
||||
if hasattr(network_traffic, ref):
|
||||
ref_object = getattr(objects, attr)(getattr(network_traffic, ref))
|
||||
origin = ref.split('_')[0]
|
||||
misp_type, relation = mapping[ref_object._type]
|
||||
attributes.append({'type': misp_type.format(origin), 'object_relation': relation.format(origin),
|
||||
'to_ids': False, 'value': ref_object.value})
|
||||
ref_object = objects.pop(getattr(network_traffic, ref))
|
||||
attributes.append(self.__parse_network_traffic_reference(ref_object, ref, mapping))
|
||||
for refs in ('src_refs', 'dst_refs'):
|
||||
if hasattr(network_traffic, refs):
|
||||
for ref in getattr(network_traffic, refs):
|
||||
ref_object = objects.pop(ref)
|
||||
attributes.append(self.__parse_network_traffic_reference(ref_object, refs, mapping))
|
||||
return attributes
|
||||
|
||||
def parse_pe(self, extension):
|
||||
|
@ -1213,17 +1230,7 @@ class ExternalStixParser(StixParser):
|
|||
self.add_attributes_from_pattern('ip-dst', pattern, marking, uuid)
|
||||
|
||||
def parse_ip_network_traffic_observable(self, objects, marking, uuid):
|
||||
network_traffic = self.fetch_network_traffic_objects(objects)
|
||||
attributes = self.fill_observable_attributes(network_traffic, network_traffic_mapping)
|
||||
if hasattr(network_traffic, 'extensions') and network_traffic.extensions:
|
||||
extension_type, extension_value = list(network_traffic.extensions.items())[0]
|
||||
name = network_traffic_extensions[extension_type]
|
||||
attributes.extend(self.fill_observable_attributes(extension_value, network_traffic_mapping))
|
||||
mapping = network_traffic_references_mapping['with_extensions']
|
||||
else:
|
||||
name = 'ip-port'
|
||||
mapping = network_traffic_references_mapping['without_extensions']
|
||||
attributes.extend(self.parse_network_traffic_references(objects, network_traffic, mapping))
|
||||
attributes, name = self.attributes_from_network_traffic(objects)
|
||||
self.handle_import_case(attributes, name, marking, uuid)
|
||||
|
||||
def parse_ip_port_observable(self, objects, marking, uuid):
|
||||
|
@ -1231,19 +1238,7 @@ class ExternalStixParser(StixParser):
|
|||
self.handle_import_case(attributes, 'ip-port', marking, uuid)
|
||||
|
||||
def parse_ip_port_or_network_socket_observable(self, objects, marking, uuid):
|
||||
network_traffic, references = self.fetch_network_traffic_objects_and_references(objects)
|
||||
attributes = self.fill_observable_attributes(network_traffic, network_traffic_mapping)
|
||||
if hasattr(network_traffic, 'extensions') and network_traffic.extensions:
|
||||
extension_type, extension_value = list(network_traffic.extensions.items())[0]
|
||||
name = network_traffic_extensions[extension_type]
|
||||
attributes.extend(self.fill_observable_attributes(extension_value, network_traffic_mapping))
|
||||
mapping = network_traffic_references_mapping['with_extensions']
|
||||
else:
|
||||
name = 'ip-port'
|
||||
mapping = network_traffic_references_mapping['without_extensions']
|
||||
attributes.extend(self.parse_network_traffic_references(references, network_traffic, mapping, attr='pop'))
|
||||
if references:
|
||||
attributes.extend(self.parse_remaining_references(references, mapping))
|
||||
attributes, name = self.attributes_from_network_traffic(objects)
|
||||
self.handle_import_case(attributes, name, marking, uuid)
|
||||
|
||||
def parse_mac_address_observable(self, objects, marking, uuid):
|
||||
|
@ -1253,19 +1248,7 @@ class ExternalStixParser(StixParser):
|
|||
self.add_attributes_from_observable(objects, 'mutex', 'name', marking, uuid)
|
||||
|
||||
def parse_network_socket_observable(self, objects, marking, uuid):
|
||||
network_traffic, references = self.fetch_network_traffic_objects_and_references(objects)
|
||||
attributes = self.fill_observable_attributes(network_traffic, network_traffic_mapping)
|
||||
if hasattr(network_traffic, 'extensions') and network_traffic.extensions:
|
||||
extension_type, extension_value = list(network_traffic.extensions.items())[0]
|
||||
name = network_traffic_extensions[extension_type]
|
||||
attributes.extend(self.fill_observable_attributes(extension_value, network_traffic_mapping))
|
||||
mapping = network_traffic_references_mapping['with_extensions']
|
||||
else:
|
||||
name = 'ip-port'
|
||||
mapping = network_traffic_references_mapping['without_extensions']
|
||||
attributes.extend(self.parse_network_traffic_references(objects, network_traffic, mapping, attr='pop'))
|
||||
if references:
|
||||
attributes.extend(self.parse_remaining_references(references, mapping))
|
||||
attributes, name = self.attributes_from_network_traffic(objects)
|
||||
self.handle_import_case(attributes, name, marking, uuid)
|
||||
|
||||
def parse_network_traffic_pattern(self, pattern, marking=None, uuid=None):
|
||||
|
|
Loading…
Reference in New Issue