chg: [analyzer] Added local parsing

pull/1/head
airkeyp 2019-09-19 09:24:33 +02:00
parent 9ffa40c6bf
commit 559af4ed10
2 changed files with 93 additions and 17 deletions

View File

@ -25,7 +25,7 @@ import configparser
import logging import logging
from lib.inspection import get_cap, get_protocol, check_icmp_checksum, get_icmp_payload, get_icmp_ip, \ from lib.inspection import get_cap, get_protocol, check_icmp_checksum, get_icmp_payload, get_icmp_ip, \
unassigned_icmp_types, deprecated_icmp_types, get_src_port, get_dst_port unassigned_icmp_types, deprecated_icmp_types, get_src_port, get_dst_port, list_caps
class Analyzer: class Analyzer:
@ -33,14 +33,12 @@ class Analyzer:
Defines a parser to make bulk statistics on a large dataset of network captures. Defines a parser to make bulk statistics on a large dataset of network captures.
""" """
def __init__(self): def __init__(self, dataset_path: str=None):
config = configparser.RawConfigParser() config = configparser.RawConfigParser()
config.read('../etc/analyzer.conf') config.read('../etc/analyzer.conf')
self.uuid = config.get('global', 'my-uuid')
self.queue = "analyzer:1:{}".format(self.uuid)
logging_level = config.get('global', 'logging-level') logging_level = config.get('global', 'logging-level')
self.logger = logging.getLogger('') self.logger = logging.getLogger('ipa')
self.ch = logging.StreamHandler() self.ch = logging.StreamHandler()
if logging_level == 'DEBUG': if logging_level == 'DEBUG':
self.logger.setLevel(logging.DEBUG) self.logger.setLevel(logging.DEBUG)
@ -52,16 +50,65 @@ class Analyzer:
self.ch.setFormatter(formatter) self.ch.setFormatter(formatter)
self.logger.addHandler(self.ch) self.logger.addHandler(self.ch)
self.logger.info("Starting and using FIFO {} from D4 server".format(self.queue))
analyzer_redis_host = os.getenv('D4_ANALYZER_REDIS_HOST', '127.0.0.1') analyzer_redis_host = os.getenv('D4_ANALYZER_REDIS_HOST', '127.0.0.1')
analyzer_redis_port = int(os.getenv('D4_ANALYZER_REDIS_PORT', 6400)) analyzer_redis_port = int(os.getenv('D4_ANALYZER_REDIS_PORT', 6400))
self.r = redis.Redis(host=analyzer_redis_host, port=analyzer_redis_port) self.r = redis.Redis(host=analyzer_redis_host, port=analyzer_redis_port)
self.dataset = dataset_path
if not self.dataset:
self.uuid = config.get('global', 'my-uuid')
self.queue = "analyzer:1:{}".format(self.uuid)
self.logger.info("Starting and using FIFO {} from D4 server".format(self.queue))
d4_server, d4_port = config.get('global', 'd4-server').split(':') d4_server, d4_port = config.get('global', 'd4-server').split(':')
host_redis_metadata = os.getenv('D4_REDIS_METADATA_HOST', d4_server) host_redis_metadata = os.getenv('D4_REDIS_METADATA_HOST', d4_server)
port_redis_metadata = int(os.getenv('D4_REDIS_METADATA_PORT', d4_port)) port_redis_metadata = int(os.getenv('D4_REDIS_METADATA_PORT', d4_port))
self.r_d4 = redis.Redis(host=host_redis_metadata, port=port_redis_metadata, db=2) self.r_d4 = redis.Redis(host=host_redis_metadata, port=port_redis_metadata, db=2)
else:
self.logger.info("Starting local analyzer")
self.update_queue()
self.cap_list = []
self.process_local()
time.sleep(15)
c = self.update_queue()
if c == 0:
self.enqueue_caps(cap_list=list_caps('scanning', self.r))
self.r.delete('scanning')
print('[-] Process remaining unfinished caps.')
self.process_local()
def enqueue_caps(self, cap_list: list):
    """
    Append every entry of ``cap_list`` to the Redis list ``self.queue``.

    The pushes are buffered in one pipeline obtained from ``self.r`` and
    sent together by a single ``execute()`` call.

    :param cap_list: capture paths to enqueue, pushed in iteration order.
    """
    pipe = self.r.pipeline()
    for cap_path in cap_list:
        # self.queue is looked up per item, so an empty list never touches it.
        pipe.rpush(self.queue, cap_path)
    pipe.execute()
def update_queue(self):
"""
Each parser instance is given a list of days, and thus a list of caps to parse.
This method lets the parser compare its list of caps with the caps in its queue.

Three Redis lists are read through ``list_caps``: ``self.queue`` (caps still
waiting), ``'scanning'`` (caps currently being parsed) and ``'scanned'`` (caps
already parsed).

Return values visible in this block: 1 after reporting 'Already up to date.',
0 from the branch taken when the 'scanning' list is non-empty, and 2 after
``enqueue_caps`` has been called.

NOTE(review): this view does not preserve indentation, so the exact nesting of
the branches below has to be confirmed against the repository.
"""
# Snapshot the three Redis lists once; every check below works on these copies.
remaining_caps = list_caps(self.queue, self.r)
current_caps = list_caps('scanning', self.r)
parsed_caps = list_caps('scanned', self.r)
caps_to_add = []
if remaining_caps:
print('[*] Queue already populated.')
if self.cap_list:
# Keep only caps that are neither queued, nor in progress, nor already parsed.
for cap in self.cap_list:
if cap not in remaining_caps and cap not in parsed_caps and cap not in current_caps:
caps_to_add.append(cap)
if not caps_to_add:
print('[*] Already up to date.')
return 1
print('[o] Queue updated.')
else:
if self.cap_list:
print('[*] No caps, initializing...')
caps_to_add = self.cap_list
elif current_caps:
return 0
# Push the selected caps onto self.queue.
self.enqueue_caps(caps_to_add)
return 2
def parse_cap(self, cap): def parse_cap(self, cap):
""" """
@ -119,16 +166,28 @@ class Analyzer:
return 0 return 0
def pop_cap(self):
    """
    Pop the next capture path from the work queue and open it with ``get_cap``.

    Without a local dataset (``self.dataset`` falsy) the path is popped from
    the D4 server Redis ``self.r_d4``; otherwise it is popped from the
    analyzer's own Redis ``self.r``.  Both modes read the list named by
    ``self.queue`` so that this method consumes from the same list that
    ``enqueue_caps`` fills and ``process_local`` measures with ``llen``.

    :return: whatever ``get_cap`` returns for the popped value.
        NOTE(review): ``rpop`` yields ``None`` on an empty list and
        ``process_d4`` tests the result for ``None``, so ``get_cap``
        presumably passes ``None`` through; confirm in lib.inspection.
    """
    source = self.r if self.dataset else self.r_d4
    return get_cap(source.rpop(self.queue))
def process_d4(self):
    """
    Endlessly consume captures delivered by the D4 server and parse them.

    Each iteration pops one capture with ``pop_cap``.  When nothing is
    available the loop sleeps for one second and polls again.  This method
    never returns on its own; it only stops when an exception propagates.
    """
    while True:
        d4_cap = self.pop_cap()
        if d4_cap is None:
            # Nothing to parse yet: back off instead of spinning on the queue.
            time.sleep(1)
            continue
        try:
            self.logger.debug('Parsing file {}'.format(d4_cap.input_filename))
            # Only the last 15 characters of the path are shown on the console.
            print('[*] Current cap file: {}'.format(d4_cap.input_filename[-15:]))
            self.parse_cap(d4_cap)
        finally:
            # Release the capture even when parsing fails.
            d4_cap.close()
def process_local(self):
    """
    Drain the local queue ``self.queue``, parsing every capture it yields.

    For each capture the file name is pushed onto the Redis list
    ``'scanning'`` before parsing.  After a successful parse it is removed
    from ``'scanning'`` and appended to ``'scanned'``.  If parsing raises,
    the name deliberately stays in ``'scanning'`` so that unfinished work
    remains visible, but the capture itself is still closed.
    """
    while self.r.llen(self.queue) != 0:
        cap = self.pop_cap()
        if cap is None:
            # Same guard as process_d4: pop_cap may yield nothing, e.g. when
            # the list was emptied between the llen() check and the pop.
            self.logger.warning('No capture could be opened from queue {}'.format(self.queue))
            continue
        filename = cap.input_filename
        try:
            self.r.rpush('scanning', filename)
            self.parse_cap(cap)
            self.r.lrem('scanning', 0, filename)
            self.r.rpush('scanned', filename)
        finally:
            # Always release the capture, even when parse_cap fails.
            cap.close()

View File

@ -125,13 +125,30 @@ def get_cap(path_to_cap):
def get_files(path) -> list:
    """
    Return every filesystem path that matches the glob pattern ``path``.

    The pattern is handed to ``glob`` unchanged, so wildcards such as
    ``*.gz`` are expanded; the result keeps the order ``glob`` produces.
    """
    return glob(path)
def init_cap_list(dataset_path: str, daylist: list) -> list:
    """
    Collect the ``.gz`` captures of every requested day of a dataset.

    For each entry of ``daylist`` the pattern ``<dataset_path>/<day>/*.gz``
    is expanded with ``get_files``.  Matches are sorted within each day and
    the days are concatenated in the order given.

    :param dataset_path: root directory of the dataset, with or without a
        trailing path separator.
    :param daylist: day identifiers naming the sub-directories to scan; each
        one is converted with ``str``.
    :return: capture paths, or an empty list when ``daylist`` is empty or None.
    """
    # Local import: the module's import block is outside the visible section.
    import os

    if not daylist:
        return []
    cap_list = []
    for day in daylist:
        # os.path.join inserts the separator that plain concatenation dropped
        # whenever dataset_path did not already end with one.
        pattern = os.path.join(dataset_path, str(day), '*.gz')
        cap_list += sorted(get_files(pattern))
    return cap_list
def list_caps(state: str, redis):
    """
    Return the full content of the Redis list ``state`` as a list of ``str``.

    :param state: name of the Redis list to read (for example ``'scanning'``
        or ``'scanned'``).
    :param redis: client object exposing ``lrange``.
    :return: the list items in stored order; empty when the list is empty.
    """
    # Items arrive as bytes from a default client, but as str from a client
    # created with decode_responses=True; only the former need decoding.
    return [
        item.decode() if isinstance(item, (bytes, bytearray)) else item
        for item in redis.lrange(state, 0, -1)
    ]
def get_protocol(packet): def get_protocol(packet):
if 'ip_proto' in packet.icmp.field_names: if 'ip_proto' in packet.icmp.field_names:
protocol = str(packet.icmp.ip_proto) protocol = str(packet.icmp.ip_proto)