diff --git a/server/server.py b/server/server.py
index 3f2ce21..fe5893a 100755
--- a/server/server.py
+++ b/server/server.py
@@ -23,7 +23,8 @@ from twisted.protocols.policies import TimeoutMixin
 hmac_reset = bytearray(32)
 hmac_key = b'private key to change'
 
-accepted_type = [1, 4, 8]
+accepted_type = [1, 2, 4, 8, 254]
+accepted_extended_type = ['ja3-jl']
 
 timeout_time = 30
 
@@ -67,6 +68,9 @@ redis_server_metadata.set('server:hmac_default_key', hmac_key)
 redis_server_metadata.delete('server:accepted_type')
 for type in accepted_type:
     redis_server_metadata.sadd('server:accepted_type', type)
+redis_server_metadata.delete('server:accepted_extended_type')
+for type in accepted_extended_type:
+    redis_server_metadata.sadd('server:accepted_extended_type', type)
 
 
 class D4_Server(Protocol, TimeoutMixin):
@@ -75,6 +79,7 @@ class D4_Server(Protocol, TimeoutMixin):
         self.setTimeout(timeout_time)
         self.session_uuid = str(uuid.uuid4())
         self.data_saved = False
+        self.update_stream_type = True
         self.first_connection = True
         self.ip = None
         self.source_port = None
@@ -138,10 +143,10 @@ class D4_Server(Protocol, TimeoutMixin):
                 logger.warning('Incorrect header data size: the server received more data than expected by default, expected={}, received={} , uuid={}, session_uuid={}'.format(data_default_size_limit, data_header['size'] ,data_header['uuid_header'], self.session_uuid))
 
             # Worker: Incorrect type
-            if redis_server_stream.sismember('Error:IncorrectType:{}'.format(data_header['type']), self.session_uuid):
+            if redis_server_stream.sismember('Error:IncorrectType', self.session_uuid):
                 self.transport.abortConnection()
                 redis_server_stream.delete('stream:{}:{}'.format(data_header['type'], self.session_uuid))
-                redis_server_stream.srem('Error:IncorrectType:{}'.format(data_header['type']), self.session_uuid)
+                redis_server_stream.srem('Error:IncorrectType', self.session_uuid)
                 logger.warning('Incorrect type={} detected by worker, uuid={}, session_uuid={}'.format(data_header['type'] ,data_header['uuid_header'], self.session_uuid))
 
         return data_header
@@ -192,6 +197,23 @@ class D4_Server(Protocol, TimeoutMixin):
                 self.transport.abortConnection()
             else:
                 #self.version = None
+                # check if type change
+                if self.data_saved:
+                    # type change detected
+                    if self.type != data_header['type']:
+                        # Meta types
+                        if self.type == 2 and data_header['type'] == 254:
+                            self.update_stream_type = True
+                        # Type Error
+                        else:
+                            logger.warning('Unexpected type change, type={} new type={}, ip={} uuid={} session_uuid={}'.format(self.type, data_header['type'], ip, data_header['uuid_header'], self.session_uuid))
+                            redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: Unexpected type change type={}, new type={}'.format(self.type, data_header['type']))
+                            self.transport.abortConnection()
+                # type 254, check if previous type 2 saved
+                elif data_header['type'] == 254:
+                    logger.warning('a type 2 packet must be sent, ip={} uuid={} type={} session_uuid={}'.format(ip, data_header['uuid_header'], data_header['type'], self.session_uuid))
+                    redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: a type 2 packet must be sent, type={}'.format(data_header['type']))
+                    self.transport.abortConnection()
                 self.type = data_header['type']
                 self.uuid = data_header['uuid_header']
                 #active Connection
@@ -308,14 +330,16 @@ class D4_Server(Protocol, TimeoutMixin):
             redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'last_seen', data_header['timestamp'])
 
             if not self.data_saved:
-                redis_server_stream.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
-                redis_server_stream.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header'])
-
                 #UUID IP:  ## TODO: use d4 timestamp ?
                 redis_server_metadata.lpush('list_uuid_ip:{}'.format(data_header['uuid_header']), '{}-{}'.format(ip, datetime.datetime.now().strftime("%Y%m%d%H%M%S")))
                 redis_server_metadata.ltrim('list_uuid_ip:{}'.format(data_header['uuid_header']), 0, 15)
 
                 self.data_saved = True
+            if self.update_stream_type:
+                redis_server_stream.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
+                redis_server_stream.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header'])
+
+                self.update_stream_type = False
         else:
             logger.warning("stream exceed max entries limit, uuid={}, session_uuid={}, type={}".format(data_header['uuid_header'], self.session_uuid, data_header['type']))
             ## TODO: FIXME
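The hunks above let a session start with a type 2 meta-header packet and then switch to type 254. As a minimal sketch (illustration only, not part of the patch), assuming only the "type" field that workers_2/worker.py checks is required and that 'ja3-jl' is the sole accepted extended type, the sensor-side meta-header payload could look like this:

import json

# hypothetical sensor-side payload for the type 2 (meta-header) packet
meta_header = {"type": "ja3-jl"}            # the only extended type accepted in this patch
payload = json.dumps(meta_header).encode()  # sent as the body of the type 2 packet;
                                            # the session then continues with type 254 packets
print(payload)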
diff --git a/server/workers/workers_1/worker.py b/server/workers/workers_1/worker.py
index 2e13166..44d5e04 100755
--- a/server/workers/workers_1/worker.py
+++ b/server/workers/workers_1/worker.py
@@ -10,7 +10,7 @@ import datetime
 import subprocess
 
 def data_incorrect_format(stream_name, session_uuid, uuid):
-    redis_server_stream.sadd('Error:IncorrectType:{}'.format(type), session_uuid)
+    redis_server_stream.sadd('Error:IncorrectType', session_uuid)
     redis_server_metadata.hset('metadata_uuid:{}'.format(uuid), 'Error', 'Error: Type={}, Incorrect file format'.format(type))
     clean_stream(stream_name, session_uuid)
     print('Incorrect format')
diff --git a/server/workers/workers_2/file_compressor.py b/server/workers/workers_2/file_compressor.py
new file mode 100755
index 0000000..3406566
--- /dev/null
+++ b/server/workers/workers_2/file_compressor.py
@@ -0,0 +1,159 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+import time
+import gzip
+import redis
+import shutil
+import datetime
+
+import signal
+
+class GracefulKiller:
+    kill_now = False
+    def __init__(self):
+        signal.signal(signal.SIGINT, self.exit_gracefully)
+        signal.signal(signal.SIGTERM, self.exit_gracefully)
+
+    def exit_gracefully(self, signum, frame):
+        self.kill_now = True
+
+def compress_file(file_full_path, session_uuid, i=0):
+    redis_server_stream.set('data_in_process:{}'.format(session_uuid), file_full_path)
+    if i==0:
+        compressed_filename = '{}.gz'.format(file_full_path)
+    else:
+        compressed_filename = '{}.{}.gz'.format(file_full_path, i)
+    if os.path.isfile(compressed_filename):
+        compress_file(file_full_path, session_uuid, i+1)
+    else:
+        with open(file_full_path, 'rb') as f_in:
+            with gzip.open(compressed_filename, 'wb') as f_out:
+                shutil.copyfileobj(f_in, f_out)
+        try:
+            os.remove(file_full_path)
+        except FileNotFoundError:
+            pass
+        # save full path in analyzer queue
+        for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}'.format(type)):
+            analyzer_uuid = analyzer_uuid.decode()
+            redis_server_analyzer.lpush('analyzer:{}:{}'.format(type, analyzer_uuid), compressed_filename)
+            redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time())
+            analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size')
+            if analyser_queue_max_size is None:
+                analyser_queue_max_size = analyzer_list_max_default_size
+            redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size)
+
+
+host_redis_stream = "localhost"
+port_redis_stream = 6379
+
+host_redis_metadata = "localhost"
+port_redis_metadata = 6380
+
+redis_server_stream = redis.StrictRedis(
+                    host=host_redis_stream,
+                    port=port_redis_stream,
+                    db=0)
+
+redis_server_metadata = redis.StrictRedis(
+                    host=host_redis_metadata,
+                    port=port_redis_metadata,
+                    db=0)
+
+redis_server_analyzer = redis.StrictRedis(
+                    host=host_redis_metadata,
+                    port=port_redis_metadata,
+                    db=2)
+
+type = 1
+sleep_time = 300
+
+analyzer_list_max_default_size = 10000
+
+if __name__ == "__main__":
+    killer = GracefulKiller()
+
+    if len(sys.argv) != 4:
+        print('usage:', 'Worker.py', 'session_uuid', 'tcpdump', 'date')
+        exit(1)
+
+    # TODO sanitise input
+    session_uuid = sys.argv[1]
+    directory_data_uuid = sys.argv[2]
+    date = sys.argv[3]
+
+    worker_data_directory = os.path.join(directory_data_uuid, date[0:4], date[4:6], date[6:8])
+    full_datetime = datetime.datetime.now().strftime("%Y%m%d%H")
+
+    current_file = None
+    time_change = False
+
+    while True:
+        if killer.kill_now:
+            break
+
+        new_date = datetime.datetime.now().strftime("%Y%m%d")
+
+        # get all directory files
+        all_files = os.listdir(worker_data_directory)
+        not_compressed_file = []
+        # filter: get all not compressed files
+        for file in all_files:
+            if file.endswith('.cap'):
+                not_compressed_file.append(os.path.join(worker_data_directory, file))
+
+        if not_compressed_file:
+            ### check time-change (minus one hour) ###
+            new_full_datetime = datetime.datetime.now().strftime("%Y%m%d%H")
+            if new_full_datetime < full_datetime:
+                # sort list, last modified
+                not_compressed_file.sort(key=os.path.getctime)
+            else:
+                # sort list
+                not_compressed_file.sort()
+            ###    ###
+
+            # new day
+            if date != new_date:
+                # compress all files
+                for file in not_compressed_file:
+                    if killer.kill_now:
+                        break
+                    compress_file(file, session_uuid)
+                # reset file tracker
+                current_file = None
+                date = new_date
+                # update worker_data_directory
+                worker_data_directory = os.path.join(directory_data_uuid, date[0:4], date[4:6], date[6:8])
+                # restart
+                continue
+
+            # file used by tcpdump
+            max_file = not_compressed_file[-1]
+            full_datetime = new_full_datetime
+
+            # Init: set current_file
+            if not current_file:
+                current_file = max_file
+                #print('max_file set: {}'.format(current_file))
+
+            # new file created
+            if max_file != current_file:
+
+                # get all previous files
+                for file in not_compressed_file:
+                    if file != max_file:
+                        if killer.kill_now:
+                            break
+                        #print('new file: {}'.format(file))
+                        compress_file(file, session_uuid)
+
+                # update current_file tracker
+                current_file = max_file
+
+        if killer.kill_now:
+            break
+
+        time.sleep(sleep_time)
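file_compressor.py pushes each compressed capture path onto 'analyzer:<type>:<analyzer_uuid>' in the analyzer Redis database (db 2 on the metadata instance). A minimal consumer sketch, assuming a hypothetical analyzer UUID and a simple polling loop (illustration only, not part of the patch):

import time
import redis

# key layout taken from compress_file(): 'analyzer:<type>:<analyzer_uuid>' in db=2
redis_server_analyzer = redis.StrictRedis(host='localhost', port=6380, db=2)
analyzer_uuid = '00000000-0000-0000-0000-000000000000'  # hypothetical analyzer UUID

while True:
    # compress_file() LPUSHes new paths, so RPOP drains the oldest entry first
    compressed_path = redis_server_analyzer.rpop('analyzer:1:{}'.format(analyzer_uuid))
    if compressed_path is None:
        time.sleep(10)
        continue
    print('to process:', compressed_path.decode())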
diff --git a/server/workers/workers_2/worker.py b/server/workers/workers_2/worker.py
new file mode 100755
index 0000000..8c200da
--- /dev/null
+++ b/server/workers/workers_2/worker.py
@@ -0,0 +1,185 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+import time
+import json
+import redis
+
+import datetime
+
+host_redis_stream = "localhost"
+port_redis_stream = 6379
+
+redis_server_stream = redis.StrictRedis(
+                    host=host_redis_stream,
+                    port=port_redis_stream,
+                    db=0)
+
+host_redis_metadata = "localhost"
+port_redis_metadata = 6380
+
+redis_server_metadata = redis.StrictRedis(
+                    host=host_redis_metadata,
+                    port=port_redis_metadata,
+                    db=0)
+
+type_meta_header = 2
+type_defined = 254
+max_buffer_length = 100000
+
+save_to_file = True
+
+def get_save_dir(dir_data_uuid, year, month, day):
+    dir_path = os.path.join(dir_data_uuid, year, month, day)
+    if not os.path.isdir(dir_path):
+        os.makedirs(dir_path)
+    return dir_path
+
+def check_json_file(json_file):
+    # the json object must contain a type field
+    if "type" in json_file:
+        return True
+    else:
+        return False
+
+def on_error(session_uuid, type_error, message):
+    redis_server_stream.sadd('Error:IncorrectType', session_uuid)
+    redis_server_metadata.hset('metadata_uuid:{}'.format(uuid), 'Error', 'Error: Type={}, {}'.format(type_error, message))
+    clean_db(session_uuid)
+    print('Incorrect format')
+    sys.exit(1)
+
+def clean_db(session_uuid):
+    clean_stream(stream_meta_json, type_meta_header, session_uuid)
+    clean_stream(stream_defined, type_defined, session_uuid)
+    redis_server_stream.srem('ended_session', session_uuid)
+    redis_server_stream.srem('working_session_uuid:{}'.format(type_meta_header), session_uuid)
+
+def clean_stream(stream_name, type, session_uuid):
+    redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid)
+    redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
+    redis_server_stream.delete(stream_name)
+
+if __name__ == "__main__":
+
+    if len(sys.argv) != 2:
+        print('usage:', 'Worker.py', 'session_uuid')
+        exit(1)
+
+    session_uuid = sys.argv[1]
+    stream_meta_json = 'stream:{}:{}'.format(type_meta_header, session_uuid)
+    stream_defined = 'stream:{}:{}'.format(type_defined, session_uuid)
+
+    id = '0'
+    buffer = b''
+
+    stream_name = stream_meta_json
+    type = type_meta_header
+
+    # track launched worker
+    redis_server_stream.sadd('working_session_uuid:{}'.format(type_meta_header), session_uuid)
+
+    # get uuid
+    res = redis_server_stream.xread({stream_name: id}, count=1)
+    if res:
+        uuid = res[0][1][0][1][b'uuid'].decode()
+        # init file rotation
+        if save_to_file:
+            rotate_file = False
+            time_file = time.time()
+            date_file = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+            dir_data_uuid = os.path.join('../../data', uuid, str(type))
+            dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8])
+            filename = '{}-{}-{}-{}-{}.meta_json.txt'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14])
+            save_path = os.path.join(dir_full_path, filename)
+
+        print('---- worker launched, uuid={} session_uuid={}'.format(uuid, session_uuid))
+    else:
+        print('Incorrect Stream, Closing worker: type={} session_uuid={}'.format(type, session_uuid))
+        sys.exit(1)
+
+    full_json = None
+
+    # active session
+    while full_json is None:
+
+        res = redis_server_stream.xread({stream_name: id}, count=1)
+        if res:
+            new_id = res[0][1][0][0].decode()
+            if id != new_id:
+                id = new_id
+                data = res[0][1][0][1]
+
+                if id and data:
+                    # reconstruct data
+                    if buffer != b'':
+                        data[b'message'] = buffer + data[b'message']
+                        buffer = b''
+                    try:
+                        full_json = json.loads(data[b'message'].decode())
+                    except:
+                        buffer += data[b'message']
+                        # # TODO: filter too big json
+                    redis_server_stream.xdel(stream_name, id)
+
+                    # complete json received
+                    if full_json:
+                        if check_json_file(full_json):
+                            break
+                        # Incorrect Json
+                        else:
+                            on_error(session_uuid, type, 'Incorrect JSON object')
+        else:
+            # end session, no json received
+            if redis_server_stream.sismember('ended_session', session_uuid):
+                clean_db(session_uuid)
+                print('---- Incomplete JSON object, DONE, uuid={} session_uuid={}'.format(uuid, session_uuid))
+                sys.exit(0)
+            else:
+                time.sleep(10)
+
+    # extract/parse JSON
+    extended_type = full_json['type']
+    if not redis_server_metadata.sismember('server:accepted_extended_type', extended_type):
+        error_mess = 'Unsupported extended_type: {}'.format(extended_type)
+        on_error(session_uuid, type, error_mess)
+        print(error_mess)
+        clean_db(session_uuid)
+        sys.exit(1)
+
+    # save json on disk
+    if save_to_file:
+        # get new save_path #use first or last received date ???
+        dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8])
+        filename = '{}-{}-{}-{}-{}.meta_json.txt'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14])
+        save_path = os.path.join(dir_full_path, filename)
+        with open(save_path, 'w') as f:
+            f.write(json.dumps(full_json))
+
+    # change stream_name/type
+    stream_name = stream_defined
+    type = type_defined
+    id = 0
+    buffer = b''
+
+    # Do the magic on 254 type
+    while True:
+        res = redis_server_stream.xread({stream_name: id}, count=1)
+        if res:
+            new_id = res[0][1][0][0].decode()
+            if id != new_id:
+                id = new_id
+                data = res[0][1][0][1]
+
+                if id and data:
+                    print(data)
+
+        else:
+            # end session, no data received
+            if redis_server_stream.sismember('ended_session', session_uuid):
+                clean_db(session_uuid)
+                print('---- Incomplete JSON object, DONE, uuid={} session_uuid={}'.format(uuid, session_uuid))
+                sys.exit(0)
+            else:
+                time.sleep(10)
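worker.py reassembles the meta-header JSON from stream entries by buffering fragments until json.loads succeeds. The same reassembly logic, reduced to a self-contained sketch with made-up sample fragments (illustration only, not part of the patch):

import json

# a JSON object split across two stream entries (made-up sample data)
fragments = [b'{"type": "ja', b'3-jl"}']

buffer = b''
full_json = None
for message in fragments:
    if buffer != b'':
        message = buffer + message   # reconstruct data, as in worker.py
        buffer = b''
    try:
        full_json = json.loads(message.decode())
    except ValueError:
        buffer = message             # incomplete JSON: keep buffering
print(full_json)  # {'type': 'ja3-jl'}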
diff --git a/server/workers/workers_2/workers_manager.py b/server/workers/workers_2/workers_manager.py
new file mode 100755
index 0000000..d66b873
--- /dev/null
+++ b/server/workers/workers_2/workers_manager.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+import time
+import redis
+import subprocess
+
+host_redis_stream = "localhost"
+port_redis_stream = 6379
+
+redis_server_stream = redis.StrictRedis(
+                    host=host_redis_stream,
+                    port=port_redis_stream,
+                    db=0)
+type = 2
+
+try:
+    redis_server_stream.ping()
+except redis.exceptions.ConnectionError:
+    print('Error: Redis server {}:{}, ConnectionError'.format(host_redis_stream, port_redis_stream))
+    sys.exit(1)
+
+if __name__ == "__main__":
+    stream_name = 'stream:{}'.format(type)
+    redis_server_stream.delete('working_session_uuid:{}'.format(type))
+
+    while True:
+        for session_uuid in redis_server_stream.smembers('session_uuid:{}'.format(type)):
+            session_uuid = session_uuid.decode()
+            if not redis_server_stream.sismember('working_session_uuid:{}'.format(type), session_uuid):
+
+                process = subprocess.Popen(['./worker.py', session_uuid])
+                print('Launching new worker{} ... session_uuid={}'.format(type, session_uuid))
+
+        #print('.')
+        time.sleep(10)