mirror of https://github.com/D4-project/d4-core
chg: [server] support binary, add time rotation
parent
9fd028f17f
commit
57981dd452
|
@ -10,10 +10,10 @@ import datetime
|
|||
redis_server = redis.StrictRedis(
|
||||
host="localhost",
|
||||
port=6379,
|
||||
db=0,
|
||||
decode_responses=True)
|
||||
db=0)
|
||||
|
||||
type = 1
|
||||
max_timestamp = 60*5
|
||||
|
||||
def gzip_file(filepath):
|
||||
with open(filepath, 'rb') as f:
|
||||
|
@ -41,12 +41,13 @@ if __name__ == "__main__":
|
|||
id = res[0][1][0][0]
|
||||
data = res[0][1][0][1]
|
||||
if id and data:
|
||||
print(id)
|
||||
#print(id.decode())
|
||||
#print(data)
|
||||
|
||||
date = datetime.datetime.now().strftime("%Y/%m/%d")
|
||||
dir_path = os.path.join('data', date, data['uuid'])
|
||||
dir_path = os.path.join('data', date, data[b'uuid'].decode())
|
||||
filename = ''
|
||||
data_timestamp = data[b'timestamp'].decode()
|
||||
|
||||
try:
|
||||
it = os.scandir(dir_path)
|
||||
|
@ -55,9 +56,10 @@ if __name__ == "__main__":
|
|||
filename = entry.name
|
||||
break
|
||||
filepath = os.path.join(dir_path, filename)
|
||||
if os.path.getsize(filepath) > 5000000: #bytes
|
||||
|
||||
if os.path.getsize(filepath) > 500000000: #bytes
|
||||
gzip_file(filepath)
|
||||
filename = data['timestamp']
|
||||
filename = data_timestamp
|
||||
|
||||
except FileNotFoundError:
|
||||
os.makedirs(dir_path)
|
||||
|
@ -66,12 +68,16 @@ if __name__ == "__main__":
|
|||
# #
|
||||
|
||||
if not filename:
|
||||
filename = data['timestamp']
|
||||
filename = data_timestamp
|
||||
|
||||
with open(os.path.join(dir_path, filename), 'a') as f:
|
||||
f.write(data['message'])
|
||||
if int(data_timestamp) - int(filename) > max_timestamp:
|
||||
gzip_file(filepath)
|
||||
filename = data_timestamp
|
||||
|
||||
redis_server.xack(stream_name, group_name, id)
|
||||
with open(os.path.join(dir_path, filename), 'ab') as f:
|
||||
f.write(data[b'message'])
|
||||
|
||||
#redis_server.xack(stream_name, group_name, id)
|
||||
redis_server.xdel(stream_name, id)
|
||||
else:
|
||||
time.sleep(10)
|
||||
|
|
|
@ -30,8 +30,7 @@ header_size = 62
|
|||
redis_server = redis.StrictRedis(
|
||||
host="localhost",
|
||||
port=6379,
|
||||
db=0,
|
||||
decode_responses=True)
|
||||
db=0)
|
||||
|
||||
class Echo(Protocol, TimeoutMixin):
|
||||
|
||||
|
@ -96,7 +95,7 @@ class Echo(Protocol, TimeoutMixin):
|
|||
#print(next_data)
|
||||
self.process_d4_data(data, data_header, ip)
|
||||
# process next d4 header
|
||||
self.process_header(next_data)
|
||||
self.process_header(next_data, ip, source_port)
|
||||
# data_header['size'] > (len(data) - header_size)
|
||||
# buffer the data
|
||||
else:
|
||||
|
@ -138,7 +137,7 @@ class Echo(Protocol, TimeoutMixin):
|
|||
#print(data)
|
||||
#print()
|
||||
self.buffer = b''
|
||||
self.process_header(data)
|
||||
self.process_header(data, ip, source_port)
|
||||
|
||||
def process_d4_data(self, data, data_header, ip):
|
||||
# empty buffer
|
||||
|
|
Loading…
Reference in New Issue