chg: [server] use redis stream + save file on disk + file rotation (max_size)

pull/23/head
Terrtia 2019-01-07 16:11:04 +01:00
parent 81044af438
commit 9fd028f17f
No known key found for this signature in database
GPG Key ID: 1E1B1F50D84613D0
2 changed files with 92 additions and 9 deletions

77
server/save_data.py Executable file
View File

@ -0,0 +1,77 @@
#!/usr/bin/env python3
import os
import sys
import redis
import time
import gzip
import datetime
# Redis connection used as the stream broker between the D4 server and this
# worker. decode_responses=True so stream fields come back as str, not bytes
# (the code below indexes data['uuid'] / data['message'] as text).
redis_server = redis.StrictRedis(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=True)
# D4 packet type consumed by this worker (selects the stream name below).
# NOTE(review): shadows the builtin `type`; consider renaming file-wide.
type = 1
def gzip_file(filepath):
    """Compress *filepath* to ``filepath + '.gz'`` and delete the original.

    Used for file rotation once a capture file exceeds the size limit.
    Streams the data in fixed-size chunks instead of slurping the whole
    file, so rotating a multi-megabyte file does not spike memory.
    """
    with open(filepath, 'rb') as f_in:
        with gzip.open(filepath + '.gz', 'wb') as f_out:
            for chunk in iter(lambda: f_in.read(64 * 1024), b''):
                f_out.write(chunk)
    # Only remove the source once the compressed copy is fully written.
    os.remove(filepath)
if __name__ == "__main__":
    # Rotate the per-sensor capture file once it grows past this many bytes.
    MAX_FILE_SIZE = 5000000

    stream_name = 'stream:{}'.format(type)
    #group_name = 'group_stream:{}'.format(type)
    #try:
    #    redis_server.xgroup_create(stream_name, group_name)
    #except:
    #    pass
    while True:
        #print(redis_server.xpending(stream_name, group_name))
        #res = redis_server.xreadgroup(group_name, 'consumername', {stream_name: '>'}, count=1)
        # Read at most one entry, blocking up to 500 ms. Reading from id '0'
        # always returns the oldest remaining entry; entries are deleted
        # below once persisted, so this drains the stream in order.
        res = redis_server.xread({stream_name: '0'}, count=1, block=500)
        if res:
            # res layout: [(stream_name, [(entry_id, fields_dict)])]
            entry_id = res[0][1][0][0]
            data = res[0][1][0][1]
            if entry_id and data:
                print(entry_id)
                #print(data)
                # One directory per day and per sensor uuid: data/YYYY/MM/DD/<uuid>
                date = datetime.datetime.now().strftime("%Y/%m/%d")
                dir_path = os.path.join('data', date, data['uuid'])
                filename = ''
                try:
                    # Find the current (non-rotated) capture file, if any.
                    # os.scandir as a context manager (3.6+) closes the
                    # iterator even though we break out early.
                    with os.scandir(dir_path) as it:
                        for entry in it:
                            if entry.is_file() and not entry.name.endswith(".gz"):
                                filename = entry.name
                                break
                    if filename:
                        filepath = os.path.join(dir_path, filename)
                        if os.path.getsize(filepath) > MAX_FILE_SIZE:
                            # Rotate: compress the full file and start a new
                            # one named after this entry's timestamp.
                            gzip_file(filepath)
                            filename = data['timestamp']
                except FileNotFoundError:
                    # First entry for this day/uuid: create the directory.
                    os.makedirs(dir_path)
                if not filename:
                    filename = data['timestamp']
                with open(os.path.join(dir_path, filename), 'a') as f:
                    f.write(data['message'])
                # Entry fully persisted: drop it from the stream.
                # (The previous xack(stream_name, group_name, entry_id) call
                # raised NameError — group_name is only defined in the
                # commented-out consumer-group setup above — so it was
                # removed; xdel alone is correct for plain xread.)
                redis_server.xdel(stream_name, entry_id)
        else:
            # Stream empty: back off before polling again.
            time.sleep(10)

View File

@ -7,6 +7,7 @@ import stat
import redis import redis
import struct import struct
import time import time
import datetime
from twisted.internet import ssl, task, protocol, endpoints, defer from twisted.internet import ssl, task, protocol, endpoints, defer
from twisted.python import log from twisted.python import log
@ -40,8 +41,10 @@ class Echo(Protocol, TimeoutMixin):
def dataReceived(self, data): def dataReceived(self, data):
self.resetTimeout() self.resetTimeout()
self.process_header(data) ip, source_port = self.transport.client
#print(self.transport.client) #print(ip)
#print(source_port)
self.process_header(data, ip, source_port)
def timeoutConnection(self): def timeoutConnection(self):
#print('timeout') #print('timeout')
@ -75,14 +78,14 @@ class Echo(Protocol, TimeoutMixin):
else: else:
return False return False
def process_header(self, data): def process_header(self, data, ip, source_port):
if not self.buffer: if not self.buffer:
data_header = self.unpack_header(data) data_header = self.unpack_header(data)
if data_header: if data_header:
if self.is_valid_header(data_header['uuid_header']): if self.is_valid_header(data_header['uuid_header']):
# check data size # check data size
if data_header['size'] == (len(data) - header_size): if data_header['size'] == (len(data) - header_size):
self.process_d4_data(data, data_header) self.process_d4_data(data, data_header, ip)
# multiple d4 headers # multiple d4 headers
elif data_header['size'] < (len(data) - header_size): elif data_header['size'] < (len(data) - header_size):
next_data = data[data_header['size'] + header_size:] next_data = data[data_header['size'] + header_size:]
@ -91,7 +94,7 @@ class Echo(Protocol, TimeoutMixin):
#print(data) #print(data)
#print() #print()
#print(next_data) #print(next_data)
self.process_d4_data(data, data_header) self.process_d4_data(data, data_header, ip)
# process next d4 header # process next d4 header
self.process_header(next_data) self.process_header(next_data)
# data_header['size'] > (len(data) - header_size) # data_header['size'] > (len(data) - header_size)
@ -137,7 +140,7 @@ class Echo(Protocol, TimeoutMixin):
self.buffer = b'' self.buffer = b''
self.process_header(data) self.process_header(data)
def process_d4_data(self, data, data_header): def process_d4_data(self, data, data_header, ip):
# empty buffer # empty buffer
self.buffer = b'' self.buffer = b''
# set hmac_header to 0 # set hmac_header to 0
@ -158,9 +161,12 @@ class Echo(Protocol, TimeoutMixin):
if data_header['hmac_header'] == HMAC.hexdigest(): if data_header['hmac_header'] == HMAC.hexdigest():
#print('hmac match') #print('hmac match')
#redis_server.xadd('stream:{}'.format(data_header['type']), {'message': data[header_size:], 'uuid': data_header['uuid_header'], 'timestamp': data_header['timestamp'], 'version': data_header['version']}) date = datetime.datetime.now().strftime("%Y%m%d")
with open(data_header['uuid_header'], 'ab') as f: redis_server.xadd('stream:{}'.format(data_header['type']), {'message': data[header_size:], 'uuid': data_header['uuid_header'], 'timestamp': data_header['timestamp'], 'version': data_header['version']})
f.write(data[header_size:]) redis_server.sadd('daily_uuid:{}'.format(date), data_header['uuid_header'])
redis_server.zincrby('stat_uuid_ip:{}:{}'.format(date, data_header['uuid_header']), 1, ip)
#with open(data_header['uuid_header'], 'ab') as f:
# f.write(data[header_size:])
else: else:
print('hmac do not match') print('hmac do not match')
print(data) print(data)