diff --git a/server/save_data.py b/server/save_data.py index 42bfe44..a5ba63c 100755 --- a/server/save_data.py +++ b/server/save_data.py @@ -10,10 +10,10 @@ import datetime redis_server = redis.StrictRedis( host="localhost", port=6379, - db=0, - decode_responses=True) + db=0) type = 1 +max_timestamp = 60*5 def gzip_file(filepath): with open(filepath, 'rb') as f: @@ -41,12 +41,13 @@ if __name__ == "__main__": id = res[0][1][0][0] data = res[0][1][0][1] if id and data: - print(id) + #print(id.decode()) #print(data) date = datetime.datetime.now().strftime("%Y/%m/%d") - dir_path = os.path.join('data', date, data['uuid']) + dir_path = os.path.join('data', date, data[b'uuid'].decode()) filename = '' + data_timestamp = data[b'timestamp'].decode() try: it = os.scandir(dir_path) @@ -55,9 +56,10 @@ if __name__ == "__main__": filename = entry.name break filepath = os.path.join(dir_path, filename) - if os.path.getsize(filepath) > 5000000: #bytes + + if os.path.getsize(filepath) > 500000000: #bytes gzip_file(filepath) - filename = data['timestamp'] + filename = data_timestamp except FileNotFoundError: os.makedirs(dir_path) @@ -66,12 +68,16 @@ if __name__ == "__main__": # # if not filename: - filename = data['timestamp'] + filename = data_timestamp - with open(os.path.join(dir_path, filename), 'a') as f: - f.write(data['message']) + if int(data_timestamp) - int(filename) > max_timestamp: + gzip_file(filepath) + filename = data_timestamp - redis_server.xack(stream_name, group_name, id) + with open(os.path.join(dir_path, filename), 'ab') as f: + f.write(data[b'message']) + + #redis_server.xack(stream_name, group_name, id) redis_server.xdel(stream_name, id) else: time.sleep(10) diff --git a/server/server.py b/server/server.py index e8e1b15..784339a 100755 --- a/server/server.py +++ b/server/server.py @@ -30,8 +30,7 @@ header_size = 62 redis_server = redis.StrictRedis( host="localhost", port=6379, - db=0, - decode_responses=True) + db=0) class Echo(Protocol, TimeoutMixin): @@ -96,7 +95,7 @@ class Echo(Protocol, TimeoutMixin): #print(next_data) self.process_d4_data(data, data_header, ip) # process next d4 header - self.process_header(next_data) + self.process_header(next_data, ip, source_port) # data_header['size'] > (len(data) - header_size) # buffer the data else: @@ -138,7 +137,7 @@ class Echo(Protocol, TimeoutMixin): #print(data) #print() self.buffer = b'' - self.process_header(data) + self.process_header(data, ip, source_port) def process_d4_data(self, data, data_header, ip): # empty buffer