diff --git a/lib/analyzer.py b/lib/analyzer.py
index 0745053..6174bb4 100644
--- a/lib/analyzer.py
+++ b/lib/analyzer.py
@@ -25,7 +25,7 @@ import configparser
 import logging
 
 from lib.inspection import get_cap, get_protocol, check_icmp_checksum, get_icmp_payload, get_icmp_ip, \
-    unassigned_icmp_types, deprecated_icmp_types, get_src_port, get_dst_port
+    unassigned_icmp_types, deprecated_icmp_types, get_src_port, get_dst_port, list_caps
 
 
 class Analyzer:
@@ -33,14 +33,12 @@ class Analyzer:
     Defines a parser to make bulk statistics on a large dataset of network captures.
     """
 
-    def __init__(self):
+    def __init__(self, dataset_path: str=None):
         config = configparser.RawConfigParser()
         config.read('../etc/analyzer.conf')
-        self.uuid = config.get('global', 'my-uuid')
-        self.queue = "analyzer:1:{}".format(self.uuid)
         logging_level = config.get('global', 'logging-level')
-        self.logger = logging.getLogger('')
+        self.logger = logging.getLogger('ipa')
 
         self.ch = logging.StreamHandler()
         if logging_level == 'DEBUG':
             self.logger.setLevel(logging.DEBUG)
@@ -52,16 +50,65 @@ class Analyzer:
         self.ch.setFormatter(formatter)
         self.logger.addHandler(self.ch)
 
-        self.logger.info("Starting and using FIFO {} from D4 server".format(self.queue))
-
         analyzer_redis_host = os.getenv('D4_ANALYZER_REDIS_HOST', '127.0.0.1')
         analyzer_redis_port = int(os.getenv('D4_ANALYZER_REDIS_PORT', 6400))
         self.r = redis.Redis(host=analyzer_redis_host, port=analyzer_redis_port)
 
-        d4_server, d4_port = config.get('global', 'd4-server').split(':')
-        host_redis_metadata = os.getenv('D4_REDIS_METADATA_HOST', d4_server)
-        port_redis_metadata = int(os.getenv('D4_REDIS_METADATA_PORT', d4_port))
-        self.r_d4 = redis.Redis(host=host_redis_metadata, port=port_redis_metadata, db=2)
+        self.dataset = dataset_path
+        if not self.dataset:
+            self.uuid = config.get('global', 'my-uuid')
+            self.queue = "analyzer:1:{}".format(self.uuid)
+            self.logger.info("Starting and using FIFO {} from D4 server".format(self.queue))
+            d4_server, d4_port = config.get('global', 'd4-server').split(':')
+            host_redis_metadata = os.getenv('D4_REDIS_METADATA_HOST', d4_server)
+            port_redis_metadata = int(os.getenv('D4_REDIS_METADATA_PORT', d4_port))
+            self.r_d4 = redis.Redis(host=host_redis_metadata, port=port_redis_metadata, db=2)
+        else:
+            self.logger.info("Starting local analyzer")
+            self.update_queue()
+            self.cap_list = []
+            self.process_local()
+            time.sleep(15)
+            c = self.update_queue()
+            if c == 0:
+                self.enqueue_caps(cap_list=list_caps('scanning', self.r))
+                self.r.delete('scanning')
+                print('[-] Process remaining unfinished caps.')
+                self.process_local()
+
+    def enqueue_caps(self, cap_list: list):
+        p = self.r.pipeline()
+        for cap in cap_list:
+            p.rpush(self.queue, cap)
+        p.execute()
+
+    def update_queue(self):
+        """
+        Each parser instance is given a list of days, and thus a list of caps to parse.
+        This method compares the parser's list of caps with the caps already in its queue.
+        """
+        remaining_caps = list_caps(self.queue, self.r)
+        current_caps = list_caps('scanning', self.r)
+        parsed_caps = list_caps('scanned', self.r)
+        caps_to_add = []
+        if remaining_caps:
+            print('[*] Queue already populated.')
+            if self.cap_list:
+                for cap in self.cap_list:
+                    if cap not in remaining_caps and cap not in parsed_caps and cap not in current_caps:
+                        caps_to_add.append(cap)
+            if not caps_to_add:
+                print('[*] Already up to date.')
+                return 1
+            print('[o] Queue updated.')
+        else:
+            if self.cap_list:
+                print('[*] No caps, initializing...')
+                caps_to_add = self.cap_list
+            elif current_caps:
+                return 0
+        self.enqueue_caps(caps_to_add)
+        return 2
 
     def parse_cap(self, cap):
         """
@@ -119,16 +166,28 @@ class Analyzer:
         return 0
 
     def pop_cap(self):
-        absolute_path = self.r_d4.rpop(self.queue)
+        absolute_path = None
+        if not self.dataset:
+            absolute_path = self.r_d4.rpop(self.queue)
+        else:
+            absolute_path = self.r.rpop('to_scan')
         return get_cap(absolute_path)
 
-    def process(self):
+    def process_d4(self):
         while True:
             d4_cap = self.pop_cap()
             if d4_cap is None:
                 time.sleep(1)
                 continue
             self.logger.debug('Parsing file {}'.format(d4_cap.input_filename))
-            print('[*] Current cap file: {}'.format(d4_cap.input_filename[-15:]))
             self.parse_cap(d4_cap)
             d4_cap.close()
+
+    def process_local(self):
+        while self.r.llen(self.queue) != 0:
+            cap = self.pop_cap()
+            self.r.rpush('scanning', cap.input_filename)
+            self.parse_cap(cap)
+            self.r.lrem('scanning', 0, cap.input_filename)
+            self.r.rpush('scanned', cap.input_filename)
+            cap.close()
diff --git a/lib/inspection.py b/lib/inspection.py
index 8e28f8c..61d8e99 100644
--- a/lib/inspection.py
+++ b/lib/inspection.py
@@ -125,13 +125,30 @@ def get_cap(path_to_cap):
 
 
 def get_files(path) -> list:
-    """
-    Gets the path of any file in the given directory.
-    """
    caps = glob(path)
     return caps
 
 
+def init_cap_list(dataset_path: str, daylist: list) -> list:
+    cap_list = []
+    if not daylist:
+        return []
+    for day in daylist:
+        cap_path = dataset_path + str(day) + '/*.gz'
+        caps = get_files(cap_path)
+        caps.sort()
+        cap_list += caps
+    return cap_list
+
+
+def list_caps(state: str, redis):
+    caps = []
+    b_list = redis.lrange(state, 0, -1)
+    for item in b_list:
+        caps.append(item.decode())
+    return caps
+
+
 def get_protocol(packet):
     if 'ip_proto' in packet.icmp.field_names:
         protocol = str(packet.icmp.ip_proto)