diff --git a/bin/core/Sync_module.py b/bin/core/Sync_module.py index c6bdfeeb..065be8f3 100755 --- a/bin/core/Sync_module.py +++ b/bin/core/Sync_module.py @@ -22,18 +22,18 @@ sys.path.append(os.environ['AIL_BIN']) # Import Project packages ################################## from core import ail_2_ail -from lib.ail_queues import get_processed_end_obj +from lib.ail_queues import get_processed_end_obj, timeout_processed_objs from lib.exceptions import ModuleQueueError from lib.objects import ail_objects from modules.abstract_module import AbstractModule -class Sync_module(AbstractModule): # TODO KEEP A QUEUE ??????????????????????????????????????????????? +class Sync_module(AbstractModule): """ Sync_module module for AIL framework """ - def __init__(self, queue=False): # FIXME MODIFY/ADD QUEUE + def __init__(self, queue=False): # FIXME MODIFY/ADD QUEUE super(Sync_module, self).__init__(queue=queue) # Waiting time in seconds between to message processed @@ -81,6 +81,10 @@ class Sync_module(AbstractModule): # TODO KEEP A QUEUE ????????????????????????? # Endless loop processing messages from the input queue while self.proceed: + + # Timeout queues + timeout_processed_objs() + # Get one message (paste) from the QueueIn (copy of Redis_Global publish) global_id = get_processed_end_obj() if global_id: diff --git a/bin/lib/ail_core.py b/bin/lib/ail_core.py index 00cefd4d..1006be73 100755 --- a/bin/lib/ail_core.py +++ b/bin/lib/ail_core.py @@ -57,6 +57,9 @@ def get_object_all_subtypes(obj_type): # TODO Dynamic subtype return r_object.smembers(f'all_chat:subtypes') return [] +def get_obj_queued(): + return ['item', 'image'] + def get_objects_tracked(): return ['decoded', 'item', 'pgp', 'title'] diff --git a/bin/lib/ail_queues.py b/bin/lib/ail_queues.py index 9ce647e0..b4218a07 100755 --- a/bin/lib/ail_queues.py +++ b/bin/lib/ail_queues.py @@ -14,10 +14,12 @@ sys.path.append(os.environ['AIL_BIN']) ################################## from lib.exceptions import ModuleQueueError from lib.ConfigLoader import ConfigLoader +from lib import ail_core config_loader = ConfigLoader() r_queues = config_loader.get_redis_conn("Redis_Queues") r_obj_process = config_loader.get_redis_conn("Redis_Process") +timeout_queue_obj = 172800 config_loader = None MODULES_FILE = os.path.join(os.environ['AIL_HOME'], 'configs', 'modules.cfg') @@ -248,6 +250,29 @@ def rename_processed_obj(new_id, old_id): r_obj_process.srem(f'objs:process', old_id) add_processed_obj(new_id, x_hash, module=module) +def timeout_process_obj(obj_global_id): + for q in get_processed_obj_queues(obj_global_id): + queue, x_hash = q.split(':', 1) + r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{x_hash}') + for m in get_processed_obj_modules(obj_global_id): + module, x_hash = m.split(':', 1) + r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{x_hash}') + + obj_type = obj_global_id.split(':', 1)[0] + r_obj_process.zrem(f'objs:process:{obj_type}', obj_global_id) + r_obj_process.srem(f'objs:process', obj_global_id) + + r_obj_process.sadd(f'objs:processed', obj_global_id) + print(f'timeout: {obj_global_id}') + + +def timeout_processed_objs(): + curr_time = int(time.time()) + time_limit = curr_time - timeout_queue_obj + for obj_type in ail_core.get_obj_queued(): + for obj_global_id in r_obj_process.zrangebyscore(f'objs:process:{obj_type}', 0, time_limit): + timeout_process_obj(obj_global_id) + def delete_processed_obj(obj_global_id): for q in get_processed_obj_queues(obj_global_id): queue, x_hash = q.split(':', 1)