From cdc72e79988ed919487cb94e57a637e6b32f8e01 Mon Sep 17 00:00:00 2001 From: Terrtia Date: Tue, 3 Mar 2020 15:34:51 +0100 Subject: [PATCH] fix: [Analyzer queue 254] fix metatype: push to queue --- server/lib/Analyzer_Queue.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/server/lib/Analyzer_Queue.py b/server/lib/Analyzer_Queue.py index e2bf52f..eb1d2a4 100755 --- a/server/lib/Analyzer_Queue.py +++ b/server/lib/Analyzer_Queue.py @@ -84,6 +84,12 @@ def get_all_queues_uuid_by_extended_type(extended_type, r_list=None): return list(res) return res +def get_queues_list_by_type(queue_type): + if isinstance(queue_type ,int): + return get_all_queues_by_type(queue_type) + else: + return get_all_queues_by_extended_type(queue_type) + # ONLY NON GROUP def get_all_queues_by_type(format_type, r_list=None): ''' @@ -122,8 +128,8 @@ def get_all_queues_group_by_extended_type(extended_type, r_list=None): return list(res) return res -def get_all_queues_by_sensor_group(format_type, sensor_uuid, r_list=None): - res = r_serv_metadata.smembers('sensor:queues:{}:{}'.format(format_type, sensor_uuid)) +def get_all_queues_by_sensor_group(queue_type, sensor_uuid, r_list=None): + res = r_serv_metadata.smembers('sensor:queues:{}:{}'.format(queue_type, sensor_uuid)) if r_list: return list(res) return res @@ -269,10 +275,12 @@ def create_queues(format_type, queue_uuid=None, l_uuid=[], queue_type='list', me # Group by UUID if l_uuid: # # TODO: check sensor_uuid is valid + if format_type == 254: + queue_type = metatype_name for sensor_uuid in l_uuid: sensor_uuid = sensor_uuid.replace('-', '') r_serv_metadata.sadd('analyzer_sensor_group:{}'.format(queue_uuid), sensor_uuid) - r_serv_metadata.sadd('sensor:queues:{}:{}'.format(format_type, sensor_uuid), queue_uuid) + r_serv_metadata.sadd('sensor:queues:{}:{}'.format(queue_type, sensor_uuid), queue_uuid) # ALL r_serv_metadata.sadd('all_analyzer_queues', queue_uuid) return queue_uuid @@ -280,20 +288,15 @@ def create_queues(format_type, queue_uuid=None, l_uuid=[], queue_type='list', me # format_type int or str (extended type) def add_data_to_queue(sensor_uuid, queue_type, data): if data: - # check if 254 type - if not isinstance(queue_type ,int): - format_type = 254 - else: - format_type = queue_type # by data type - for queue_uuid in get_all_queues_by_type(format_type): + for queue_uuid in get_queues_list_by_type(queue_type): r_serv_analyzer.lpush('analyzer:{}:{}'.format(queue_type, queue_uuid), data) r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'last_updated', time.time()) analyser_queue_max_size = get_queue_max_size(queue_uuid) r_serv_analyzer.ltrim('analyzer:{}:{}'.format(queue_type, queue_uuid), 0, analyser_queue_max_size) # by data type - for queue_uuid in get_all_queues_by_sensor_group(format_type, sensor_uuid): + for queue_uuid in get_all_queues_by_sensor_group(queue_type, sensor_uuid): r_serv_analyzer.lpush('analyzer:{}:{}'.format(queue_type, queue_uuid), data) r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'last_updated', time.time()) analyser_queue_max_size = get_queue_max_size(queue_uuid) @@ -328,8 +331,11 @@ def remove_queues(queue_uuid, format_type, metatype_name=None): l_sensors_uuid = get_queue_group_all_sensors(queue_uuid) if l_sensors_uuid: r_serv_metadata.delete('analyzer_sensor_group:{}'.format(queue_uuid)) + + if format_type == 254: + queue_type = metatype_name for sensor_uuid in l_sensors_uuid: - r_serv_metadata.srem('sensor:queues:{}:{}'.format(format_type, sensor_uuid), queue_uuid) + r_serv_metadata.srem('sensor:queues:{}:{}'.format(queue_type, sensor_uuid), queue_uuid) if l_sensors_uuid: analyzer_key_name = 'analyzer_uuid_group'