mirror of https://github.com/CIRCL/AIL-framework
fix: [queue] save last timout in cache
parent
bd2ca4b319
commit
f851cc9f42
|
@ -22,7 +22,7 @@ sys.path.append(os.environ['AIL_BIN'])
|
||||||
# Import Project packages
|
# Import Project packages
|
||||||
##################################
|
##################################
|
||||||
from core import ail_2_ail
|
from core import ail_2_ail
|
||||||
from lib.ail_queues import get_processed_end_obj, timeout_processed_objs
|
from lib.ail_queues import get_processed_end_obj, timeout_processed_objs, get_last_queue_timeout
|
||||||
from lib.exceptions import ModuleQueueError
|
from lib.exceptions import ModuleQueueError
|
||||||
from lib.objects import ail_objects
|
from lib.objects import ail_objects
|
||||||
from modules.abstract_module import AbstractModule
|
from modules.abstract_module import AbstractModule
|
||||||
|
@ -41,6 +41,7 @@ class Sync_module(AbstractModule):
|
||||||
|
|
||||||
self.dict_sync_queues = ail_2_ail.get_all_sync_queue_dict()
|
self.dict_sync_queues = ail_2_ail.get_all_sync_queue_dict()
|
||||||
self.last_refresh = time.time()
|
self.last_refresh = time.time()
|
||||||
|
self.last_refresh_queues = time.time()
|
||||||
|
|
||||||
print(self.dict_sync_queues)
|
print(self.dict_sync_queues)
|
||||||
|
|
||||||
|
@ -83,7 +84,12 @@ class Sync_module(AbstractModule):
|
||||||
while self.proceed:
|
while self.proceed:
|
||||||
|
|
||||||
# Timeout queues
|
# Timeout queues
|
||||||
|
# timeout_processed_objs()
|
||||||
|
if self.last_refresh_queues < time.time():
|
||||||
timeout_processed_objs()
|
timeout_processed_objs()
|
||||||
|
self.last_refresh_queues = time.time() + 120
|
||||||
|
self.redis_logger.debug('Timeout queues')
|
||||||
|
# print('Timeout queues')
|
||||||
|
|
||||||
# Get one message (paste) from the QueueIn (copy of Redis_Global publish)
|
# Get one message (paste) from the QueueIn (copy of Redis_Global publish)
|
||||||
global_id = get_processed_end_obj()
|
global_id = get_processed_end_obj()
|
||||||
|
|
|
@ -250,6 +250,12 @@ def rename_processed_obj(new_id, old_id):
|
||||||
r_obj_process.srem(f'objs:process', old_id)
|
r_obj_process.srem(f'objs:process', old_id)
|
||||||
add_processed_obj(new_id, x_hash, module=module)
|
add_processed_obj(new_id, x_hash, module=module)
|
||||||
|
|
||||||
|
def get_last_queue_timeout():
|
||||||
|
epoch_update = r_obj_process.get('queue:obj:timeout:last')
|
||||||
|
if not epoch_update:
|
||||||
|
epoch_update = 0
|
||||||
|
return float(epoch_update)
|
||||||
|
|
||||||
def timeout_process_obj(obj_global_id):
|
def timeout_process_obj(obj_global_id):
|
||||||
for q in get_processed_obj_queues(obj_global_id):
|
for q in get_processed_obj_queues(obj_global_id):
|
||||||
queue, x_hash = q.split(':', 1)
|
queue, x_hash = q.split(':', 1)
|
||||||
|
@ -272,6 +278,7 @@ def timeout_processed_objs():
|
||||||
for obj_type in ail_core.get_obj_queued():
|
for obj_type in ail_core.get_obj_queued():
|
||||||
for obj_global_id in r_obj_process.zrangebyscore(f'objs:process:{obj_type}', 0, time_limit):
|
for obj_global_id in r_obj_process.zrangebyscore(f'objs:process:{obj_type}', 0, time_limit):
|
||||||
timeout_process_obj(obj_global_id)
|
timeout_process_obj(obj_global_id)
|
||||||
|
r_obj_process.set('queue:obj:timeout:last', time.time())
|
||||||
|
|
||||||
def delete_processed_obj(obj_global_id):
|
def delete_processed_obj(obj_global_id):
|
||||||
for q in get_processed_obj_queues(obj_global_id):
|
for q in get_processed_obj_queues(obj_global_id):
|
||||||
|
|
Loading…
Reference in New Issue