From fa442a2f70d8694c7fdb0fe38fa02beec2990ce4 Mon Sep 17 00:00:00 2001
From: Terrtia
Date: Wed, 13 Mar 2019 10:04:20 +0100
Subject: [PATCH] chg: [254 default module] add functions to reconstruct data
 and handle buffer

---
 .../meta_types_modules/MetaTypesDefault.py         | 54 ++++++++++++++++++-
 server/workers/workers_8/worker.py                 |  2 +-
 2 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py b/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py
index 39c7334..138db6b 100755
--- a/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py
+++ b/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py
@@ -12,6 +12,7 @@ import datetime
 DEFAULT_FILE_EXTENSION = 'txt'
 DEFAULT_FILE_SEPARATOR = b'\n'
 ROTATION_SAVE_CYCLE = 300 # seconds
+MAX_BUFFER_LENGTH = 10000
 TYPE = 254
 
 class MetaTypesDefault:
@@ -20,6 +21,7 @@
         self.uuid = uuid
         self.type_name = json_file['type']
         self.save_path = None
+        self.buffer = b''
         self.parse_json(json_file)
 
     def test(self):
@@ -115,6 +117,36 @@
     def save_same_directory(self, data):
         pass
 
+    def reconstruct_data(self, data):
+        # add buffer to data
+        if not self.is_empty_buffer():
+            self.add_to_buffer(data)
+            data = self.get_buffer()
+
+        # end of element found in data
+        if self.get_file_separator() in data:
+            # empty buffer
+            self.reset_buffer()
+            all_reconstructed_data = data.split(self.get_file_separator())
+            for reconstructed_data in all_reconstructed_data[:-1]:
+                self.handle_reconstructed_data(reconstructed_data)
+
+            # save incomplete element in buffer
+            if all_reconstructed_data[-1] != b'':
+                self.add_to_buffer(all_reconstructed_data[-1])
+        # no elements
+        else:
+            # save data in buffer
+            self.add_to_buffer(data)
+            # force file_separator when max buffer size is reached
+            if self.get_size_buffer() > MAX_BUFFER_LENGTH:
+                print('Error, infinite loop, max buffer length reached')
+                self.add_to_buffer(self.get_file_separator())
+
+    def handle_reconstructed_data(self, data):
+        # send data to analyzer
+        self.send_to_analyzers(data)
+
     def compress_file(self, file_full_path, i=0):
         if i==0:
             compressed_filename = '{}.gz'.format(file_full_path)
@@ -150,6 +182,12 @@
     def get_uuid(self):
         return self.uuid
 
+    def get_buffer(self):
+        return self.buffer
+
+    def get_size_buffer(self):
+        return len(self.buffer)
+
     def get_filename(self, file_extention=None):
         if file_extention is None:
             file_extention = DEFAULT_FILE_EXTENSION
@@ -170,6 +208,12 @@
     def get_save_path(self):
         return self.save_path
 
+    def is_empty_buffer(self):
+        if self.buffer==b'':
+            return True
+        else:
+            return False
+
     def is_file_saved_on_disk(self):
         if self.save_file_on_disk:
             return True
@@ -205,6 +249,15 @@
 
     ######## SET FUNCTIONS ########
 
+    def reset_buffer(self):
+        self.buffer = b''
+
+    def set_buffer(self, data):
+        self.buffer = data
+
+    def add_to_buffer(self, data):
+        self.buffer = b''.join([self.buffer, data])
+
     def set_rotate_file(self, boolean_value):
         self.file_rotation = boolean_value
 
@@ -215,7 +268,6 @@
         self.last_saved_date = date
 
     def set_save_path(self, save_path):
-        # # TODO: create directory
         dir_path = os.path.dirname(save_path)
         if not os.path.isdir(dir_path):
             os.makedirs(dir_path)
diff --git a/server/workers/workers_8/worker.py b/server/workers/workers_8/worker.py
index 92bd9f6..6e4eac7 100755
--- a/server/workers/workers_8/worker.py
+++ b/server/workers/workers_8/worker.py
@@ -111,7 +111,7 @@ if __name__ == "__main__":
                         redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size)
                 # keep incomplete line
                 if all_line[-1] != b'':
-                    buffer += data[b'message']
+                    buffer += all_line[-1]
                 else:
                     if len(buffer) < max_buffer_length:
                         buffer += data[b'message']