From 08809cf57439e8dace993ca4459aa93a68fe37b9 Mon Sep 17 00:00:00 2001
From: Terrtia
Date: Mon, 11 Feb 2019 19:49:46 +0100
Subject: [PATCH] chg: [worker 1] compress files + send full pathname in feeder queues

---
 server/LAUNCH.sh                            |   2 +-
 server/workers/workers_1/file_compressor.py | 156 ++++++++++++++++++++
 server/workers/workers_1/worker.py          |  59 +++++++-
 server/workers/workers_8/worker.py          |   2 +-
 4 files changed, 214 insertions(+), 5 deletions(-)
 create mode 100755 server/workers/workers_1/file_compressor.py

diff --git a/server/LAUNCH.sh b/server/LAUNCH.sh
index bd3ce31..58cad7f 100755
--- a/server/LAUNCH.sh
+++ b/server/LAUNCH.sh
@@ -51,7 +51,7 @@ function launching_redis {
     screen -dmS "Redis_D4"
     sleep 0.1
-    echo -e $GREEN"\t* Launching D4 Redis ervers"$DEFAULT
+    echo -e $GREEN"\t* Launching D4 Redis Servers"$DEFAULT
     screen -S "Redis_D4" -X screen -t "6379" bash -c $redis_dir'redis-server '$conf_dir'6379.conf ; read x'
     sleep 0.1
     screen -S "Redis_D4" -X screen -t "6380" bash -c $redis_dir'redis-server '$conf_dir'6380.conf ; read x'
diff --git a/server/workers/workers_1/file_compressor.py b/server/workers/workers_1/file_compressor.py
new file mode 100755
index 0000000..9e16bee
--- /dev/null
+++ b/server/workers/workers_1/file_compressor.py
@@ -0,0 +1,156 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+import time
+import gzip
+import redis
+import shutil
+import datetime
+
+import signal
+
+class GracefulKiller:
+    kill_now = False
+    def __init__(self):
+        signal.signal(signal.SIGINT, self.exit_gracefully)
+        signal.signal(signal.SIGTERM, self.exit_gracefully)
+
+    def exit_gracefully(self,signum, frame):
+        self.kill_now = True
+
+def compress_file(file_full_path, session_uuid,i=0):
+    redis_server_stream.set('data_in_process:{}'.format(session_uuid), file_full_path)
+    if i==0:
+        compressed_filename = '{}.gz'.format(file_full_path)
+    else:
+        compressed_filename = '{}.{}.gz'.format(file_full_path, i)
+    if os.path.isfile(compressed_filename):
+        compress_file(file_full_path, session_uuid, i+1)
+    else:
+        with open(file_full_path, 'rb') as f_in:
+            with gzip.open(compressed_filename, 'wb') as f_out:
+                shutil.copyfileobj(f_in, f_out)
+        os.remove(file_full_path)
+        # save full path in analyzer queue
+        for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}'.format(type)):
+            analyzer_uuid = analyzer_uuid.decode()
+            redis_server_analyzer.lpush('analyzer:{}:{}'.format(type, analyzer_uuid), compressed_filename)
+            redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time())
+            analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size')
+            if analyser_queue_max_size is None:
+                analyser_queue_max_size = analyzer_list_max_default_size
+            redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size)
+
+
+host_redis_stream = "localhost"
+port_redis_stream = 6379
+
+host_redis_metadata = "localhost"
+port_redis_metadata = 6380
+
+redis_server_stream = redis.StrictRedis(
+                    host=host_redis_stream,
+                    port=port_redis_stream,
+                    db=0)
+
+redis_server_metadata = redis.StrictRedis(
+                    host=host_redis_metadata,
+                    port=port_redis_metadata,
+                    db=0)
+
+redis_server_analyzer = redis.StrictRedis(
+                    host=host_redis_metadata,
+                    port=port_redis_metadata,
+                    db=2)
+
+type = 1
+sleep_time = 20
+
+analyzer_list_max_default_size = 10000
+
+if __name__ == "__main__":
+    killer = GracefulKiller()
+
+    if len(sys.argv) != 4:
+        print('usage:', 'Worker.py', 'session_uuid', 'tcpdump', 'date')
+        exit(1)
+
+    # TODO sanitise input
+    session_uuid = sys.argv[1]
+    directory_data_uuid = sys.argv[2]
+    date = sys.argv[3]
+
+    worker_data_directory = os.path.join(directory_data_uuid, date[0:4], date[4:6], date[6:8])
+    full_datetime = datetime.datetime.now().strftime("%Y%m%d%H")
+
+    current_file = None
+    time_change = False
+
+    while True:
+        if killer.kill_now:
+            break
+
+        new_date = datetime.datetime.now().strftime("%Y%m%d")
+
+        # get all directory files
+        all_files = os.listdir(worker_data_directory)
+        not_compressed_file = []
+        # filter: get all not compressed files
+        for file in all_files:
+            if file.endswith('.cap'):
+                not_compressed_file.append(os.path.join(worker_data_directory, file))
+
+        if not_compressed_file:
+            ### check time-change (minus one hour) ###
+            new_full_datetime = datetime.datetime.now().strftime("%Y%m%d%H")
+            if new_full_datetime < full_datetime:
+                # sort list, last modified
+                not_compressed_file.sort(key=os.path.getctime)
+            else:
+                # sort list
+                not_compressed_file.sort()
+            ### ###
+
+            # new day
+            if date != new_date:
+                # compress all files
+                for file in not_compressed_file:
+                    if killer.kill_now:
+                        break
+                    compress_file(file, session_uuid)
+                # reset file tracker
+                current_file = None
+                date = new_date
+                # update worker_data_directory
+                worker_data_directory = os.path.join(directory_data_uuid, date[0:4], date[4:6], date[6:8])
+                # restart
+                continue
+
+            # file used by tcpdump
+            max_file = not_compressed_file[-1]
+            full_datetime = new_full_datetime
+
+            # Init: set current_file
+            if not current_file:
+                current_file = max_file
+                #print('max_file set: {}'.format(current_file))
+
+            # new file created
+            if max_file != current_file:
+
+                # get all previous files
+                for file in not_compressed_file:
+                    if file != max_file:
+                        if killer.kill_now:
+                            break
+                        #print('new file: {}'.format(file))
+                        compress_file(file, session_uuid)
+
+                # update current_file tracker
+                current_file = max_file
+
+        if killer.kill_now:
+            break
+
+        time.sleep(10)
diff --git a/server/workers/workers_1/worker.py b/server/workers/workers_1/worker.py
index 0d89030..1162893 100755
--- a/server/workers/workers_1/worker.py
+++ b/server/workers/workers_1/worker.py
@@ -3,10 +3,11 @@
 import os
 import sys
 import time
+import gzip
 import redis
-import subprocess
-
+import shutil
 import datetime
+import subprocess
 
 def data_incorrect_format(stream_name, session_uuid, uuid):
     redis_server_stream.sadd('Error:IncorrectType:{}'.format(type), session_uuid)
@@ -22,6 +23,28 @@
     redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
     redis_server_stream.delete(stream_name)
 
+def compress_file(file_full_path, i=0):
+    if i==0:
+        compressed_filename = '{}.gz'.format(file_full_path)
+    else:
+        compressed_filename = '{}.{}.gz'.format(file_full_path, i)
+    if os.path.isfile(compressed_filename):
+        compress_file(file_full_path, i+1)
+    else:
+        with open(file_full_path, 'rb') as f_in:
+            with gzip.open(compressed_filename, 'wb') as f_out:
+                shutil.copyfileobj(f_in, f_out)
+        os.remove(file_full_path)
+        # save full path in analyzer queue
+        for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}'.format(type)):
+            analyzer_uuid = analyzer_uuid.decode()
+            redis_server_analyzer.lpush('analyzer:{}:{}'.format(type, analyzer_uuid), compressed_filename)
+            redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time())
+            analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size')
+            if analyser_queue_max_size is None:
+                analyser_queue_max_size = analyzer_list_max_default_size
+            redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size)
+
 host_redis_stream = "localhost"
 port_redis_stream = 6379
 
@@ -38,10 +61,18 @@ redis_server_metadata = redis.StrictRedis(
                     port=port_redis_metadata,
                     db=0)
 
+redis_server_analyzer = redis.StrictRedis(
+                    host=host_redis_metadata,
+                    port=port_redis_metadata,
+                    db=2)
+
 type = 1
 tcp_dump_cycle = '300'
+tcp_dump_cycle = '10'
 stream_buffer = 100
 
+analyzer_list_max_default_size = 10000
+
 id_to_delete = []
 
 if __name__ == "__main__":
@@ -59,6 +90,7 @@
         uuid = res[0][1][0][1][b'uuid'].decode()
         date = datetime.datetime.now().strftime("%Y%m%d")
         tcpdump_path = os.path.join('../../data', uuid, str(type))
+        full_tcpdump_path = os.path.join(os.environ['D4_HOME'], 'data', uuid, str(type))
         rel_path = os.path.join(tcpdump_path, date[0:4], date[4:6], date[6:8])
         if not os.path.isdir(rel_path):
             os.makedirs(rel_path)
@@ -72,6 +104,8 @@
         process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}/%Y/%m/%d/{}-%Y-%m-%d-%H%M%S.cap'.format(tcpdump_path, uuid)], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
         nb_save = 0
 
+        process_compressor = subprocess.Popen(['./file_compressor.py', session_uuid, full_tcpdump_path, date])
+
         while True:
             res = redis_server_stream.xread({stream_name: id}, count=1)
 
@@ -108,7 +142,7 @@
                     nb_save = 0
 
             else:
-                # sucess, all data are saved
+                # success, all data are saved
                 if redis_server_stream.sismember('ended_session', session_uuid):
                     out, err = process.communicate(timeout= 0.5)
                     #print(out)
@@ -117,12 +151,31 @@
                     elif err:
                         print(err)
 
+                    # close child
+                    try:
+                        process_compressor.communicate(timeout= 0.5)
+                    except subprocess.TimeoutExpired:
+                        process_compressor.kill()
+                    ### compress all files ###
+                    date = datetime.datetime.now().strftime("%Y%m%d")
+                    worker_data_directory = os.path.join(full_tcpdump_path, date[0:4], date[4:6], date[6:8])
+                    all_files = os.listdir(worker_data_directory)
+                    all_files.sort()
+                    if all_files:
+                        for file in all_files:
+                            if file.endswith('.cap'):
+                                full_path = os.path.join(worker_data_directory, file)
+                                if redis_server_stream.get('data_in_process:{}'.format(session_uuid)) != full_path:
+                                    compress_file(full_path)
+                    ### ###
+
                     #print(process.stderr.read())
                     redis_server_stream.srem('ended_session', session_uuid)
                     redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid)
                     redis_server_stream.srem('working_session_uuid:{}'.format(type), session_uuid)
                     redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
                     redis_server_stream.delete(stream_name)
+                    redis_server_stream.delete('data_in_process:{}'.format(session_uuid))
                     # make sure that tcpdump can save all datas
                     time.sleep(10)
                     print('---- tcpdump DONE, uuid={} session_uuid={}'.format(uuid, session_uuid))
diff --git a/server/workers/workers_8/worker.py b/server/workers/workers_8/worker.py
index 39de71f..b9cf39c 100755
--- a/server/workers/workers_8/worker.py
+++ b/server/workers/workers_8/worker.py
@@ -115,7 +115,7 @@ if __name__ == "__main__":
                 if len(buffer) < max_buffer_length:
                     buffer += data[b'message']
                 else:
-                    print('Error, infinite loop, buffer may length reached')
+                    print('Error, infinite loop, max buffer length reached')
                     # force new line
                     buffer += b'{}\n'.format(data[b'message'])
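
Note (not part of the patch): the worker above LPUSHes the full pathname of each compressed capture into the per-analyzer Redis list 'analyzer:<type>:<analyzer_uuid>' (db 2 on the metadata instance, localhost:6380, type 1). A minimal analyzer-side consumer sketch under those assumptions follows; the analyzer UUID and poll timeout are placeholders, and a real analyzer must already be registered in the 'analyzer:1' set for the worker to feed it.

#!/usr/bin/env python3
# Hypothetical analyzer-side consumer sketch (not part of this patch).
# Assumes the Redis layout used above: analyzer queues in db 2 on localhost:6380, type 1.

import redis

type = 1
analyzer_uuid = '00000000-0000-0000-0000-000000000000'  # placeholder UUID

redis_server_analyzer = redis.StrictRedis(host='localhost', port=6380, db=2)

while True:
    # the worker LPUSHes full .gz pathnames; BRPOP consumes the oldest entry first
    res = redis_server_analyzer.brpop('analyzer:{}:{}'.format(type, analyzer_uuid), timeout=10)
    if res is None:
        continue
    compressed_path = res[1].decode()
    print('new compressed capture: {}'.format(compressed_path))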