mirror of https://github.com/D4-project/d4-core
chg: [worker 2] basic worker 2, save json on disk
parent
bd50fea4ef
commit
25d11be213
|
@ -72,11 +72,13 @@ function launching_workers {
|
||||||
sleep 0.1
|
sleep 0.1
|
||||||
echo -e $GREEN"\t* Launching D4 Workers"$DEFAULT
|
echo -e $GREEN"\t* Launching D4 Workers"$DEFAULT
|
||||||
|
|
||||||
screen -S "Workers_D4" -X screen -t "1_workers_manager" bash -c "cd ${D4_HOME}/workers/workers_1; ./workers_manager.py; read x"
|
screen -S "Workers_D4" -X screen -t "1_workers" bash -c "cd ${D4_HOME}/workers/workers_1; ./workers_manager.py; read x"
|
||||||
sleep 0.1
|
sleep 0.1
|
||||||
screen -S "Workers_D4" -X screen -t "4_workers_manager" bash -c "cd ${D4_HOME}/workers/workers_4; ./workers_manager.py; read x"
|
screen -S "Workers_D4" -X screen -t "2_workers" bash -c "cd ${D4_HOME}/workers/workers_2; ./workers_manager.py; read x"
|
||||||
sleep 0.1
|
sleep 0.1
|
||||||
screen -S "Workers_D4" -X screen -t "8_workers_manager" bash -c "cd ${D4_HOME}/workers/workers_8; ./workers_manager.py; read x"
|
screen -S "Workers_D4" -X screen -t "4_workers" bash -c "cd ${D4_HOME}/workers/workers_4; ./workers_manager.py; read x"
|
||||||
|
sleep 0.1
|
||||||
|
screen -S "Workers_D4" -X screen -t "8_workers" bash -c "cd ${D4_HOME}/workers/workers_8; ./workers_manager.py; read x"
|
||||||
sleep 0.1
|
sleep 0.1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,7 +69,7 @@ redis_server_metadata.delete('server:accepted_type')
|
||||||
for type in accepted_type:
|
for type in accepted_type:
|
||||||
redis_server_metadata.sadd('server:accepted_type', type)
|
redis_server_metadata.sadd('server:accepted_type', type)
|
||||||
redis_server_metadata.delete('server:accepted_extended_type')
|
redis_server_metadata.delete('server:accepted_extended_type')
|
||||||
for type in accepted_type:
|
for type in accepted_extended_type:
|
||||||
redis_server_metadata.sadd('server:accepted_extended_type', type)
|
redis_server_metadata.sadd('server:accepted_extended_type', type)
|
||||||
|
|
||||||
class D4_Server(Protocol, TimeoutMixin):
|
class D4_Server(Protocol, TimeoutMixin):
|
||||||
|
|
|
@ -30,6 +30,9 @@ max_buffer_length = 100000
|
||||||
|
|
||||||
save_to_file = True
|
save_to_file = True
|
||||||
|
|
||||||
|
def get_dir_data_uuid(uuid, type):
|
||||||
|
return os.path.join('../../data', uuid, str(type))
|
||||||
|
|
||||||
def get_save_dir(dir_data_uuid, year, month, day):
|
def get_save_dir(dir_data_uuid, year, month, day):
|
||||||
dir_path = os.path.join(dir_data_uuid, year, month, day)
|
dir_path = os.path.join(dir_data_uuid, year, month, day)
|
||||||
if not os.path.isdir(dir_path):
|
if not os.path.isdir(dir_path):
|
||||||
|
@ -84,18 +87,9 @@ if __name__ == "__main__":
|
||||||
res = redis_server_stream.xread({stream_name: id}, count=1)
|
res = redis_server_stream.xread({stream_name: id}, count=1)
|
||||||
if res:
|
if res:
|
||||||
uuid = res[0][1][0][1][b'uuid'].decode()
|
uuid = res[0][1][0][1][b'uuid'].decode()
|
||||||
# init file rotation
|
|
||||||
if save_to_file:
|
|
||||||
rotate_file = False
|
|
||||||
time_file = time.time()
|
|
||||||
date_file = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
|
||||||
dir_data_uuid = os.path.join('../../data', uuid, str(type))
|
|
||||||
dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8])
|
|
||||||
filename = '{}-{}-{}-{}-{}.meta_json.txt'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14])
|
|
||||||
save_path = os.path.join(dir_full_path, filename)
|
|
||||||
|
|
||||||
print('---- worker launched, uuid={} session_uuid={}'.format(uuid, session_uuid))
|
print('---- worker launched, uuid={} session_uuid={}'.format(uuid, session_uuid))
|
||||||
else:
|
else:
|
||||||
|
clean_db(session_uuid)
|
||||||
print('Incorrect Stream, Closing worker: type={} session_uuid={}'.format(type, session_uuid))
|
print('Incorrect Stream, Closing worker: type={} session_uuid={}'.format(type, session_uuid))
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
@ -112,12 +106,16 @@ if __name__ == "__main__":
|
||||||
data = res[0][1][0][1]
|
data = res[0][1][0][1]
|
||||||
|
|
||||||
if id and data:
|
if id and data:
|
||||||
|
# remove line from json
|
||||||
|
data[b'message'] = data[b'message'].replace(b'\n', b'')
|
||||||
|
|
||||||
# reconstruct data
|
# reconstruct data
|
||||||
if buffer != b'':
|
if buffer != b'':
|
||||||
data[b'message'] = b'{}{}'.format(buffer, data[b'message'])
|
data[b'message'] = b''.join([buffer, data[b'message']])
|
||||||
buffer = b''
|
buffer = b''
|
||||||
try:
|
try:
|
||||||
full_json = json.loads()
|
full_json = json.loads(data[b'message'].decode())
|
||||||
|
print(full_json['type'])
|
||||||
except:
|
except:
|
||||||
buffer += data[b'message']
|
buffer += data[b'message']
|
||||||
# # TODO: filter too big json
|
# # TODO: filter too big json
|
||||||
|
@ -125,6 +123,7 @@ if __name__ == "__main__":
|
||||||
|
|
||||||
# complete json received
|
# complete json received
|
||||||
if full_json:
|
if full_json:
|
||||||
|
print(full_json)
|
||||||
if check_json_file(full_json):
|
if check_json_file(full_json):
|
||||||
break
|
break
|
||||||
# Incorrect Json
|
# Incorrect Json
|
||||||
|
@ -143,19 +142,24 @@ if __name__ == "__main__":
|
||||||
extended_type = full_json['type']
|
extended_type = full_json['type']
|
||||||
if not redis_server_metadata.sismember('server:accepted_extended_type', extended_type):
|
if not redis_server_metadata.sismember('server:accepted_extended_type', extended_type):
|
||||||
error_mess = 'Unsupported extended_type: {}'.format(extended_type)
|
error_mess = 'Unsupported extended_type: {}'.format(extended_type)
|
||||||
on_error(session_uuid, type_error, error_mess)
|
on_error(session_uuid, type, error_mess)
|
||||||
print(error_mess)
|
print(error_mess)
|
||||||
clean_db(session_uuid)
|
clean_db(session_uuid)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
# save json on disk
|
# save json on disk
|
||||||
if save_to_file:
|
if save_to_file:
|
||||||
|
rotate_file = False
|
||||||
|
time_file = time.time()
|
||||||
|
date_file = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||||
# get new save_path #use first or last received date ???
|
# get new save_path #use first or last received date ???
|
||||||
|
dir_data_uuid = get_dir_data_uuid(uuid, type_meta_header)
|
||||||
dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8])
|
dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8])
|
||||||
filename = '{}-{}-{}-{}-{}.meta_json.txt'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14])
|
dir_full_path_extended_type = os.path.join(dir_full_path, extended_type)
|
||||||
|
filename = '{}-{}-{}-{}-{}.meta_json.json'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14])
|
||||||
save_path = os.path.join(dir_full_path, filename)
|
save_path = os.path.join(dir_full_path, filename)
|
||||||
with open(save_path, 'w') as f:
|
with open(save_path, 'w') as f:
|
||||||
f.write(full_json)
|
f.write(json.dumps(full_json))
|
||||||
|
|
||||||
# change stream_name/type
|
# change stream_name/type
|
||||||
stream_name = stream_defined
|
stream_name = stream_defined
|
||||||
|
@ -173,13 +177,13 @@ if __name__ == "__main__":
|
||||||
data = res[0][1][0][1]
|
data = res[0][1][0][1]
|
||||||
|
|
||||||
if id and data:
|
if id and data:
|
||||||
print(data)
|
print(data[b'message'])
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# end session, no json received
|
# end session, no json received
|
||||||
if redis_server_stream.sismember('ended_session', session_uuid):
|
if redis_server_stream.sismember('ended_session', session_uuid):
|
||||||
clean_db(session_uuid)
|
clean_db(session_uuid)
|
||||||
print('---- Incomplete JSON object, DONE, uuid={} session_uuid={}'.format(uuid, session_uuid))
|
print('---- JSON object, DONE, uuid={} session_uuid={}'.format(uuid, session_uuid))
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
else:
|
else:
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
|
|
Loading…
Reference in New Issue