mirror of https://github.com/D4-project/d4-core
84 lines
2.5 KiB
Python
Executable File
84 lines
2.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import sys
|
|
import redis
|
|
import time
|
|
import gzip
|
|
import datetime
|
|
|
|
redis_server = redis.StrictRedis(
|
|
host="localhost",
|
|
port=6379,
|
|
db=0)
|
|
|
|
type = 1
|
|
max_timestamp = 60*5
|
|
|
|
def gzip_file(filepath):
|
|
with open(filepath, 'rb') as f:
|
|
content = f.read()
|
|
with gzip.open(filepath+'.gz', 'wb') as f2:
|
|
f2.write(content)
|
|
os.remove(filepath)
|
|
|
|
if __name__ == "__main__":
|
|
stream_name = 'stream:{}'.format(type)
|
|
|
|
#group_name = 'group_stream:{}'.format(type)
|
|
#try:
|
|
# redis_server.xgroup_create(stream_name, group_name)
|
|
#except:
|
|
# pass
|
|
|
|
while True:
|
|
|
|
#print(redis_server.xpending(stream_name, group_name))
|
|
|
|
#res = redis_server.xreadgroup(group_name, 'consumername', {stream_name: '>'}, count=1)
|
|
res = redis_server.xread({stream_name: '0'}, count=1, block=500)
|
|
if res:
|
|
id = res[0][1][0][0]
|
|
data = res[0][1][0][1]
|
|
if id and data:
|
|
#print(id.decode())
|
|
#print(data)
|
|
|
|
date = datetime.datetime.now().strftime("%Y/%m/%d")
|
|
dir_path = os.path.join('data', date, data[b'uuid'].decode())
|
|
filename = ''
|
|
data_timestamp = data[b'timestamp'].decode()
|
|
|
|
try:
|
|
it = os.scandir(dir_path)
|
|
for entry in it:
|
|
if not entry.name.endswith(".gz") and entry.is_file():
|
|
filename = entry.name
|
|
break
|
|
filepath = os.path.join(dir_path, filename)
|
|
|
|
#if os.path.getsize(filepath) > 500000000: #bytes
|
|
# gzip_file(filepath)
|
|
# filename = data_timestamp
|
|
|
|
except FileNotFoundError:
|
|
os.makedirs(dir_path)
|
|
# # TODO: use contexte manager in python 3.6
|
|
it = []
|
|
# #
|
|
|
|
if not filename:
|
|
filename = data_timestamp
|
|
|
|
if int(data_timestamp) - int(filename) > max_timestamp:
|
|
gzip_file(filepath)
|
|
filename = data_timestamp
|
|
|
|
with open(os.path.join(dir_path, filename), 'ab') as f:
|
|
f.write(data[b'message'])
|
|
|
|
#redis_server.xack(stream_name, group_name, id)
|
|
redis_server.xdel(stream_name, id)
|
|
else:
|
|
time.sleep(10)
|