mirror of https://github.com/MISP/MISP
add: [stix2 import] Parsing network traffic in the case of network connection object
parent
d32fb0858b
commit
ed364cc15f
|
@ -247,7 +247,16 @@ class StixParser():
|
|||
def attributes_from_network_traffic(self, objects, name=None):
|
||||
network_traffic, references = self.fetch_network_traffic_objects_and_references(objects)
|
||||
attributes = self.fill_observable_attributes(network_traffic, network_traffic_mapping)
|
||||
if hasattr(network_traffic, 'extensions') and network_traffic.extensions:
|
||||
if name is not None:
|
||||
mapping = network_socket_types
|
||||
for protocol in network_traffic.protocols:
|
||||
try:
|
||||
layer = connection_protocols[protocol]
|
||||
attributes.append({'type': 'text', 'value': protocol, 'to_ids': False,
|
||||
'object_relation': 'layer{}-protocol'.format(layer)})
|
||||
except KeyError:
|
||||
continue
|
||||
elif hasattr(network_traffic, 'extensions') and network_traffic.extensions:
|
||||
extension_type, extension_value = list(network_traffic.extensions.items())[0]
|
||||
name = network_traffic_extensions[extension_type]
|
||||
attributes.extend(self.parse_socket_extension(extension_value))
|
||||
|
@ -478,6 +487,7 @@ class StixFromMISPParser(StixParser):
|
|||
'email': {'observable': self.observable_email, 'pattern': self.pattern_email},
|
||||
'file': {'observable': self.observable_file, 'pattern': self.pattern_file},
|
||||
'ip-port': {'observable': self.attributes_from_ip_port_observable, 'pattern': self.pattern_ip_port},
|
||||
'network-connection': {'observable': self.observable_connection, 'pattern': self.pattern_connection},
|
||||
'network-socket': {'observable': self.observable_socket, 'pattern': self.pattern_socket},
|
||||
'process': {'observable': self.attributes_from_process_observable, 'pattern': self.pattern_process},
|
||||
'registry-key': {'observable': self.attributes_from_regkey_observable, 'pattern': self.pattern_regkey},
|
||||
|
@ -625,6 +635,25 @@ class StixFromMISPParser(StixParser):
|
|||
else:
|
||||
self.misp_event['Galaxy'].append(self.parse_galaxy(o, labels))
|
||||
|
||||
def observable_connection(self, observable):
|
||||
attributes, _ = self.attributes_from_network_traffic(observable, 'network-connection')
|
||||
return attributes
|
||||
|
||||
def pattern_connection(self, pattern):
|
||||
attributes = []
|
||||
for p in pattern:
|
||||
p_type, p_value = p.split(' = ')
|
||||
p_value = p_value[1:-1]
|
||||
try:
|
||||
mapping = network_traffic_mapping[p_type]
|
||||
except KeyError:
|
||||
if not p_type.startswith('network-traffic:protocols['):
|
||||
continue
|
||||
mapping = {'type': 'text', 'relation': 'layer{}-protocol'.format(connection_protocols[p_value])}
|
||||
attributes.append({'type': mapping['type'], 'object_relation': mapping['relation'],
|
||||
'value': p_value})
|
||||
return attributes
|
||||
|
||||
def observable_email(self, observable):
|
||||
to_ids = False
|
||||
attributes, message = self.parse_complex_fields_observable_email(observable, to_ids)
|
||||
|
|
|
@ -311,3 +311,7 @@ external_pattern_mapping = {'domain-name': domain_pattern_mapping,
|
|||
'url': {'value':{'type': 'url'}},
|
||||
'x509-certificate': x509_mapping
|
||||
}
|
||||
|
||||
connection_protocols = {"IP": "3", "ICMP": "3", "ARP": "3",
|
||||
"TCP": "4", "UDP": "4",
|
||||
"HTTP": "7", "HTTPS": "7", "FTP": "7"}
|
||||
|
|
Loading…
Reference in New Issue