229 lines
5.9 KiB
Python
229 lines
5.9 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# Inspection library for the analyzer
|
|
#
|
|
# Copyright (C) 2019 Romain Kieffer
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
from pyshark import FileCapture
|
|
from glob import glob
|
|
|
|
|
|
unassigned_icmp_types = ['1', '2', '7'] + [str(i) for i in range(44, 253)]
|
|
robustness_icmp_types = [str(i) for i in range(20, 30)]
|
|
deprecated_icmp_types = ['4', '6', '15', '16', '17', '18'] + [str(i) for i in range(30, 40)]
|
|
icmp_type_dict = {
|
|
'0': 'echo-rep',
|
|
'3': 'dest-unreachable',
|
|
'4': 'source-quench',
|
|
'5': 'redirect',
|
|
'6': 'alternate-host-add',
|
|
'7': 'unassigned',
|
|
'8': 'echo',
|
|
'9': 'router-advertisement',
|
|
'10': 'router-solicitation',
|
|
'11': 'time-exceed',
|
|
'12': 'param-pb',
|
|
'13': 'timestamp',
|
|
'14': 'timestamp-rep',
|
|
'15': 'info-request',
|
|
'16': 'info-reply',
|
|
'17': 'address-mask-req',
|
|
'18': 'address-mask-rep',
|
|
'30': 'traceroute',
|
|
'31': 'mobile-host-redirect',
|
|
'32': 'datagram-conversion-error',
|
|
'33': 'ipv6-where',
|
|
'34': 'ipv6-here',
|
|
'35': 'mobile-register-req',
|
|
'36': 'mobile-register-rep',
|
|
'37': 'dn-req',
|
|
'38': 'dn-rep',
|
|
'39': 'skip',
|
|
'40': 'photuris',
|
|
'41': 'experimental-mobility',
|
|
'42': 'extended-echo-req',
|
|
'43': 'extended-echo-rep',
|
|
'253': 'rfc3692-1',
|
|
'254': 'rfc3692-2',
|
|
}
|
|
|
|
unassigned_proto = [str(i) for i in range(143, 253)]
|
|
proto_dict = {
|
|
'0': 'hopopt',
|
|
'1': 'icmp',
|
|
'2': 'igmp',
|
|
'6': 'tcp',
|
|
'8': 'egp',
|
|
'10': 'bbn-rcc-mon',
|
|
'15': 'xnet',
|
|
'16': 'chaos',
|
|
'17': 'udp',
|
|
'18': 'mux',
|
|
'23': 'trunk-1',
|
|
'24': 'trunk-2',
|
|
'25': 'leaf-1',
|
|
'27': 'rdp',
|
|
'28': 'irtp',
|
|
'29': 'iso-tp4',
|
|
'32': 'merit-inp',
|
|
'33': 'dccp',
|
|
'34': '3pc',
|
|
'38': 'idpr-cmtp',
|
|
'41': 'ipv6',
|
|
'42': 'sdrp',
|
|
'47': 'gre',
|
|
'50': 'esp',
|
|
'51': 'ah',
|
|
'54': 'narp',
|
|
'55': 'mobile',
|
|
'56': 'tlsp',
|
|
'58': 'ipv6-icmp',
|
|
'59': 'ipv6-nonxt',
|
|
'61': 'any_host_internal_protocol',
|
|
'62': 'cftp',
|
|
'63': 'any_local_network',
|
|
'67': 'ippc',
|
|
'69': 'sat-mon',
|
|
'70': 'visa',
|
|
'72': 'cpnx',
|
|
'75': 'pvp',
|
|
'76': 'br-sat-mon',
|
|
'78': 'wb-mon',
|
|
'84': 'iptm',
|
|
'85': 'nsfnet-igp',
|
|
'93': 'ax25',
|
|
'97': 'etherip',
|
|
'98': 'encap',
|
|
'104': 'aris',
|
|
'106': 'qnx',
|
|
'113': 'pgm',
|
|
'115': 'l2tp',
|
|
'117': 'iatp',
|
|
'119': 'srp',
|
|
'124': 'isisv4',
|
|
'135': 'mobility-header',
|
|
'137': 'mpls-in-ip',
|
|
}
|
|
|
|
|
|
def get_cap(path_to_cap):
|
|
return FileCapture(input_file=path_to_cap, display_filter='icmp', use_json=True, include_raw=True)
|
|
|
|
|
|
def get_files(path) -> list:
|
|
caps = glob(path)
|
|
return caps
|
|
|
|
|
|
def init_cap_list(dataset_path: str) -> list:
|
|
if dataset_path[-1] == '/':
|
|
extension = '*.gz'
|
|
else:
|
|
extension = '/*.gz'
|
|
cap_path = dataset_path + extension
|
|
caps = get_files(cap_path)
|
|
caps.sort()
|
|
return caps
|
|
|
|
|
|
def list_caps(state: str, redis):
|
|
caps = []
|
|
b_list = redis.lrange(state, 0, -1)
|
|
for item in b_list:
|
|
caps.append(item.decode())
|
|
return caps
|
|
|
|
|
|
def get_protocol(packet):
|
|
if 'ip_proto' in packet.icmp.field_names:
|
|
protocol = str(packet.icmp.ip_proto)
|
|
if int(protocol) in range(143, 253):
|
|
return protocol + ' (unassigned)'
|
|
ip_proto = proto_dict[protocol]
|
|
else:
|
|
return 'non-backscatter-icmp'
|
|
return protocol + ' : ' + str(ip_proto)
|
|
|
|
|
|
def get_icmp_payload(packet):
|
|
if 'data' in packet.icmp.field_names:
|
|
return str(packet.icmp.data)
|
|
elif packet.icmp.field_names != ['type', 'code', 'checksum', 'checksum_status', 'ident', 'seq', 'seq_le']:
|
|
print(packet.icmp.field_names)
|
|
print(packet.icmp)
|
|
return 'No data'
|
|
|
|
|
|
def get_port(packet, protocol, endpoint):
|
|
if protocol == 'tcp':
|
|
if endpoint == 'src':
|
|
return packet.icmp.tcp_srcport
|
|
elif endpoint == 'dst':
|
|
return packet.icmp.tcp_dstport
|
|
elif protocol == 'udp':
|
|
if endpoint == 'src':
|
|
return packet.icmp.udp_srcport
|
|
elif endpoint == 'dst':
|
|
return packet.icmp.udp_dstport
|
|
else:
|
|
return 0
|
|
|
|
|
|
def get_src_port(packet):
|
|
proto = get_protocol(packet)
|
|
return get_port(packet, proto, 'src')
|
|
|
|
|
|
def get_dst_port(packet):
|
|
proto = get_protocol(packet)
|
|
return get_port(packet, proto, 'dst')
|
|
|
|
|
|
def get_icmp_ip(packet):
|
|
proto = get_protocol(packet)
|
|
if 'ip_src' in packet.icmp.field_names:
|
|
return packet.icmp.ip_src
|
|
|
|
|
|
def list_sources_and_targets(cap):
|
|
sources, targets = [], []
|
|
for packet in cap:
|
|
src_port_tuple = get_src_port(packet)
|
|
src = (packet.icmp.ip_src, src_port_tuple)
|
|
dst_port_tuple = get_dst_port(packet)
|
|
dst = (packet.icmp.ip_dst, dst_port_tuple)
|
|
if src not in sources:
|
|
sources.append(src)
|
|
if dst not in targets:
|
|
targets.append(dst)
|
|
return sources, targets
|
|
|
|
|
|
def check_icmp_checksum(data):
|
|
hex_sum = 0
|
|
split_data = [data[i:i + 4] for i in range(0, len(data), 4)]
|
|
checksum = hex(int(split_data[1], 16))
|
|
split_data[1] = '0000'
|
|
for i in range(len(split_data)):
|
|
hex_sum += int(split_data[i], 16)
|
|
mask = (1 << hex_sum.bit_length()) - 1
|
|
res = hex(hex_sum ^ mask)
|
|
if res == checksum:
|
|
return 'good'
|
|
else:
|
|
return 'bad'
|