193 lines
7.3 KiB
Python
193 lines
7.3 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# ICMP Passive Analyzer for D4
|
|
#
|
|
# Copyright (C) 2019 Romain Kieffer
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import redis
|
|
import os
|
|
import time
|
|
import configparser
|
|
import logging
|
|
|
|
from lib.inspection import get_cap, get_raw_cap, get_protocol, check_icmp_checksum, get_icmp_payload, get_icmp_ip, \
|
|
unassigned_icmp_types, deprecated_icmp_types, get_src_port, get_dst_port, list_caps, init_cap_list
|
|
|
|
|
|
class Analyzer:
|
|
"""
|
|
Defines a parser to make bulk statistics on a large dataset of network captures.
|
|
"""
|
|
|
|
def __init__(self, dataset_path: str=None):
|
|
config = configparser.RawConfigParser()
|
|
config.read('../etc/analyzer.conf')
|
|
|
|
logging_level = config.get('global', 'logging-level')
|
|
self.logger = logging.getLogger('ipa')
|
|
self.ch = logging.StreamHandler()
|
|
if logging_level == 'DEBUG':
|
|
self.logger.setLevel(logging.DEBUG)
|
|
self.ch.setLevel(logging.DEBUG)
|
|
elif logging_level == 'INFO':
|
|
self.logger.setLevel(logging.INFO)
|
|
self.ch.setLevel(logging.INFO)
|
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
self.ch.setFormatter(formatter)
|
|
self.logger.addHandler(self.ch)
|
|
|
|
analyzer_redis_host = os.getenv('D4_ANALYZER_REDIS_HOST', '127.0.0.1')
|
|
analyzer_redis_port = int(os.getenv('D4_ANALYZER_REDIS_PORT', 6405))
|
|
self.r = redis.Redis(host=analyzer_redis_host, port=analyzer_redis_port)
|
|
|
|
self.dataset = dataset_path
|
|
if not self.dataset:
|
|
self.uuid = config.get('global', 'my-uuid')
|
|
self.queue = "analyzer:1:{}".format(self.uuid)
|
|
self.logger.info("Starting and using FIFO {} from D4 server".format(self.queue))
|
|
d4_server, d4_port = config.get('global', 'd4-server').split(':')
|
|
host_redis_metadata = os.getenv('D4_REDIS_METADATA_HOST', d4_server)
|
|
port_redis_metadata = int(os.getenv('D4_REDIS_METADATA_PORT', d4_port))
|
|
self.r_d4 = redis.Redis(host=host_redis_metadata, port=port_redis_metadata, db=2)
|
|
else:
|
|
self.logger.info("Starting local analyzer")
|
|
self.queue = "to_scan"
|
|
self.cap_list = []
|
|
self.logger.info("Adding dataset caps to local queue")
|
|
self.cap_list = init_cap_list(self.dataset)
|
|
self.logger.info('Added ' + str(len(self.cap_list)) + ' caps.')
|
|
self.update_queue()
|
|
self.logger.info("Processing...")
|
|
self.process_local()
|
|
self.logger.info("Done.")
|
|
time.sleep(15)
|
|
c = self.update_queue()
|
|
if c == 0:
|
|
self.enqueue_caps(cap_list=list_caps('scanning', self.r))
|
|
self.r.delete('scanning')
|
|
self.process_local()
|
|
|
|
def enqueue_caps(self, cap_list: list):
|
|
p = self.r.pipeline()
|
|
for cap in cap_list:
|
|
p.rpush(self.queue, cap)
|
|
p.execute()
|
|
|
|
def update_queue(self):
|
|
remaining_caps = list_caps(self.queue, self.r)
|
|
current_caps = list_caps('scanning', self.r)
|
|
parsed_caps = list_caps('scanned', self.r)
|
|
caps_to_add = []
|
|
if remaining_caps:
|
|
self.logger.info('Queue already populated.')
|
|
if self.cap_list:
|
|
for cap in self.cap_list:
|
|
if cap not in remaining_caps and cap not in parsed_caps and cap not in current_caps:
|
|
caps_to_add.append(cap)
|
|
if not caps_to_add:
|
|
self.logger.info('Already up to date.')
|
|
return 1
|
|
self.logger.info('Queue updated.')
|
|
else:
|
|
if self.cap_list:
|
|
caps_to_add = self.cap_list
|
|
elif current_caps:
|
|
return 0
|
|
self.enqueue_caps(caps_to_add)
|
|
return 2
|
|
|
|
def parse_cap(self, cap):
|
|
"""
|
|
Dissects the cap file to extract info.
|
|
"""
|
|
if cap is None:
|
|
self.logger.info('No caps to parse!')
|
|
return 0
|
|
|
|
self.logger.info('Parsing cap ' + cap.input_filename[-15:])
|
|
|
|
pipeline = self.r.pipeline()
|
|
for packet in cap:
|
|
ip_layer = packet.ip
|
|
icmp_layer = packet.icmp
|
|
|
|
icmp_type = str(icmp_layer.type)
|
|
# icmp_code = str(icmp_layer.code)
|
|
protocol = get_protocol(packet)
|
|
# checksum_status = check_icmp_checksum(packet.icmp_raw.value)
|
|
|
|
if protocol == '1 : icmp':
|
|
payload = get_icmp_payload(packet)
|
|
pipeline.zadd('data', {'total': 1}, incr=True)
|
|
pipeline.zadd('data', {payload: 1}, incr=True)
|
|
|
|
if 'ip_src' in packet.icmp.field_names:
|
|
ip = get_icmp_ip(packet)
|
|
pipeline.hset('sources', ip, ip_layer.src)
|
|
|
|
pipeline.hincrby('icmp', 'total')
|
|
if icmp_type in unassigned_icmp_types:
|
|
pipeline.hincrby('icmp', icmp_type + ' (unassigned)')
|
|
elif icmp_type in deprecated_icmp_types:
|
|
pipeline.hincrby('icmp', icmp_type + ' (deprecated)')
|
|
else:
|
|
pipeline.hincrby('icmp', icmp_type)
|
|
|
|
# pipeline.hincrby('checksum', 'total')
|
|
# pipeline.hincrby('checksum', checksum_status)
|
|
|
|
# entry = str(get_src_port(packet)) + ':' + protocol + ':' + icmp_type + ':' + icmp_code
|
|
# pipeline.zadd(source_ip, {entry: 1}, incr=True)
|
|
|
|
pipeline.zadd('protocols', {protocol: 1}, incr=True)
|
|
# pipeline.zadd(protocol, {source_ip: 1}, incr=True)
|
|
|
|
dst_port = get_dst_port(packet)
|
|
if int(dst_port) == 80 | int(dst_port) == 443:
|
|
pass
|
|
# TODO
|
|
pipeline.execute()
|
|
|
|
self.logger.debug('Pipelining to redis.')
|
|
return 0
|
|
|
|
def pop_cap(self):
|
|
if not self.dataset:
|
|
absolute_path = self.r_d4.rpop(self.queue).decode()
|
|
else:
|
|
absolute_path = self.r.lpop('to_scan').decode()
|
|
return get_cap(absolute_path), get_raw_cap(absolute_path)
|
|
|
|
def process_d4(self):
|
|
while True:
|
|
d4_cap, d4_raw_cap = self.pop_cap()
|
|
if d4_cap is None:
|
|
time.sleep(1)
|
|
continue
|
|
self.logger.debug('Parsing file {}'.format(d4_cap.input_filename))
|
|
self.parse_cap(d4_cap)
|
|
d4_cap.close()
|
|
|
|
def process_local(self):
|
|
while self.r.llen(self.queue) != 0:
|
|
cap, raw_cap = self.pop_cap()
|
|
self.r.rpush('scanning', cap.input_filename)
|
|
self.parse_cap(cap)
|
|
self.r.lrem('scanning', 0, cap.input_filename)
|
|
self.r.rpush('scanned', cap.input_filename)
|
|
cap.close()
|