diff --git a/server/LAUNCH.sh b/server/LAUNCH.sh index 58cad7f..c31265b 100755 --- a/server/LAUNCH.sh +++ b/server/LAUNCH.sh @@ -72,11 +72,13 @@ function launching_workers { sleep 0.1 echo -e $GREEN"\t* Launching D4 Workers"$DEFAULT - screen -S "Workers_D4" -X screen -t "1_workers_manager" bash -c "cd ${D4_HOME}/workers/workers_1; ./workers_manager.py; read x" + screen -S "Workers_D4" -X screen -t "1_workers" bash -c "cd ${D4_HOME}/workers/workers_1; ./workers_manager.py; read x" sleep 0.1 - screen -S "Workers_D4" -X screen -t "4_workers_manager" bash -c "cd ${D4_HOME}/workers/workers_4; ./workers_manager.py; read x" + screen -S "Workers_D4" -X screen -t "2_workers" bash -c "cd ${D4_HOME}/workers/workers_2; ./workers_manager.py; read x" sleep 0.1 - screen -S "Workers_D4" -X screen -t "8_workers_manager" bash -c "cd ${D4_HOME}/workers/workers_8; ./workers_manager.py; read x" + screen -S "Workers_D4" -X screen -t "4_workers" bash -c "cd ${D4_HOME}/workers/workers_4; ./workers_manager.py; read x" + sleep 0.1 + screen -S "Workers_D4" -X screen -t "8_workers" bash -c "cd ${D4_HOME}/workers/workers_8; ./workers_manager.py; read x" sleep 0.1 } diff --git a/server/server.py b/server/server.py index fe5893a..0d063e7 100755 --- a/server/server.py +++ b/server/server.py @@ -69,7 +69,7 @@ redis_server_metadata.delete('server:accepted_type') for type in accepted_type: redis_server_metadata.sadd('server:accepted_type', type) redis_server_metadata.delete('server:accepted_extended_type') -for type in accepted_type: +for type in accepted_extended_type: redis_server_metadata.sadd('server:accepted_extended_type', type) class D4_Server(Protocol, TimeoutMixin): diff --git a/server/workers/workers_2/worker.py b/server/workers/workers_2/worker.py index 8c200da..7e7ae36 100755 --- a/server/workers/workers_2/worker.py +++ b/server/workers/workers_2/worker.py @@ -30,6 +30,9 @@ max_buffer_length = 100000 save_to_file = True +def get_dir_data_uuid(uuid, type): + return os.path.join('../../data', uuid, str(type)) + def get_save_dir(dir_data_uuid, year, month, day): dir_path = os.path.join(dir_data_uuid, year, month, day) if not os.path.isdir(dir_path): @@ -84,18 +87,9 @@ if __name__ == "__main__": res = redis_server_stream.xread({stream_name: id}, count=1) if res: uuid = res[0][1][0][1][b'uuid'].decode() - # init file rotation - if save_to_file: - rotate_file = False - time_file = time.time() - date_file = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - dir_data_uuid = os.path.join('../../data', uuid, str(type)) - dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8]) - filename = '{}-{}-{}-{}-{}.meta_json.txt'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14]) - save_path = os.path.join(dir_full_path, filename) - print('---- worker launched, uuid={} session_uuid={}'.format(uuid, session_uuid)) else: + clean_db(session_uuid) print('Incorrect Stream, Closing worker: type={} session_uuid={}'.format(type, session_uuid)) sys.exit(1) @@ -112,12 +106,16 @@ if __name__ == "__main__": data = res[0][1][0][1] if id and data: + # remove line from json + data[b'message'] = data[b'message'].replace(b'\n', b'') + # reconstruct data if buffer != b'': - data[b'message'] = b'{}{}'.format(buffer, data[b'message']) + data[b'message'] = b''.join([buffer, data[b'message']]) buffer = b'' try: - full_json = json.loads() + full_json = json.loads(data[b'message'].decode()) + print(full_json['type']) except: buffer += data[b'message'] # # TODO: filter too big json @@ -125,6 +123,7 @@ if __name__ == "__main__": # complete json received if full_json: + print(full_json) if check_json_file(full_json): break # Incorrect Json @@ -143,19 +142,24 @@ if __name__ == "__main__": extended_type = full_json['type'] if not redis_server_metadata.sismember('server:accepted_extended_type', extended_type): error_mess = 'Unsupported extended_type: {}'.format(extended_type) - on_error(session_uuid, type_error, error_mess) + on_error(session_uuid, type, error_mess) print(error_mess) clean_db(session_uuid) sys.exit(1) # save json on disk if save_to_file: + rotate_file = False + time_file = time.time() + date_file = datetime.datetime.now().strftime("%Y%m%d%H%M%S") # get new save_path #use first or last received date ??? + dir_data_uuid = get_dir_data_uuid(uuid, type_meta_header) dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8]) - filename = '{}-{}-{}-{}-{}.meta_json.txt'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14]) + dir_full_path_extended_type = os.path.join(dir_full_path, extended_type) + filename = '{}-{}-{}-{}-{}.meta_json.json'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14]) save_path = os.path.join(dir_full_path, filename) with open(save_path, 'w') as f: - f.write(full_json) + f.write(json.dumps(full_json)) # change stream_name/type stream_name = stream_defined @@ -173,13 +177,13 @@ if __name__ == "__main__": data = res[0][1][0][1] if id and data: - print(data) + print(data[b'message']) else: # end session, no json received if redis_server_stream.sismember('ended_session', session_uuid): clean_db(session_uuid) - print('---- Incomplete JSON object, DONE, uuid={} session_uuid={}'.format(uuid, session_uuid)) + print('---- JSON object, DONE, uuid={} session_uuid={}'.format(uuid, session_uuid)) sys.exit(0) else: time.sleep(10)