From 10a335cdf2a30a839e612809a463bf24a1605737 Mon Sep 17 00:00:00 2001 From: Terrtia Date: Fri, 1 Mar 2019 10:19:04 +0100 Subject: [PATCH] chg: [worker 2] save 254 with file rotation --- server/workers/workers_2/worker.py | 58 +++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/server/workers/workers_2/worker.py b/server/workers/workers_2/worker.py index 7e7ae36..56b0050 100755 --- a/server/workers/workers_2/worker.py +++ b/server/workers/workers_2/worker.py @@ -27,14 +27,20 @@ redis_server_metadata = redis.StrictRedis( type_meta_header = 2 type_defined = 254 max_buffer_length = 100000 +rotation_save_cycle = 10 #seconds + +json_file_name = 'meta_json.json' +extended_type_name = None # # TODO: use default or json['file_type'] save_to_file = True def get_dir_data_uuid(uuid, type): return os.path.join('../../data', uuid, str(type)) -def get_save_dir(dir_data_uuid, year, month, day): +def get_save_dir(dir_data_uuid, year, month, day, extended_type=None): dir_path = os.path.join(dir_data_uuid, year, month, day) + if extended_type: + dir_path = os.path.join(dir_path, extended_type) if not os.path.isdir(dir_path): os.makedirs(dir_path) return dir_path @@ -115,7 +121,6 @@ if __name__ == "__main__": buffer = b'' try: full_json = json.loads(data[b'message'].decode()) - print(full_json['type']) except: buffer += data[b'message'] # # TODO: filter too big json @@ -143,23 +148,27 @@ if __name__ == "__main__": if not redis_server_metadata.sismember('server:accepted_extended_type', extended_type): error_mess = 'Unsupported extended_type: {}'.format(extended_type) on_error(session_uuid, type, error_mess) - print(error_mess) clean_db(session_uuid) sys.exit(1) + file_separator = b'/n' ## TODO: map all file separator or extract from json + extended_type_name = '{}.txt'.format(extended_type) # # TODO: create default or extract from JSON + # save json on disk if save_to_file: rotate_file = False time_file = time.time() date_file = datetime.datetime.now().strftime("%Y%m%d%H%M%S") # get new save_path #use first or last received date ??? - dir_data_uuid = get_dir_data_uuid(uuid, type_meta_header) - dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8]) - dir_full_path_extended_type = os.path.join(dir_full_path, extended_type) - filename = '{}-{}-{}-{}-{}.meta_json.json'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14]) + dir_data_uuid = get_dir_data_uuid(uuid, type_defined) + dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8], extended_type=extended_type) + filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], json_file_name) save_path = os.path.join(dir_full_path, filename) with open(save_path, 'w') as f: f.write(json.dumps(full_json)) + # get extended_type save_path + filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], extended_type_name) + save_path = os.path.join(dir_full_path, filename) # change stream_name/type stream_name = stream_defined @@ -167,7 +176,7 @@ if __name__ == "__main__": id = 0 buffer = b'' - # Do the magic on 254 type + # handle 254 type while True: res = redis_server_stream.xread({stream_name: id}, count=1) if res: @@ -177,7 +186,38 @@ if __name__ == "__main__": data = res[0][1][0][1] if id and data: - print(data[b'message']) + # save data on disk + if save_to_file: + new_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + # check if a new rotation is needed + if ( new_date[0:8] != date_file[0:8] ) or ( time.time() - time_file > rotation_save_cycle ): + date_file = new_date + rotate_file = True + + # file rotation + if rotate_file and file_separator in data[b'message']: + end_file, start_new_file = data[b'message'].rsplit(file_separator, maxsplit=1) + # save end of file + with open(save_path, 'ab') as f: + f.write(end_file) + + # get new save_path + dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8], extended_type=extended_type) + filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], extended_type_name) + save_path = os.path.join(dir_full_path, filename) + + # save start of new file + if start_new_file != b'': + with open(save_path, 'ab') as f: + f.write(start_new_file) + # end of rotation + rotate_file = False + time_file = time.time() + else: + with open(save_path, 'ab') as f: + f.write(data[b'message']) + + redis_server_stream.xdel(stream_name, id) else: # end session, no json received