mirror of https://github.com/D4-project/d4-core
parent
8a792fe4ba
commit
cdc72e7998
|
@ -84,6 +84,12 @@ def get_all_queues_uuid_by_extended_type(extended_type, r_list=None):
|
||||||
return list(res)
|
return list(res)
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
def get_queues_list_by_type(queue_type):
|
||||||
|
if isinstance(queue_type ,int):
|
||||||
|
return get_all_queues_by_type(queue_type)
|
||||||
|
else:
|
||||||
|
return get_all_queues_by_extended_type(queue_type)
|
||||||
|
|
||||||
# ONLY NON GROUP
|
# ONLY NON GROUP
|
||||||
def get_all_queues_by_type(format_type, r_list=None):
|
def get_all_queues_by_type(format_type, r_list=None):
|
||||||
'''
|
'''
|
||||||
|
@ -122,8 +128,8 @@ def get_all_queues_group_by_extended_type(extended_type, r_list=None):
|
||||||
return list(res)
|
return list(res)
|
||||||
return res
|
return res
|
||||||
|
|
||||||
def get_all_queues_by_sensor_group(format_type, sensor_uuid, r_list=None):
|
def get_all_queues_by_sensor_group(queue_type, sensor_uuid, r_list=None):
|
||||||
res = r_serv_metadata.smembers('sensor:queues:{}:{}'.format(format_type, sensor_uuid))
|
res = r_serv_metadata.smembers('sensor:queues:{}:{}'.format(queue_type, sensor_uuid))
|
||||||
if r_list:
|
if r_list:
|
||||||
return list(res)
|
return list(res)
|
||||||
return res
|
return res
|
||||||
|
@ -269,10 +275,12 @@ def create_queues(format_type, queue_uuid=None, l_uuid=[], queue_type='list', me
|
||||||
# Group by UUID
|
# Group by UUID
|
||||||
if l_uuid:
|
if l_uuid:
|
||||||
# # TODO: check sensor_uuid is valid
|
# # TODO: check sensor_uuid is valid
|
||||||
|
if format_type == 254:
|
||||||
|
queue_type = metatype_name
|
||||||
for sensor_uuid in l_uuid:
|
for sensor_uuid in l_uuid:
|
||||||
sensor_uuid = sensor_uuid.replace('-', '')
|
sensor_uuid = sensor_uuid.replace('-', '')
|
||||||
r_serv_metadata.sadd('analyzer_sensor_group:{}'.format(queue_uuid), sensor_uuid)
|
r_serv_metadata.sadd('analyzer_sensor_group:{}'.format(queue_uuid), sensor_uuid)
|
||||||
r_serv_metadata.sadd('sensor:queues:{}:{}'.format(format_type, sensor_uuid), queue_uuid)
|
r_serv_metadata.sadd('sensor:queues:{}:{}'.format(queue_type, sensor_uuid), queue_uuid)
|
||||||
# ALL
|
# ALL
|
||||||
r_serv_metadata.sadd('all_analyzer_queues', queue_uuid)
|
r_serv_metadata.sadd('all_analyzer_queues', queue_uuid)
|
||||||
return queue_uuid
|
return queue_uuid
|
||||||
|
@ -280,20 +288,15 @@ def create_queues(format_type, queue_uuid=None, l_uuid=[], queue_type='list', me
|
||||||
# format_type int or str (extended type)
|
# format_type int or str (extended type)
|
||||||
def add_data_to_queue(sensor_uuid, queue_type, data):
|
def add_data_to_queue(sensor_uuid, queue_type, data):
|
||||||
if data:
|
if data:
|
||||||
# check if 254 type
|
|
||||||
if not isinstance(queue_type ,int):
|
|
||||||
format_type = 254
|
|
||||||
else:
|
|
||||||
format_type = queue_type
|
|
||||||
# by data type
|
# by data type
|
||||||
for queue_uuid in get_all_queues_by_type(format_type):
|
for queue_uuid in get_queues_list_by_type(queue_type):
|
||||||
r_serv_analyzer.lpush('analyzer:{}:{}'.format(queue_type, queue_uuid), data)
|
r_serv_analyzer.lpush('analyzer:{}:{}'.format(queue_type, queue_uuid), data)
|
||||||
r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'last_updated', time.time())
|
r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'last_updated', time.time())
|
||||||
analyser_queue_max_size = get_queue_max_size(queue_uuid)
|
analyser_queue_max_size = get_queue_max_size(queue_uuid)
|
||||||
r_serv_analyzer.ltrim('analyzer:{}:{}'.format(queue_type, queue_uuid), 0, analyser_queue_max_size)
|
r_serv_analyzer.ltrim('analyzer:{}:{}'.format(queue_type, queue_uuid), 0, analyser_queue_max_size)
|
||||||
|
|
||||||
# by data type
|
# by data type
|
||||||
for queue_uuid in get_all_queues_by_sensor_group(format_type, sensor_uuid):
|
for queue_uuid in get_all_queues_by_sensor_group(queue_type, sensor_uuid):
|
||||||
r_serv_analyzer.lpush('analyzer:{}:{}'.format(queue_type, queue_uuid), data)
|
r_serv_analyzer.lpush('analyzer:{}:{}'.format(queue_type, queue_uuid), data)
|
||||||
r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'last_updated', time.time())
|
r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'last_updated', time.time())
|
||||||
analyser_queue_max_size = get_queue_max_size(queue_uuid)
|
analyser_queue_max_size = get_queue_max_size(queue_uuid)
|
||||||
|
@ -328,8 +331,11 @@ def remove_queues(queue_uuid, format_type, metatype_name=None):
|
||||||
l_sensors_uuid = get_queue_group_all_sensors(queue_uuid)
|
l_sensors_uuid = get_queue_group_all_sensors(queue_uuid)
|
||||||
if l_sensors_uuid:
|
if l_sensors_uuid:
|
||||||
r_serv_metadata.delete('analyzer_sensor_group:{}'.format(queue_uuid))
|
r_serv_metadata.delete('analyzer_sensor_group:{}'.format(queue_uuid))
|
||||||
|
|
||||||
|
if format_type == 254:
|
||||||
|
queue_type = metatype_name
|
||||||
for sensor_uuid in l_sensors_uuid:
|
for sensor_uuid in l_sensors_uuid:
|
||||||
r_serv_metadata.srem('sensor:queues:{}:{}'.format(format_type, sensor_uuid), queue_uuid)
|
r_serv_metadata.srem('sensor:queues:{}:{}'.format(queue_type, sensor_uuid), queue_uuid)
|
||||||
|
|
||||||
if l_sensors_uuid:
|
if l_sensors_uuid:
|
||||||
analyzer_key_name = 'analyzer_uuid_group'
|
analyzer_key_name = 'analyzer_uuid_group'
|
||||||
|
|
Loading…
Reference in New Issue