mirror of https://github.com/D4-project/d4-core
Merge pull request #9 from D4-project/metatypes
chg: [254 default module] add compress file + send data to analyzersvisu-type
commit
6bdd70f55c
|
@ -4,12 +4,14 @@ import os
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
import json
|
import json
|
||||||
|
import gzip
|
||||||
import redis
|
import redis
|
||||||
|
import shutil
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
DEFAULT_FILE_EXTENSION = 'txt'
|
DEFAULT_FILE_EXTENSION = 'txt'
|
||||||
DEFAULT_FILE_SEPARATOR = b'\n'
|
DEFAULT_FILE_SEPARATOR = b'\n'
|
||||||
ROTATION_SAVE_CYCLE = 5 # seconds
|
ROTATION_SAVE_CYCLE = 300 # seconds
|
||||||
TYPE = 254
|
TYPE = 254
|
||||||
|
|
||||||
class MetaTypesDefault:
|
class MetaTypesDefault:
|
||||||
|
@ -85,6 +87,7 @@ class MetaTypesDefault:
|
||||||
# save end of file
|
# save end of file
|
||||||
with open(self.get_save_path(), 'ab') as f:
|
with open(self.get_save_path(), 'ab') as f:
|
||||||
f.write(end_file)
|
f.write(end_file)
|
||||||
|
self.compress_file(self.get_save_path())
|
||||||
|
|
||||||
# set last saved date/time
|
# set last saved date/time
|
||||||
self.set_last_time_saved(time.time())
|
self.set_last_time_saved(time.time())
|
||||||
|
@ -112,6 +115,30 @@ class MetaTypesDefault:
|
||||||
def save_same_directory(self, data):
|
def save_same_directory(self, data):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def compress_file(self, file_full_path, i=0):
|
||||||
|
if i==0:
|
||||||
|
compressed_filename = '{}.gz'.format(file_full_path)
|
||||||
|
else:
|
||||||
|
compressed_filename = '{}.{}.gz'.format(file_full_path, i)
|
||||||
|
if os.path.isfile(compressed_filename):
|
||||||
|
compress_file(file_full_path, i+1)
|
||||||
|
else:
|
||||||
|
with open(file_full_path, 'rb') as f_in:
|
||||||
|
with gzip.open(compressed_filename, 'wb') as f_out:
|
||||||
|
shutil.copyfileobj(f_in, f_out)
|
||||||
|
os.remove(file_full_path)
|
||||||
|
|
||||||
|
def send_to_analyzers(self, data_to_send):
|
||||||
|
## save full path in anylyzer queue
|
||||||
|
for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}:{}'.format(TYPE, self.get_type_name())):
|
||||||
|
analyzer_uuid = analyzer_uuid.decode()
|
||||||
|
redis_server_analyzer.lpush('analyzer:{}:{}'.format(TYPE, analyzer_uuid), data_to_send)
|
||||||
|
redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time())
|
||||||
|
analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size')
|
||||||
|
if analyser_queue_max_size is None:
|
||||||
|
analyser_queue_max_size = analyzer_list_max_default_size
|
||||||
|
redis_server_analyzer.ltrim('analyzer:{}:{}'.format(TYPE, analyzer_uuid), 0, analyser_queue_max_size)
|
||||||
|
|
||||||
######## GET FUNCTIONS ########
|
######## GET FUNCTIONS ########
|
||||||
|
|
||||||
def get_type_name(self):
|
def get_type_name(self):
|
||||||
|
|
Loading…
Reference in New Issue