mirror of https://github.com/MISP/MISP
fix: [stix2 import] Reusing functions to simplify the code & avoid duplicates
parent
9f21bfd8bf
commit
c674ee3067
|
@ -229,21 +229,6 @@ class StixParser():
|
|||
continue
|
||||
return attributes
|
||||
|
||||
def attributes_from_ip_port_observable(self, objects):
|
||||
ordered_objects = defaultdict(dict)
|
||||
for key, value in objects.items():
|
||||
if isinstance(value, (stix2.IPv4Address, stix2.IPv6Address, stix2.NetworkTraffic)):
|
||||
ordered_objects[value._type.split('-')[1]][key] = value
|
||||
else:
|
||||
attributes = self.fill_observable_attributes(value, network_traffic_mapping)
|
||||
for traffic in ordered_objects['traffic'].values():
|
||||
if hasattr(traffic, 'dst_ref'):
|
||||
mapping = ip_attribute_mapping
|
||||
attributes.append({'type': mapping['type'], 'object_relation': mapping['relation'],
|
||||
'to_ids': False, 'value': ordered_objects['addr'][traffic.dst_ref].value})
|
||||
attributes.extend(self.fill_observable_attributes(traffic, network_traffic_mapping))
|
||||
return attributes
|
||||
|
||||
def attributes_from_network_traffic(self, objects, name=None):
|
||||
network_traffic, references = self.fetch_network_traffic_objects_and_references(objects)
|
||||
attributes = self.fill_observable_attributes(network_traffic, network_traffic_mapping)
|
||||
|
@ -486,7 +471,7 @@ class StixFromMISPParser(StixParser):
|
|||
'domain-ip': {'observable': self.attributes_from_domain_ip_observable, 'pattern': self.pattern_domain_ip},
|
||||
'email': {'observable': self.observable_email, 'pattern': self.pattern_email},
|
||||
'file': {'observable': self.observable_file, 'pattern': self.pattern_file},
|
||||
'ip-port': {'observable': self.attributes_from_ip_port_observable, 'pattern': self.pattern_ip_port},
|
||||
'ip-port': {'observable': self.observable_ip_port, 'pattern': self.pattern_ip_port},
|
||||
'network-connection': {'observable': self.observable_connection, 'pattern': self.pattern_connection},
|
||||
'network-socket': {'observable': self.observable_socket, 'pattern': self.pattern_socket},
|
||||
'process': {'observable': self.attributes_from_process_observable, 'pattern': self.pattern_process},
|
||||
|
@ -813,6 +798,10 @@ class StixFromMISPParser(StixParser):
|
|||
def pattern_domain_ip(self, pattern):
|
||||
return self.fill_pattern_attributes(pattern, domain_ip_mapping)
|
||||
|
||||
def observable_ip_port(self, observable):
|
||||
attributes, _ = self.attributes_from_network_traffic(observable)
|
||||
return attributes
|
||||
|
||||
def pattern_ip_port(self, pattern):
|
||||
return self.fill_pattern_attributes(pattern, network_traffic_mapping)
|
||||
|
||||
|
@ -851,13 +840,6 @@ class StixFromMISPParser(StixParser):
|
|||
attributes, _ = self.attributes_from_network_traffic(observable)
|
||||
return attributes
|
||||
|
||||
@staticmethod
|
||||
def parse_socket_observable(observable):
|
||||
for key in observable:
|
||||
observable_object = observable[key]
|
||||
if observable_object['type'] == 'network-traffic':
|
||||
return dict(observable_object)
|
||||
|
||||
@staticmethod
|
||||
def pattern_socket(pattern):
|
||||
attributes = []
|
||||
|
@ -989,7 +971,7 @@ class ExternalStixParser(StixParser):
|
|||
('domain-name', 'ipv6-addr'): self.parse_domain_ip_observable,
|
||||
('domain-name', 'ipv4-addr', 'network-traffic'): self.parse_ip_port_or_network_socket_observable,
|
||||
('domain-name', 'ipv6-addr', 'network-traffic'): self.parse_ip_port_or_network_socket_observable,
|
||||
('domain-name', 'ipv4-addr', 'ipv6-addr', 'network-traffic'): self.parse_ip_port_observable,
|
||||
('domain-name', 'ipv4-addr', 'ipv6-addr', 'network-traffic'): self.parse_ip_port_or_network_socket_observable,
|
||||
('domain-name', 'network-traffic'): self.parse_network_socket_observable,
|
||||
('domain-name', 'network-traffic', 'url'): self.parse_url_object_observable,
|
||||
('email-addr', 'email-message'): self.parse_email_observable,
|
||||
|
@ -1241,10 +1223,6 @@ class ExternalStixParser(StixParser):
|
|||
attributes, name = self.attributes_from_network_traffic(objects)
|
||||
self.handle_import_case(attributes, name, marking, uuid)
|
||||
|
||||
def parse_ip_port_observable(self, objects, marking, uuid):
|
||||
attributes = self.attributes_from_ip_port_observable(objects)
|
||||
self.handle_import_case(attributes, 'ip-port', marking, uuid)
|
||||
|
||||
def parse_ip_port_or_network_socket_observable(self, objects, marking, uuid):
|
||||
attributes, name = self.attributes_from_network_traffic(objects)
|
||||
self.handle_import_case(attributes, name, marking, uuid)
|
||||
|
|
|
@ -220,7 +220,8 @@ network_traffic_mapping = {'src_port': src_port_attribute_mapping,
|
|||
'network-traffic:end': end_datetime_attribute_mapping,
|
||||
'value': domain_attribute_mapping,
|
||||
'domain-name:value': domain_attribute_mapping,
|
||||
'network-traffic:dst_ref.value': ip_attribute_mapping,
|
||||
'network-traffic:dst_ref.value': {'type': 'ip-dst', 'relation': 'ip-dst'},
|
||||
'network-traffic:src_red.value': {'type': 'ip-src', 'relation': 'ip-src'},
|
||||
'address_family': address_family_attribute_mapping,
|
||||
"network-traffic:extensions.'socket-ext'.address_family": address_family_attribute_mapping,
|
||||
'protocol_family': domain_family_attribute_mapping,
|
||||
|
@ -233,8 +234,7 @@ network_traffic_mapping = {'src_port': src_port_attribute_mapping,
|
|||
network_traffic_extensions = {'socket-ext': 'network-socket'}
|
||||
|
||||
network_traffic_ip = ('ip-{}', 'ip-{}')
|
||||
ip_port_ip = ('ip-dst', 'ip')
|
||||
ip_port_types = {'domain-name': ('domain', 'domain'), 'ipv4-addr': ip_port_ip, 'ipv6-addr': ip_port_ip}
|
||||
ip_port_types = {'domain-name': ('domain', 'domain'), 'ipv4-addr': network_traffic_ip, 'ipv6-addr': network_traffic_ip}
|
||||
network_socket_types = {'domain-name': ('hostname', 'hostname-{}'), 'ipv4-addr': network_traffic_ip, 'ipv6-addr': network_traffic_ip}
|
||||
network_traffic_references_mapping = {'with_extensions': network_socket_types, 'without_extensions': ip_port_types}
|
||||
|
||||
|
|
Loading…
Reference in New Issue