diff --git a/server/configs/server.conf.sample b/server/configs/server.conf.sample index 66c4cca..face924 100644 --- a/server/configs/server.conf.sample +++ b/server/configs/server.conf.sample @@ -6,6 +6,7 @@ save_directory = None [D4_Server] # registration or shared-secret server_mode = registration +analyzer_queues_max_size = 10000 [Flask_Server] # UI port number diff --git a/server/lib/Analyzer_Queue.py b/server/lib/Analyzer_Queue.py new file mode 100755 index 0000000..2785382 --- /dev/null +++ b/server/lib/Analyzer_Queue.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +import os +import sys +import datetime +import time +import uuid +import redis + +sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib/')) +import ConfigLoader + +### Config ### +config_loader = ConfigLoader.ConfigLoader() +r_serv_metadata = config_loader.get_redis_conn("Redis_METADATA") +r_serv_analyzer = config_loader.get_redis_conn("Redis_ANALYZER") +LIST_DEFAULT_SIZE = config_loader.get_config_int('D4_Server', 'analyzer_queues_max_size') +config_loader = None +### ### + +def is_valid_uuid_v4(uuid_v4): + if uuid_v4: + uuid_v4 = uuid_v4.replace('-', '') + else: + return False + + try: + uuid_test = uuid.UUID(hex=uuid_v4, version=4) + return uuid_test.hex == uuid_v4 + except: + return False + +def sanitize_uuid(uuid_v4): + if not is_valid_uuid_v4(uuid_v4): + uuid_v4 = str(uuid.uuid4()) + return uuid_v4 + +def get_all_queues(r_list=None): + res = r_serv_metadata.smembers('all_analyzer_queues') + if r_list: + return list(res) + return res + +def get_all_queues_standard_format(r_list=None): + res = r_serv_metadata.smembers('all_analyzer_queues') + if r_list: + return list(res) + return res + +def get_all_queues_standard_extended_format(r_list=None): + res = r_serv_metadata.smembers('all_analyzer_queues') + if r_list: + return list(res) + return res + +def get_all_queues_by_type(format_type, r_list=None): + ''' + Get all analyzer Queues by type + + :param format_type: data type + :type domain_type: int + :param r_list: return list + :type r_list: boolean + + :return: list or set of queus (uuid) + :rtype: list or set + ''' + # 'all_analyzer_queues_by_type' + res = r_serv_metadata.smembers('analyzer:{}'.format(format_type)) + if r_list: + return list(res) + return res + +def get_all_queues_group_by_type(format_type, r_list=None): + res = r_serv_metadata.smembers('analyzer_uuid_group:{}'.format(format_type)) + if r_list: + return list(res) + return res + +def get_all_queues_by_sensor_group(format_type, sensor_uuid, r_list=None): + print('sensor:queues:{}:{}'.format(format_type, sensor_uuid)) + res = r_serv_metadata.smembers('sensor:queues:{}:{}'.format(format_type, sensor_uuid)) + if r_list: + return list(res) + return res + +def get_queue_group_all_sensors(queue_uuid, r_list=None): + res = r_serv_metadata.smembers('analyzer_sensor_group:{}'.format(queue_uuid)) + if r_list: + return list(res) + return res + +def get_queue_last_seen(queue_uuid, f_date='str_time'): + res = r_serv_metadata.hget('analyzer:{}'.format(queue_uuid), 'last_updated') + if f_date == 'str_date': + if res is None: + res = 'Never' + else: + res = datetime.datetime.fromtimestamp(float(res)).strftime('%Y-%m-%d %H:%M:%S') + return res + +def get_queue_max_size(queue_uuid): + max_size = r_serv_metadata.hget('analyzer:{}'.format(queue_uuid), 'max_size') + if max_size is None: + max_size = LIST_DEFAULT_SIZE + return max_size + +def get_queue_metadata(queue_uuid, format_type=None, f_date='str_date'): + dict_queue_meta = {} + dict_queue_meta['uuid'] = queue_uuid + dict_queue_meta['size_limit'] = get_queue_max_size(queue_uuid) + dict_queue_meta['last_updated'] = get_queue_last_seen(queue_uuid, f_date=f_date) + + dict_queue_meta['description'] = r_serv_metadata.hget('analyzer:{}'.format(queue_uuid), 'description') + if dict_queue_meta['description'] is None: + dict_queue_meta['description'] = '' + + if format_type: + dict_queue_meta['length'] = r_serv_analyzer.llen('analyzer:{}:{}'.format(format_type, queue_uuid)) + if dict_queue_meta['length'] is None: + dict_queue_meta['length'] = 0 + return dict_queue_meta + +def edit_queue_description(queue_uuid, description): + if r_serv_metadata.exists('analyzer:{}'.format(queue_uuid)) and description: + r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'description', description) + +def edit_queue_max_size(queue_uuid, max_size): + try: + max_size = int(max_size) + except: + return 'analyzer max size, Invalid Integer' + + if r_serv_metadata.exists('analyzer:{}'.format(queue_uuid)) and max_size > 0: + r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'max_size', max_size) + +# create queu by type or by group of uuid +# # TODO: add size limit +def create_queues(format_type, queue_uuid=None, l_uuid=[], queue_type='list', metatype_name=None, description=None): + queue_uuid = sanitize_uuid(queue_uuid) + r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'type', format_type) + edit_queue_description(queue_uuid, description) + + # # TODO: check l_uuid is valid + if l_uuid: + analyzer_key_name = 'analyzer_uuid_group' + else: + analyzer_key_name = 'analyzer' + + if format_type == 254: + # TODO: check metatype_name + r_serv_metadata.sadd('{}:{}:{}'.format(analyzer_key_name, format_type, metatype_name), queue_uuid) + else: + r_serv_metadata.sadd('{}:{}'.format(analyzer_key_name, format_type), queue_uuid) + + # Group by UUID + if l_uuid: + # # TODO: check sensor_uuid is valid + for sensor_uuid in l_uuid: + sensor_uuid = sensor_uuid.replace('-', '') + r_serv_metadata.sadd('analyzer_sensor_group:{}'.format(queue_uuid), sensor_uuid) + r_serv_metadata.sadd('sensor:queues:{}:{}'.format(format_type, sensor_uuid), queue_uuid) + # ALL + r_serv_metadata.sadd('all_analyzer_queues', queue_uuid) + return queue_uuid + +# format_type int or str (extended type) +def add_data_to_queue(sensor_uuid, format_type, data): + if data: + # by data type + for queue_uuid in get_all_queues_by_type(format_type): + r_serv_analyzer.lpush('analyzer:{}:{}'.format(format_type, queue_uuid), data) + r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'last_updated', time.time()) + analyser_queue_max_size = get_queue_max_size(queue_uuid) + r_serv_analyzer.ltrim('analyzer:{}:{}'.format(format_type, queue_uuid), 0, analyser_queue_max_size) + + # by data type + for queue_uuid in get_all_queues_by_sensor_group(format_type, sensor_uuid): + r_serv_analyzer.lpush('analyzer:{}:{}'.format(format_type, queue_uuid), data) + r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'last_updated', time.time()) + analyser_queue_max_size = get_queue_max_size(queue_uuid) + r_serv_analyzer.ltrim('analyzer:{}:{}'.format(format_type, queue_uuid), 0, analyser_queue_max_size) + + +def is_queue_group_of_sensors(queue_uuid): + if r_serv_metadata.exists('analyzer_sensor_group:{}'.format(queue_uuid)): + return True + else: + return False + +def flush_queue(queue_uuid, format_type): + r_serv_analyzer.delete('analyzer:{}:{}'.format(format_type, queue_uuid)) + +def remove_queues(queue_uuid, format_type, metatype_name=None): + if not is_valid_uuid_v4(queue_uuid): + return {'error': 'Invalid uuid'} + + # delete metadata + r_serv_metadata.delete('analyzer:{}'.format(queue_uuid)) + + # delete queue group of sensors uuid + l_sensors_uuid = get_queue_group_all_sensors(queue_uuid, r_list=None) + if l_sensors_uuid: + r_serv_metadata.delete('analyzer_sensor_group:{}'.format(queue_uuid)) + for sensor_uuid in l_sensors_uuid: + r_serv_metadata.srem('sensor:queues:{}:{}'.format(format_type, sensor_uuid), queue_uuid) + + if l_sensors_uuid: + analyzer_key_name = 'analyzer_uuid_group' + else: + analyzer_key_name = 'analyzer' + + if format_type == 254: + # TODO: check metatype_name + r_serv_metadata.srem('{}:{}:{}'.format(analyzer_key_name, format_type, metatype_name), queue_uuid) + else: + r_serv_metadata.srem('{}:{}'.format(analyzer_key_name, format_type), queue_uuid) + + r_serv_metadata.srem('all_analyzer_queues', queue_uuid) + + # delete qeue + r_serv_analyzer.delete('analyzer:{}:{}'.format(format_type, queue_uuid)) + +def get_sensor_queues(sensor_uuid): + pass + +if __name__ == '__main__': + #create_queues(3, l_uuid=['03c00bcf-fe53-46a1-85bb-ee6084cb5bb2']) + remove_queues('a2e6f95c-1efe-4d2b-a0f5-d8e205d85670', 3) diff --git a/server/web/Flask_server.py b/server/web/Flask_server.py index 3946187..0273ae0 100755 --- a/server/web/Flask_server.py +++ b/server/web/Flask_server.py @@ -29,6 +29,7 @@ sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib')) from User import User import Sensor import ConfigLoader +import Analyzer_Queue # Import Blueprint from blueprints.restApi import restApi @@ -532,6 +533,9 @@ def server_management(): len_queue = 0 list_analyzer_uuid.append({'uuid': analyzer_uuid, 'description': description_analyzer, 'size_limit': size_limit,'last_updated': last_updated, 'length': len_queue}) + for analyzer_uuid in Analyzer_Queue.get_all_queues_group_by_type(type): + list_analyzer_uuid.append(Analyzer_Queue.get_queue_metadata(analyzer_uuid, type)) + list_accepted_types.append({"id": int(type), "description": description, 'list_analyzer_uuid': list_analyzer_uuid}) list_accepted_extended_types = [] @@ -789,25 +793,20 @@ def uuid_change_description(): @login_required @login_user_basic def add_new_analyzer(): - type = request.args.get('type') + format_type = request.args.get('type') user = request.args.get('redirect') metatype_name = request.args.get('metatype_name') analyzer_description = request.args.get('analyzer_description') analyzer_uuid = request.args.get('analyzer_uuid') if is_valid_uuid_v4(analyzer_uuid): try: - type = int(type) - if type < 0: + format_type = int(format_type) + if format_type < 0: return 'type, Invalid Integer' except: return 'type, Invalid Integer' - if type == 254: - # # TODO: check metatype_name - redis_server_metadata.sadd('analyzer:{}:{}'.format(type, metatype_name), analyzer_uuid) - else: - redis_server_metadata.sadd('analyzer:{}'.format(type), analyzer_uuid) - if redis_server_metadata.exists('analyzer:{}:{}'.format(type, metatype_name)) or redis_server_metadata.exists('analyzer:{}'.format(type)): - redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'description', analyzer_description) + + Analyzer_Queue.create_queues(format_type, queue_uuid=analyzer_uuid, l_uuid=[], queue_type='list', metatype_name=metatype_name, description=analyzer_description) if user: return redirect(url_for('server_management')) else: @@ -818,20 +817,13 @@ def add_new_analyzer(): @login_user_basic def empty_analyzer_queue(): analyzer_uuid = request.args.get('analyzer_uuid') - type = request.args.get('type') + format_type = request.args.get('type') metatype_name = request.args.get('metatype_name') user = request.args.get('redirect') if is_valid_uuid_v4(analyzer_uuid): - try: - type = int(type) - if type < 0: - return 'type, Invalid Integer' - except: - return 'type, Invalid Integer' - if type == 254: - redis_server_analyzer.delete('analyzer:{}:{}'.format(metatype_name, analyzer_uuid)) - else: - redis_server_analyzer.delete('analyzer:{}:{}'.format(type, analyzer_uuid)) + if format_type == 254: + format_type = metatype_name + Analyzer_Queue.flush_queue(analyzer_uuid, format_type) if user: return redirect(url_for('server_management')) else: @@ -842,23 +834,11 @@ def empty_analyzer_queue(): @login_user_basic def remove_analyzer(): analyzer_uuid = request.args.get('analyzer_uuid') - type = request.args.get('type') + format_type = request.args.get('type') metatype_name = request.args.get('metatype_name') user = request.args.get('redirect') if is_valid_uuid_v4(analyzer_uuid): - try: - type = int(type) - if type < 0: - return 'type, Invalid Integer' - except: - return 'type, Invalid Integer' - if type == 254: - redis_server_metadata.srem('analyzer:{}:{}'.format(type, metatype_name), analyzer_uuid) - redis_server_analyzer.delete('analyzer:{}:{}'.format(metatype_name, analyzer_uuid)) - else: - redis_server_metadata.srem('analyzer:{}'.format(type), analyzer_uuid) - redis_server_analyzer.delete('analyzer:{}:{}'.format(type, analyzer_uuid)) - redis_server_metadata.delete('analyzer:{}'.format(analyzer_uuid)) + Analyzer_Queue.remove_queues(analyzer_uuid, format_type) if user: return redirect(url_for('server_management')) else: @@ -872,13 +852,7 @@ def analyzer_change_max_size(): user = request.args.get('redirect') max_size_analyzer = request.args.get('max_size_analyzer') if is_valid_uuid_v4(analyzer_uuid): - try: - max_size_analyzer = int(max_size_analyzer) - if max_size_analyzer < 0: - return 'analyzer max size, Invalid Integer' - except: - return 'analyzer max size, Invalid Integer' - redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'max_size', max_size_analyzer) + Analyzer_Queue.edit_queue_max_size(analyzer_uuid, max_size_analyzer) if user: return redirect(url_for('server_management')) else: diff --git a/server/workers/workers_1/file_compressor.py b/server/workers/workers_1/file_compressor.py index 86804f3..ee31a1e 100755 --- a/server/workers/workers_1/file_compressor.py +++ b/server/workers/workers_1/file_compressor.py @@ -84,13 +84,13 @@ if __name__ == "__main__": new_date = datetime.datetime.now().strftime("%Y%m%d") - # get all directory files - all_files = os.listdir(worker_data_directory) not_compressed_file = [] # filter: get all not compressed files - for file in all_files: - if file.endswith('.cap'): - not_compressed_file.append(os.path.join(worker_data_directory, file)) + if os.path.isdir(worker_data_directory): + all_files = os.listdir(worker_data_directory) + for file in all_files: + if file.endswith('.cap'): + not_compressed_file.append(os.path.join(worker_data_directory, file)) if not_compressed_file: ### check time-change (minus one hour) ### diff --git a/server/workers/workers_1/worker.py b/server/workers/workers_1/worker.py index eff0d6c..4aadd65 100755 --- a/server/workers/workers_1/worker.py +++ b/server/workers/workers_1/worker.py @@ -11,6 +11,7 @@ import subprocess sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib/')) import ConfigLoader +import Analyzer_Queue def data_incorrect_format(stream_name, session_uuid, uuid): redis_server_stream.sadd('Error:IncorrectType', session_uuid) @@ -39,14 +40,7 @@ def compress_file(file_full_path, i=0): shutil.copyfileobj(f_in, f_out) os.remove(file_full_path) # save full path in anylyzer queue - for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}'.format(type)): - analyzer_uuid = analyzer_uuid.decode() - redis_server_analyzer.lpush('analyzer:{}:{}'.format(type, analyzer_uuid), compressed_filename) - redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time()) - analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size') - if analyser_queue_max_size is None: - analyser_queue_max_size = analyzer_list_max_default_size - redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size) + Analyzer_Queue.add_data_to_queue(uuid, type, compressed_filename) config_loader = ConfigLoader.ConfigLoader() redis_server_stream = config_loader.get_redis_conn("Redis_STREAM", decode_responses=False) diff --git a/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py b/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py index 7b38417..e738752 100755 --- a/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py +++ b/server/workers/workers_2/meta_types_modules/MetaTypesDefault.py @@ -11,6 +11,7 @@ import datetime sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib/')) import ConfigLoader +import Analyzer_Queue DEFAULT_FILE_EXTENSION = 'txt' @@ -172,15 +173,7 @@ class MetaTypesDefault: os.remove(file_full_path) def send_to_analyzers(self, data_to_send): - ## save full path in anylyzer queue - for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}:{}'.format(TYPE, self.get_type_name())): - analyzer_uuid = analyzer_uuid.decode() - redis_server_analyzer.lpush('analyzer:{}:{}'.format(self.get_type_name(), analyzer_uuid), data_to_send) - redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time()) - analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size') - if analyser_queue_max_size is None: - analyser_queue_max_size = analyzer_list_max_default_size - redis_server_analyzer.ltrim('analyzer:{}:{}'.format(self.get_type_name(), analyzer_uuid), 0, analyser_queue_max_size) + Analyzer_Queue.add_data_to_queue(self.uuid, self.get_type_name(), data_to_send) ######## GET FUNCTIONS ######## diff --git a/server/workers/workers_3/worker.py b/server/workers/workers_3/worker.py index 64ec7be..8858962 100755 --- a/server/workers/workers_3/worker.py +++ b/server/workers/workers_3/worker.py @@ -11,6 +11,7 @@ import datetime sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib/')) import ConfigLoader +import Analyzer_Queue def data_incorrect_format(session_uuid): print('Incorrect format') @@ -33,8 +34,6 @@ config_loader = None type = 3 rotation_save_cycle = 300 #seconds -analyzer_list_max_default_size = 10000 - max_buffer_length = 10000 save_to_file = True @@ -112,14 +111,8 @@ if __name__ == "__main__": if b'\n' in data[b'message']: all_line = data[b'message'].split(b'\n') for line in all_line[:-1]: - for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}'.format(type)): - analyzer_uuid = analyzer_uuid.decode() - redis_server_analyzer.lpush('analyzer:{}:{}'.format(type, analyzer_uuid), line) - redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time()) - analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size') - if analyser_queue_max_size is None: - analyser_queue_max_size = analyzer_list_max_default_size - redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size) + Analyzer_Queue.add_data_to_queue(uuid, type, line) + # analyzer_uuid = analyzer_uuid.decode() # keep incomplete line if all_line[-1] != b'': buffer += all_line[-1] diff --git a/server/workers/workers_4/worker.py b/server/workers/workers_4/worker.py index 275bc43..f5acbf1 100755 --- a/server/workers/workers_4/worker.py +++ b/server/workers/workers_4/worker.py @@ -9,6 +9,7 @@ import datetime sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib/')) import ConfigLoader +import Analyzer_Queue def data_incorrect_format(session_uuid): print('Incorrect format') diff --git a/server/workers/workers_8/worker.py b/server/workers/workers_8/worker.py index bbb90d9..5c821c8 100755 --- a/server/workers/workers_8/worker.py +++ b/server/workers/workers_8/worker.py @@ -11,6 +11,7 @@ import datetime sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib/')) import ConfigLoader +import Analyzer_Queue def data_incorrect_format(session_uuid): print('Incorrect format') @@ -112,14 +113,7 @@ if __name__ == "__main__": if b'\n' in data[b'message']: all_line = data[b'message'].split(b'\n') for line in all_line[:-1]: - for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}'.format(type)): - analyzer_uuid = analyzer_uuid.decode() - redis_server_analyzer.lpush('analyzer:{}:{}'.format(type, analyzer_uuid), line) - redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time()) - analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size') - if analyser_queue_max_size is None: - analyser_queue_max_size = analyzer_list_max_default_size - redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size) + Analyzer_Queue.add_data_to_queue(uuid, type, line) # keep incomplete line if all_line[-1] != b'': buffer += all_line[-1]