mirror of https://github.com/D4-project/d4-core
fix: [Analyzer queue 254] fix list by type
parent
ab261a6bd2
commit
8a792fe4ba
|
@ -123,7 +123,6 @@ def get_all_queues_group_by_extended_type(extended_type, r_list=None):
|
|||
return res
|
||||
|
||||
def get_all_queues_by_sensor_group(format_type, sensor_uuid, r_list=None):
|
||||
print('sensor:queues:{}:{}'.format(format_type, sensor_uuid))
|
||||
res = r_serv_metadata.smembers('sensor:queues:{}:{}'.format(format_type, sensor_uuid))
|
||||
if r_list:
|
||||
return list(res)
|
||||
|
@ -279,21 +278,26 @@ def create_queues(format_type, queue_uuid=None, l_uuid=[], queue_type='list', me
|
|||
return queue_uuid
|
||||
|
||||
# format_type int or str (extended type)
|
||||
def add_data_to_queue(sensor_uuid, format_type, data):
|
||||
def add_data_to_queue(sensor_uuid, queue_type, data):
|
||||
if data:
|
||||
# check if 254 type
|
||||
if not isinstance(queue_type ,int):
|
||||
format_type = 254
|
||||
else:
|
||||
format_type = queue_type
|
||||
# by data type
|
||||
for queue_uuid in get_all_queues_by_type(format_type):
|
||||
r_serv_analyzer.lpush('analyzer:{}:{}'.format(format_type, queue_uuid), data)
|
||||
r_serv_analyzer.lpush('analyzer:{}:{}'.format(queue_type, queue_uuid), data)
|
||||
r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'last_updated', time.time())
|
||||
analyser_queue_max_size = get_queue_max_size(queue_uuid)
|
||||
r_serv_analyzer.ltrim('analyzer:{}:{}'.format(format_type, queue_uuid), 0, analyser_queue_max_size)
|
||||
r_serv_analyzer.ltrim('analyzer:{}:{}'.format(queue_type, queue_uuid), 0, analyser_queue_max_size)
|
||||
|
||||
# by data type
|
||||
for queue_uuid in get_all_queues_by_sensor_group(format_type, sensor_uuid):
|
||||
r_serv_analyzer.lpush('analyzer:{}:{}'.format(format_type, queue_uuid), data)
|
||||
r_serv_analyzer.lpush('analyzer:{}:{}'.format(queue_type, queue_uuid), data)
|
||||
r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'last_updated', time.time())
|
||||
analyser_queue_max_size = get_queue_max_size(queue_uuid)
|
||||
r_serv_analyzer.ltrim('analyzer:{}:{}'.format(format_type, queue_uuid), 0, analyser_queue_max_size)
|
||||
r_serv_analyzer.ltrim('analyzer:{}:{}'.format(queue_type, queue_uuid), 0, analyser_queue_max_size)
|
||||
|
||||
|
||||
def flush_queue(queue_uuid, format_type):
|
||||
|
|
Loading…
Reference in New Issue