mirror of https://github.com/D4-project/d4-core
chg: [254 default module] add functions to reconstruct data and handle buffer
parent 6c90b140f7
commit fa442a2f70
server/workers/workers_2/meta_types_modules
server/workers/workers_8
@@ -12,6 +12,7 @@ import datetime
 DEFAULT_FILE_EXTENSION = 'txt'
 DEFAULT_FILE_SEPARATOR = b'\n'
 ROTATION_SAVE_CYCLE = 300 # seconds
+MAX_BUFFER_LENGTH = 10000
 TYPE = 254

 class MetaTypesDefault:
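For context, elements in a type 254 stream are framed by DEFAULT_FILE_SEPARATOR, and MAX_BUFFER_LENGTH bounds how much unterminated data may accumulate before a separator is forced (see reconstruct_data below). A quick illustration of the framing, with made-up sample bytes:

    chunk = b'first element\nsecond ele'   # a chunk can end mid-element
    parts = chunk.split(b'\n')
    print(parts)   # [b'first element', b'second ele'] -- the tail must be buffered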
@@ -20,6 +21,7 @@ class MetaTypesDefault:
         self.uuid = uuid
         self.type_name = json_file['type']
         self.save_path = None
+        self.buffer = b''
         self.parse_json(json_file)

     def test(self):
@@ -115,6 +117,36 @@ class MetaTypesDefault:
     def save_same_directory(self, data):
         pass

+    def reconstruct_data(self, data):
+        # add buffer to data
+        if not self.is_empty_buffer():
+            self.add_to_buffer(data)
+            data = self.get_buffer()
+
+        # end of element found in data
+        if self.get_file_separator() in data:
+            # empty buffer
+            self.reset_buffer()
+            all_reconstructed_data = data.split(self.get_file_separator())
+            for reconstructed_data in all_reconstructed_data[:-1]:
+                self.handle_reconstructed_data(reconstructed_data)
+
+            # save incomplete element in buffer
+            if all_reconstructed_data[-1] != b'':
+                self.add_to_buffer(all_reconstructed_data[-1])
+        # no complete elements
+        else:
+            # save data in buffer
+            self.add_to_buffer(data)
+            # force file_separator when max buffer size is reached
+            if self.get_size_buffer() > MAX_BUFFER_LENGTH:
+                print('Error, infinite loop, max buffer length reached')
+                self.add_to_buffer(self.get_file_separator())
+
+    def handle_reconstructed_data(self, data):
+        # send data to the analyzers
+        self.send_to_analyzers(data)
+
     def compress_file(self, file_full_path, i=0):
         if i==0:
             compressed_filename = '{}.gz'.format(file_full_path)
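A minimal standalone sketch of the reconstruction logic added above; the StreamReassembler name, the elements list (standing in for send_to_analyzers), and the sample chunks are illustrative, not part of the module:

    FILE_SEPARATOR = b'\n'
    MAX_BUFFER_LENGTH = 10000

    class StreamReassembler:
        # illustrative re-implementation of reconstruct_data()
        def __init__(self):
            self.buffer = b''
            self.elements = []   # stands in for send_to_analyzers()

        def reconstruct_data(self, data):
            # prepend any partial element kept from the previous chunk
            data = self.buffer + data
            self.buffer = b''
            if FILE_SEPARATOR in data:
                parts = data.split(FILE_SEPARATOR)
                self.elements.extend(parts[:-1])      # complete elements
                if parts[-1] != b'':
                    self.buffer = parts[-1]           # keep the incomplete tail
            else:
                self.buffer = data
                if len(self.buffer) > MAX_BUFFER_LENGTH:
                    self.buffer += FILE_SEPARATOR     # force an element boundary

    r = StreamReassembler()
    r.reconstruct_data(b'first element\nsecond ele')
    r.reconstruct_data(b'ment\nthird element\n')
    print(r.elements)   # [b'first element', b'second element', b'third element']

The key invariant is that the buffer only ever holds the bytes after the last separator seen, so an element split across any number of chunks is reassembled before being handed to handle_reconstructed_data().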
@@ -150,6 +182,12 @@ class MetaTypesDefault:
     def get_uuid(self):
         return self.uuid

+    def get_buffer(self):
+        return self.buffer
+
+    def get_size_buffer(self):
+        return len(self.buffer)
+
     def get_filename(self, file_extention=None):
         if file_extention is None:
             file_extention = DEFAULT_FILE_EXTENSION
@@ -170,6 +208,12 @@ class MetaTypesDefault:
     def get_save_path(self):
         return self.save_path

+    def is_empty_buffer(self):
+        if self.buffer == b'':
+            return True
+        else:
+            return False
+
     def is_file_saved_on_disk(self):
         if self.save_file_on_disk:
             return True
@@ -205,6 +249,15 @@ class MetaTypesDefault:

     ######## SET FUNCTIONS ########

+    def reset_buffer(self):
+        self.buffer = b''
+
+    def set_buffer(self, data):
+        self.buffer = data
+
+    def add_to_buffer(self, data):
+        self.buffer = b''.join([self.buffer, data])
+
     def set_rotate_file(self, boolean_value):
         self.file_rotation = boolean_value

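A round-trip through the new buffer helpers, using a standalone stub rather than the real class (the stub name is illustrative; method names mirror the diff):

    class BufferStub:
        def __init__(self):
            self.buffer = b''
        def reset_buffer(self):
            self.buffer = b''
        def set_buffer(self, data):
            self.buffer = data
        def add_to_buffer(self, data):
            # b''.join([a, b]) is equivalent to a + b for two operands
            self.buffer = b''.join([self.buffer, data])

    b = BufferStub()
    b.add_to_buffer(b'partial ele')
    b.add_to_buffer(b'ment\n')
    assert b.buffer == b'partial element\n'
    b.reset_buffer()
    assert b.buffer == b''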
@@ -215,7 +268,6 @@ class MetaTypesDefault:
         self.last_saved_date = date

     def set_save_path(self, save_path):
-        # # TODO: create directory
         dir_path = os.path.dirname(save_path)
         if not os.path.isdir(dir_path):
             os.makedirs(dir_path)

server/workers/workers_8
@@ -111,7 +111,7 @@ if __name__ == "__main__":
                     redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size)
             # keep incomplete line
             if all_line[-1] != b'':
-                buffer += data[b'message']
+                buffer += all_line[-1]
             else:
                 if len(buffer) < max_buffer_length:
                     buffer += data[b'message']
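The worker-side change applies the same keep-incomplete-line pattern: only the unterminated tail after the last separator is carried over, instead of re-appending the whole message. The pattern in isolation (the feed name and sample chunks are illustrative, not the worker's actual surrounding code):

    buffer = b''

    def feed(message):
        global buffer
        all_line = (buffer + message).split(b'\n')
        buffer = all_line[-1] if all_line[-1] != b'' else b''   # keep incomplete line
        for line in all_line[:-1]:
            print(line)   # complete lines, ready for the analyzer queues

    feed(b'one\ntwo\nthr')
    feed(b'ee\n')          # prints b'one', b'two', then b'three'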