mirror of https://github.com/D4-project/d4-core
chg: [worker 2] save 254 with file rotation
parent
7444bcdf7b
commit
10a335cdf2
|
@ -27,14 +27,20 @@ redis_server_metadata = redis.StrictRedis(
|
||||||
type_meta_header = 2
|
type_meta_header = 2
|
||||||
type_defined = 254
|
type_defined = 254
|
||||||
max_buffer_length = 100000
|
max_buffer_length = 100000
|
||||||
|
rotation_save_cycle = 10 #seconds
|
||||||
|
|
||||||
|
json_file_name = 'meta_json.json'
|
||||||
|
extended_type_name = None # # TODO: use default or json['file_type']
|
||||||
|
|
||||||
save_to_file = True
|
save_to_file = True
|
||||||
|
|
||||||
def get_dir_data_uuid(uuid, type):
|
def get_dir_data_uuid(uuid, type):
|
||||||
return os.path.join('../../data', uuid, str(type))
|
return os.path.join('../../data', uuid, str(type))
|
||||||
|
|
||||||
def get_save_dir(dir_data_uuid, year, month, day):
|
def get_save_dir(dir_data_uuid, year, month, day, extended_type=None):
|
||||||
dir_path = os.path.join(dir_data_uuid, year, month, day)
|
dir_path = os.path.join(dir_data_uuid, year, month, day)
|
||||||
|
if extended_type:
|
||||||
|
dir_path = os.path.join(dir_path, extended_type)
|
||||||
if not os.path.isdir(dir_path):
|
if not os.path.isdir(dir_path):
|
||||||
os.makedirs(dir_path)
|
os.makedirs(dir_path)
|
||||||
return dir_path
|
return dir_path
|
||||||
|
@ -115,7 +121,6 @@ if __name__ == "__main__":
|
||||||
buffer = b''
|
buffer = b''
|
||||||
try:
|
try:
|
||||||
full_json = json.loads(data[b'message'].decode())
|
full_json = json.loads(data[b'message'].decode())
|
||||||
print(full_json['type'])
|
|
||||||
except:
|
except:
|
||||||
buffer += data[b'message']
|
buffer += data[b'message']
|
||||||
# # TODO: filter too big json
|
# # TODO: filter too big json
|
||||||
|
@ -143,23 +148,27 @@ if __name__ == "__main__":
|
||||||
if not redis_server_metadata.sismember('server:accepted_extended_type', extended_type):
|
if not redis_server_metadata.sismember('server:accepted_extended_type', extended_type):
|
||||||
error_mess = 'Unsupported extended_type: {}'.format(extended_type)
|
error_mess = 'Unsupported extended_type: {}'.format(extended_type)
|
||||||
on_error(session_uuid, type, error_mess)
|
on_error(session_uuid, type, error_mess)
|
||||||
print(error_mess)
|
|
||||||
clean_db(session_uuid)
|
clean_db(session_uuid)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
file_separator = b'/n' ## TODO: map all file separator or extract from json
|
||||||
|
extended_type_name = '{}.txt'.format(extended_type) # # TODO: create default or extract from JSON
|
||||||
|
|
||||||
# save json on disk
|
# save json on disk
|
||||||
if save_to_file:
|
if save_to_file:
|
||||||
rotate_file = False
|
rotate_file = False
|
||||||
time_file = time.time()
|
time_file = time.time()
|
||||||
date_file = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
date_file = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||||
# get new save_path #use first or last received date ???
|
# get new save_path #use first or last received date ???
|
||||||
dir_data_uuid = get_dir_data_uuid(uuid, type_meta_header)
|
dir_data_uuid = get_dir_data_uuid(uuid, type_defined)
|
||||||
dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8])
|
dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8], extended_type=extended_type)
|
||||||
dir_full_path_extended_type = os.path.join(dir_full_path, extended_type)
|
filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], json_file_name)
|
||||||
filename = '{}-{}-{}-{}-{}.meta_json.json'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14])
|
|
||||||
save_path = os.path.join(dir_full_path, filename)
|
save_path = os.path.join(dir_full_path, filename)
|
||||||
with open(save_path, 'w') as f:
|
with open(save_path, 'w') as f:
|
||||||
f.write(json.dumps(full_json))
|
f.write(json.dumps(full_json))
|
||||||
|
# get extended_type save_path
|
||||||
|
filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], extended_type_name)
|
||||||
|
save_path = os.path.join(dir_full_path, filename)
|
||||||
|
|
||||||
# change stream_name/type
|
# change stream_name/type
|
||||||
stream_name = stream_defined
|
stream_name = stream_defined
|
||||||
|
@ -167,7 +176,7 @@ if __name__ == "__main__":
|
||||||
id = 0
|
id = 0
|
||||||
buffer = b''
|
buffer = b''
|
||||||
|
|
||||||
# Do the magic on 254 type
|
# handle 254 type
|
||||||
while True:
|
while True:
|
||||||
res = redis_server_stream.xread({stream_name: id}, count=1)
|
res = redis_server_stream.xread({stream_name: id}, count=1)
|
||||||
if res:
|
if res:
|
||||||
|
@ -177,7 +186,38 @@ if __name__ == "__main__":
|
||||||
data = res[0][1][0][1]
|
data = res[0][1][0][1]
|
||||||
|
|
||||||
if id and data:
|
if id and data:
|
||||||
print(data[b'message'])
|
# save data on disk
|
||||||
|
if save_to_file:
|
||||||
|
new_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||||
|
# check if a new rotation is needed
|
||||||
|
if ( new_date[0:8] != date_file[0:8] ) or ( time.time() - time_file > rotation_save_cycle ):
|
||||||
|
date_file = new_date
|
||||||
|
rotate_file = True
|
||||||
|
|
||||||
|
# file rotation
|
||||||
|
if rotate_file and file_separator in data[b'message']:
|
||||||
|
end_file, start_new_file = data[b'message'].rsplit(file_separator, maxsplit=1)
|
||||||
|
# save end of file
|
||||||
|
with open(save_path, 'ab') as f:
|
||||||
|
f.write(end_file)
|
||||||
|
|
||||||
|
# get new save_path
|
||||||
|
dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8], extended_type=extended_type)
|
||||||
|
filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], extended_type_name)
|
||||||
|
save_path = os.path.join(dir_full_path, filename)
|
||||||
|
|
||||||
|
# save start of new file
|
||||||
|
if start_new_file != b'':
|
||||||
|
with open(save_path, 'ab') as f:
|
||||||
|
f.write(start_new_file)
|
||||||
|
# end of rotation
|
||||||
|
rotate_file = False
|
||||||
|
time_file = time.time()
|
||||||
|
else:
|
||||||
|
with open(save_path, 'ab') as f:
|
||||||
|
f.write(data[b'message'])
|
||||||
|
|
||||||
|
redis_server_stream.xdel(stream_name, id)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# end session, no json received
|
# end session, no json received
|
||||||
|
|
Loading…
Reference in New Issue