mirror of https://github.com/D4-project/d4-core
				
				
				
			
		
			
				
	
	
		
			84 lines
		
	
	
		
			2.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			84 lines
		
	
	
		
			2.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
| #!/usr/bin/env python3
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| import redis
 | |
| import time
 | |
| import gzip
 | |
| import datetime
 | |
| 
 | |
# Client for the Redis instance on localhost (default port, database 0) that
# holds the stream consumed by this worker.
redis_server = redis.StrictRedis(host="localhost", port=6379, db=0)

# Stream type number; the worker reads the stream named 'stream:<type>'.
type = 1

# Largest allowed difference between an incoming entry's timestamp and the
# name of the currently open file before that file is compressed and a new
# one is started (presumably seconds, i.e. 5 minutes -- confirm against the
# producer of the 'timestamp' field).
max_timestamp = 5 * 60
 | |
| 
 | |
def gzip_file(filepath):
    """Compress *filepath* into ``filepath + '.gz'`` and delete the original.

    The content is streamed in chunks, so the file is never loaded into
    memory as a whole.

    Args:
        filepath: Path of the uncompressed file to archive.

    Raises:
        OSError: If the source cannot be read, the archive cannot be written,
            or the source cannot be removed. The source is only removed after
            the archive has been written and closed.
    """
    # Local import so this function does not depend on the file's import block.
    import shutil

    with open(filepath, 'rb') as src, gzip.open(filepath + '.gz', 'wb') as dst:
        shutil.copyfileobj(src, dst)
    os.remove(filepath)
 | |
| 
 | |
if __name__ == "__main__":
    # Worker loop: take entries one at a time from the Redis stream, append
    # each entry's payload to a file under data/<YYYY>/<MM>/<DD>/<uuid>/, then
    # delete the entry from the stream. The open file is named after the
    # timestamp of the entry that created it; files that are too old are
    # gzip-compressed and replaced by a new one.
    stream_name = 'stream:{}'.format(type)

    while True:
        # Reading from id '0' always returns the oldest remaining entry,
        # because handled entries are deleted below. `block` is the maximum
        # wait in milliseconds.
        res = redis_server.xread({stream_name: '0'}, count=1, block=500)
        if not res:
            # Nothing to process: back off before polling again.
            time.sleep(10)
            continue

        # res is [[stream_name, [(entry_id, fields), ...]]]; count=1 means at
        # most one entry.
        entry_id, data = res[0][1][0]
        if not (entry_id and data):
            continue

        # Fail early on a malformed timestamp instead of creating a file
        # whose name later breaks the age computation.
        data_timestamp = data[b'timestamp'].decode()
        current_ts = int(data_timestamp)

        date = datetime.datetime.now().strftime("%Y/%m/%d")
        # NOTE(review): the uuid comes from the stream and is used as a path
        # component without validation; confirm it is sanitised upstream
        # (a value such as '../x' would escape the data directory).
        dir_path = os.path.join('data', date, data[b'uuid'].decode())

        # Look for the currently open (uncompressed) file of this sensor.
        # Only purely numeric names are accepted, so a stray file in the
        # directory cannot make int(filename) raise and stop the worker.
        filename = ''
        try:
            with os.scandir(dir_path) as entries:
                for entry in entries:
                    if (not entry.name.endswith(".gz")
                            and entry.name.isdecimal()
                            and entry.is_file()):
                        filename = entry.name
                        break
        except FileNotFoundError:
            # First entry for this sensor today: create its directory.
            os.makedirs(dir_path, exist_ok=True)

        if not filename:
            # No open file yet: start one named after this entry's timestamp.
            filename = data_timestamp
        elif current_ts - int(filename) > max_timestamp:
            # The open file is older than max_timestamp: archive it and start
            # a new one.
            gzip_file(os.path.join(dir_path, filename))
            filename = data_timestamp

        with open(os.path.join(dir_path, filename), 'ab') as f:
            f.write(data[b'message'])

        # Remove the entry only after its payload has been written.
        redis_server.xdel(stream_name, entry_id)
 |