From 3ad297799ea39c5e2d5a6ce6bc47ff8f62b7689f Mon Sep 17 00:00:00 2001 From: Terrtia Date: Fri, 8 Mar 2019 17:11:03 +0100 Subject: [PATCH 1/2] use dynamic Parent/Child class --- .../meta_types_modules/MetaTypesDefault.py | 133 ++++++++++++++++++ .../meta_types_modules/ja3-jl/ja3-jl.py | 17 +++ server/workers/workers_2/worker.py | 94 +++++-------- 3 files changed, 187 insertions(+), 57 deletions(-) create mode 100755 server/workers/workers_2/meta_types_modules/MetaTypesDefault.py create mode 100755 server/workers/workers_2/meta_types_modules/ja3-jl/ja3-jl.py diff --git a/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py b/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py new file mode 100755 index 0000000..34867d7 --- /dev/null +++ b/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 + +import os +import sys +import time +import json +import redis + +DEFAULT_FILE_EXTENSION = 'txt' +DEFAULT_FILE_SEPARATOR = b'\n' +ROTATION_SAVE_CYCLE = 10 # seconds +TYPE = 254 + +class MetaTypesDefault: + + def __init__(self, uuid, json_file): + self.session_uuid = uuid + self.type_name = json_file['type'] + self.parse_json(json_file) + print('end default init') + + def test(self): + print(self.session_uuid) + + ######## JSON PARSER ######## + def parse_json(self, uuid, json_file): + self.uuid = uuid + self.save_file_on_disk = True + self.is_file_rotation = True + self.file_separator = b'\n' + self.filename = b'{}.txt'.format(self.type_name) + + ######## PROCESS FUNCTIONS ######## + def process_data(self, data): + # save data on disk + if self.is_file_saved_on_disk(): + self.save_data_to_file(data) + + print('end process_data') + + ######## CORE FUNCTIONS ######## + + def check_json_file(self, json_file): + # the json object must contain a type field + if "type" in json_file: + return True + else: + return False + + def save_json_file(self, json_file): + self.last_time_saved = time.time() #time_file + self.last_saved_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S") #date_file + + save_path = os.path.join(self.get_save_dir(file_extention='json'), self.get_save_dir()) + with open(save_path, 'w') as f: + f.write(json.dumps(full_json)) + + def save_data_to_file(self, data): + if self.is_file_rotation(): + self.save_rotate_file(data) + + + def save_rotate_file(self, data): + new_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + # check if a new file rotation is needed # # TODO: change ROTATION_SAVE_CYCLE + if ( new_date[0:8] != self.get_last_time_saved()[0:8] ) or ( time.time() - self.get_last_time_saved > ROTATION_SAVE_CYCLE ): + date_file = new_date + self.set_last_saved_date(new_date) + self.set_rotate_file(True) + + + def save_same_directory(self, data): + pass + + ######## GET FUNCTIONS ######## + + def get_type_name(self): + return self.type_name + + def get_file_separator(self): + return self.file_separator + + def get_filename(self, file_extention=None): + if file_extention is None: + file_extention = DEFAULT_FILE_EXTENSION + # File Rotation, : data//254//// + if self.is_file_rotation(): + return '{}-{}-{}-{}-{}.{}'.format(self.uuid, self.get_last_saved_year(), self.get_last_saved_month(), self.get_last_saved_day(), self.get_last_saved_hour_minute(), file_extention) + + # todo save save_dir ??? + def get_save_dir(self): + # File Rotation, save data in directory: data//254//// + if self.is_file_rotation(): + data_directory_uuid_type = os.path.join('../../data', self.uuid, str(TYPE)) + return os.path.join(data_directory_uuid_type, self.get_last_saved_year(), self.get_last_saved_month(), self.get_last_saved_day() , self.type_name) + + # # TODO: save global type dir ??? + if self.is_file_saved_on_disk(): + pass + + def is_file_saved_on_disk(self): + return self.save_file_on_disk + + def is_file_rotation(self): + return self.is_file_rotation + + def get_last_time_saved(self): + return self.last_time_saved + + def get_last_saved_date(self): + return self.last_saved_date + + def get_last_saved_year(self): + return self.last_saved_date[0:4] + + def get_last_saved_month(self): + return self.last_saved_date[4:6] + + def get_last_saved_year(self): + return self.last_saved_date[6:8] + + def get_last_saved_hour_minute(self): + return self.last_saved_date[8:14] + + ######## SET FUNCTIONS ######## + + def set_rotate_file(self, boolean_value): + self.file_rotation = boolean_value + + def set_last_saved_date(self, date): + self.get_last_saved_date = date + +############## diff --git a/server/workers/workers_2/meta_types_modules/ja3-jl/ja3-jl.py b/server/workers/workers_2/meta_types_modules/ja3-jl/ja3-jl.py new file mode 100755 index 0000000..d4f53fd --- /dev/null +++ b/server/workers/workers_2/meta_types_modules/ja3-jl/ja3-jl.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 + +import os +import sys +import time +import json +import redis + +class TypeHandler(MetaTypesDefault: + + def __init__(self, json_file): + super().__init__(json_file) + print('init_spec') + + def test2(self): + print('ja3-jl type') + print(self.session_uuid) diff --git a/server/workers/workers_2/worker.py b/server/workers/workers_2/worker.py index b19cb0d..303e44e 100755 --- a/server/workers/workers_2/worker.py +++ b/server/workers/workers_2/worker.py @@ -8,6 +8,8 @@ import redis import datetime +from meta_types_modules import MetaTypesDefault + host_redis_stream = "localhost" port_redis_stream = 6379 @@ -30,48 +32,20 @@ max_buffer_length = 100000 rotation_save_cycle = 10 #seconds json_file_name = 'meta_json.json' -extended_type_name = None # # TODO: use default or json['file_type'] -save_to_file = True - -def get_dir_data_uuid(uuid, type): - return os.path.join('../../data', uuid, str(type)) - -def get_save_dir(dir_data_uuid, year, month, day, extended_type=None): - dir_path = os.path.join(dir_data_uuid, year, month, day) - if extended_type: - dir_path = os.path.join(dir_path, extended_type) - if not os.path.isdir(dir_path): - os.makedirs(dir_path) - return dir_path - -def check_json_file(json_file): - # the json object must contain a type field - if "type" in json_file: - return True - else: - return False - -def on_error(session_uuid, type_error, message): - redis_server_stream.sadd('Error:IncorrectType', session_uuid) - redis_server_metadata.hset('metadata_uuid:{}'.format(uuid), 'Error', 'Error: Type={}, {}'.format(type_error, message)) - clean_db(session_uuid) - print('Incorrect format') - sys.exit(1) - -def clean_db(session_uuid): - clean_stream(stream_meta_json, type_meta_header, session_uuid) - clean_stream(stream_defined, type_defined, session_uuid) - redis_server_stream.srem('ended_session', session_uuid) - redis_server_stream.srem('working_session_uuid:{}'.format(type_meta_header), session_uuid) - -def clean_stream(stream_name, type, session_uuid): - redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid) - redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid) - redis_server_stream.delete(stream_name) +def get_class( package_class ): + parts = package_class.split('.') + module = ".".join(parts[:-1]) + mod = __import__( module ) + for comp in parts[1:]: + mod = getattr(mod, comp) + return mod if __name__ == "__main__": + + ###################################################3 + if len(sys.argv) != 2: print('usage:', 'Worker.py', 'session_uuid') exit(1) @@ -151,22 +125,24 @@ if __name__ == "__main__": clean_db(session_uuid) sys.exit(1) - file_separator = b'\n' ## TODO: map all file separator or extract from json - extended_type_name = '{}.txt'.format(extended_type) # # TODO: create default or extract from JSON + + #### Handle Specific MetaTypes #### + # Use Specific Handler defined + if os.path.isdir(os.path.join('meta_types_modules', extended_type)): + class_type_handler = get_class('meta_types_modules.{}.{}.TypeHandler'.format(extended_type, extended_type)) + type_handler = class_type_handler(uuid, full_json) + # Use Standard Handler + else: + type_handler = MetaTypesDefault.MetaTypesDefault(uuid, full_json) + + #file_separator = type_handler.get_file_separator(self) + #extended_type_name = type_handler.get_file_name() # save json on disk - if save_to_file: - rotate_file = False - time_file = time.time() - date_file = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - # get new save_path #use first or last received date ??? - dir_data_uuid = get_dir_data_uuid(uuid, type_defined) - dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8], extended_type=extended_type) - filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], json_file_name) - save_path = os.path.join(dir_full_path, filename) - with open(save_path, 'w') as f: - f.write(json.dumps(full_json)) - # get extended_type save_path + type_handler.save_json_file(json) + + + # get extended_type save_path ############################################################################################################# filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], extended_type_name) save_path = os.path.join(dir_full_path, filename) @@ -186,13 +162,14 @@ if __name__ == "__main__": data = res[0][1][0][1] if id and data: + + type_handler.process_data(data) + + ######################## # save data on disk if save_to_file: - new_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - # check if a new rotation is needed - if ( new_date[0:8] != date_file[0:8] ) or ( time.time() - time_file > rotation_save_cycle ): - date_file = new_date - rotate_file = True + + # file rotation if rotate_file and file_separator in data[b'message']: @@ -217,6 +194,9 @@ if __name__ == "__main__": with open(save_path, 'ab') as f: f.write(data[b'message']) + ####### + + redis_server_stream.xdel(stream_name, id) else: From 9b310f749856687f66062e7338811627b9d7fb4f Mon Sep 17 00:00:00 2001 From: Terrtia Date: Mon, 11 Mar 2019 11:54:20 +0100 Subject: [PATCH 2/2] chg: [worker 254] use module class --- .../meta_types_modules/MetaTypesDefault.py | 132 +++++++++++++----- .../meta_types_modules/ja3-jl/ja3-jl.py | 13 +- server/workers/workers_2/worker.py | 75 +++++----- 3 files changed, 138 insertions(+), 82 deletions(-) diff --git a/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py b/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py index 34867d7..a45689b 100755 --- a/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py +++ b/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py @@ -5,30 +5,31 @@ import sys import time import json import redis +import datetime DEFAULT_FILE_EXTENSION = 'txt' DEFAULT_FILE_SEPARATOR = b'\n' -ROTATION_SAVE_CYCLE = 10 # seconds +ROTATION_SAVE_CYCLE = 5 # seconds TYPE = 254 class MetaTypesDefault: def __init__(self, uuid, json_file): - self.session_uuid = uuid + self.uuid = uuid self.type_name = json_file['type'] + self.save_path = None self.parse_json(json_file) - print('end default init') def test(self): - print(self.session_uuid) + print('class: MetaTypesDefault') ######## JSON PARSER ######## - def parse_json(self, uuid, json_file): - self.uuid = uuid + def parse_json(self, json_file): self.save_file_on_disk = True - self.is_file_rotation = True + self.file_rotation_mode = True + self.file_rotation = False self.file_separator = b'\n' - self.filename = b'{}.txt'.format(self.type_name) + self.filename = b''.join([self.type_name.encode(), b'.txt']) ######## PROCESS FUNCTIONS ######## def process_data(self, data): @@ -36,8 +37,6 @@ class MetaTypesDefault: if self.is_file_saved_on_disk(): self.save_data_to_file(data) - print('end process_data') - ######## CORE FUNCTIONS ######## def check_json_file(self, json_file): @@ -47,27 +46,68 @@ class MetaTypesDefault: else: return False + # # TODO: update for non rotate_file mode def save_json_file(self, json_file): - self.last_time_saved = time.time() #time_file - self.last_saved_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S") #date_file - - save_path = os.path.join(self.get_save_dir(file_extention='json'), self.get_save_dir()) - with open(save_path, 'w') as f: - f.write(json.dumps(full_json)) + self.set_last_time_saved(time.time()) #time_file + self.set_last_saved_date(datetime.datetime.now().strftime("%Y%m%d%H%M%S")) #date_file + # update save path + self.set_save_path( os.path.join(self.get_save_dir(), self.get_filename(file_extention='json')) ) + # save json + with open(self.get_save_path(), 'w') as f: + f.write(json.dumps(json_file)) + # update save path for 254 files type + self.set_save_path( os.path.join(self.get_save_dir(), self.get_filename()) ) def save_data_to_file(self, data): - if self.is_file_rotation(): + if self.is_file_rotation_mode(): self.save_rotate_file(data) def save_rotate_file(self, data): - new_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - # check if a new file rotation is needed # # TODO: change ROTATION_SAVE_CYCLE - if ( new_date[0:8] != self.get_last_time_saved()[0:8] ) or ( time.time() - self.get_last_time_saved > ROTATION_SAVE_CYCLE ): - date_file = new_date - self.set_last_saved_date(new_date) - self.set_rotate_file(True) - + if not self.get_file_rotation(): + new_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + # check if a new file rotation is needed # # TODO: change ROTATION_SAVE_CYCLE + if ( new_date[0:8] != self.get_last_saved_date()[0:8] ) or ( int(time.time()) - self.get_last_time_saved() > ROTATION_SAVE_CYCLE ): + self.set_rotate_file(True) + + # rotate file + if self.get_file_rotation(): + # init save path + if self.get_save_path() is None: + self.set_last_time_saved(time.time()) + self.set_last_saved_date(datetime.datetime.now().strftime("%Y%m%d%H%M%S")) + # update save path + self.set_save_path( os.path.join(self.get_save_dir(), self.get_filename()) ) + + # rotate file + if self.get_file_separator() in data: + end_file, start_new_file = data.rsplit(self.get_file_separator(), maxsplit=1) + # save end of file + with open(self.get_save_path(), 'ab') as f: + f.write(end_file) + + # set last saved date/time + self.set_last_time_saved(time.time()) + self.set_last_saved_date(datetime.datetime.now().strftime("%Y%m%d%H%M%S")) + # update save path + self.set_save_path( os.path.join(self.get_save_dir(), self.get_filename()) ) + + # save start of new file + if start_new_file != b'': + with open(self.get_save_path(), 'ab') as f: + f.write(start_new_file) + # end of rotation + self.set_rotate_file(False) + + # wait file separator + else: + with open(self.get_save_path(), 'ab') as f: + f.write(data) + else: + # save file + with open(self.get_save_path(), 'ab') as f: + f.write(data) + def save_same_directory(self, data): pass @@ -80,29 +120,43 @@ class MetaTypesDefault: def get_file_separator(self): return self.file_separator + def get_uuid(self): + return self.uuid + def get_filename(self, file_extention=None): if file_extention is None: file_extention = DEFAULT_FILE_EXTENSION # File Rotation, : data//254//// - if self.is_file_rotation(): + if self.is_file_rotation_mode(): return '{}-{}-{}-{}-{}.{}'.format(self.uuid, self.get_last_saved_year(), self.get_last_saved_month(), self.get_last_saved_day(), self.get_last_saved_hour_minute(), file_extention) - # todo save save_dir ??? def get_save_dir(self): # File Rotation, save data in directory: data//254//// - if self.is_file_rotation(): - data_directory_uuid_type = os.path.join('../../data', self.uuid, str(TYPE)) + if self.is_file_rotation_mode(): + data_directory_uuid_type = os.path.join('../../data', self.get_uuid(), str(TYPE)) return os.path.join(data_directory_uuid_type, self.get_last_saved_year(), self.get_last_saved_month(), self.get_last_saved_day() , self.type_name) # # TODO: save global type dir ??? if self.is_file_saved_on_disk(): pass - def is_file_saved_on_disk(self): - return self.save_file_on_disk + def get_save_path(self): + return self.save_path - def is_file_rotation(self): - return self.is_file_rotation + def is_file_saved_on_disk(self): + if self.save_file_on_disk: + return True + else: + return False + + def is_file_rotation_mode(self): + if self.file_rotation_mode: + return True + else: + return False + + def get_file_rotation(self): + return self.file_rotation def get_last_time_saved(self): return self.last_time_saved @@ -116,7 +170,7 @@ class MetaTypesDefault: def get_last_saved_month(self): return self.last_saved_date[4:6] - def get_last_saved_year(self): + def get_last_saved_day(self): return self.last_saved_date[6:8] def get_last_saved_hour_minute(self): @@ -127,7 +181,17 @@ class MetaTypesDefault: def set_rotate_file(self, boolean_value): self.file_rotation = boolean_value + def set_last_time_saved(self, value_time): + self.last_time_saved = int(value_time) + def set_last_saved_date(self, date): - self.get_last_saved_date = date + self.last_saved_date = date + + def set_save_path(self, save_path): + # # TODO: create directory + dir_path = os.path.dirname(save_path) + if not os.path.isdir(dir_path): + os.makedirs(dir_path) + self.save_path = save_path ############## diff --git a/server/workers/workers_2/meta_types_modules/ja3-jl/ja3-jl.py b/server/workers/workers_2/meta_types_modules/ja3-jl/ja3-jl.py index d4f53fd..0461683 100755 --- a/server/workers/workers_2/meta_types_modules/ja3-jl/ja3-jl.py +++ b/server/workers/workers_2/meta_types_modules/ja3-jl/ja3-jl.py @@ -6,12 +6,13 @@ import time import json import redis -class TypeHandler(MetaTypesDefault: +from meta_types_modules.MetaTypesDefault import MetaTypesDefault - def __init__(self, json_file): - super().__init__(json_file) +class TypeHandler(MetaTypesDefault): + + def __init__(self, uuid, json_file): + super().__init__(uuid, json_file) print('init_spec') - def test2(self): - print('ja3-jl type') - print(self.session_uuid) + def test(self): + print('Class: ja3-jl') diff --git a/server/workers/workers_2/worker.py b/server/workers/workers_2/worker.py index 303e44e..362432d 100755 --- a/server/workers/workers_2/worker.py +++ b/server/workers/workers_2/worker.py @@ -41,6 +41,31 @@ def get_class( package_class ): mod = getattr(mod, comp) return mod +def check_default_json_file(json_file): + # the json object must contain a type field + if "type" in json_file: + return True + else: + return False + +def on_error(session_uuid, type_error, message): + redis_server_stream.sadd('Error:IncorrectType', session_uuid) + redis_server_metadata.hset('metadata_uuid:{}'.format(uuid), 'Error', 'Error: Type={}, {}'.format(type_error, message)) + clean_db(session_uuid) + print('Incorrect format') + sys.exit(1) + +def clean_db(session_uuid): + clean_stream(stream_meta_json, type_meta_header, session_uuid) + clean_stream(stream_defined, type_defined, session_uuid) + redis_server_stream.srem('ended_session', session_uuid) + redis_server_stream.srem('working_session_uuid:{}'.format(type_meta_header), session_uuid) + +def clean_stream(stream_name, type, session_uuid): + redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid) + redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid) + redis_server_stream.delete(stream_name) + if __name__ == "__main__": @@ -103,7 +128,8 @@ if __name__ == "__main__": # complete json received if full_json: print(full_json) - if check_json_file(full_json): + if check_default_json_file(full_json): + # end type 2 processing break # Incorrect Json else: @@ -139,12 +165,7 @@ if __name__ == "__main__": #extended_type_name = type_handler.get_file_name() # save json on disk - type_handler.save_json_file(json) - - - # get extended_type save_path ############################################################################################################# - filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], extended_type_name) - save_path = os.path.join(dir_full_path, filename) + type_handler.save_json_file(full_json) # change stream_name/type stream_name = stream_defined @@ -152,6 +173,8 @@ if __name__ == "__main__": id = 0 buffer = b'' + type_handler.test() + # handle 254 type while True: res = redis_server_stream.xread({stream_name: id}, count=1) @@ -162,41 +185,9 @@ if __name__ == "__main__": data = res[0][1][0][1] if id and data: - - type_handler.process_data(data) - - ######################## - # save data on disk - if save_to_file: - - - - # file rotation - if rotate_file and file_separator in data[b'message']: - end_file, start_new_file = data[b'message'].rsplit(file_separator, maxsplit=1) - # save end of file - with open(save_path, 'ab') as f: - f.write(end_file) - - # get new save_path - dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8], extended_type=extended_type) - filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], extended_type_name) - save_path = os.path.join(dir_full_path, filename) - - # save start of new file - if start_new_file != b'': - with open(save_path, 'ab') as f: - f.write(start_new_file) - # end of rotation - rotate_file = False - time_file = time.time() - else: - with open(save_path, 'ab') as f: - f.write(data[b'message']) - - ####### - - + # process 254 data type + type_handler.process_data(data[b'message']) + # remove data from redis stream redis_server_stream.xdel(stream_name, id) else: