mirror of https://github.com/CIRCL/AIL-framework
411 lines
15 KiB
Python
Executable File
411 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*-coding:UTF-8 -*
|
|
|
|
import os
|
|
import sys
|
|
import datetime
|
|
import time
|
|
|
|
import xxhash
|
|
|
|
sys.path.append(os.environ['AIL_BIN'])
|
|
##################################
|
|
# Import Project packages
|
|
##################################
|
|
from lib.exceptions import ModuleQueueError
|
|
from lib.ConfigLoader import ConfigLoader
|
|
from lib import ail_core
|
|
|
|
config_loader = ConfigLoader()
|
|
r_queues = config_loader.get_redis_conn("Redis_Queues")
|
|
r_obj_process = config_loader.get_redis_conn("Redis_Process")
|
|
timeout_queue_obj = 172800
|
|
config_loader = None
|
|
|
|
MODULES_FILE = os.path.join(os.environ['AIL_HOME'], 'configs', 'modules.cfg')
|
|
|
|
# # # # # # # #
|
|
# #
|
|
# AIL QUEUE #
|
|
# #
|
|
# # # # # # # #
|
|
|
|
class AILQueue:
|
|
|
|
def __init__(self, module_name, module_pid):
|
|
self.name = module_name
|
|
self.pid = module_pid
|
|
self._set_subscriber()
|
|
# Update queue stat
|
|
r_queues.hset('queues', self.name, self.get_nb_messages())
|
|
r_queues.hset(f'modules', f'{self.pid}:{self.name}', -1)
|
|
|
|
def _set_subscriber(self):
|
|
subscribers = {}
|
|
module_config_loader = ConfigLoader(config_file=MODULES_FILE) # TODO CHECK IF FILE EXISTS
|
|
if not module_config_loader.has_section(self.name):
|
|
raise ModuleQueueError(f'No Section defined for this module: {self.name}. Please add one in configs/module.cfg')
|
|
|
|
if module_config_loader.has_option(self.name, 'publish'):
|
|
subscribers_queues = module_config_loader.get_config_str(self.name, 'publish')
|
|
if subscribers_queues:
|
|
subscribers_queues = set(subscribers_queues.split(','))
|
|
for queue_name in subscribers_queues:
|
|
subscribers[queue_name] = set()
|
|
if subscribers_queues:
|
|
for module in module_config_loader.get_config_sections():
|
|
if module_config_loader.has_option(module, 'subscribe'):
|
|
queue_name = module_config_loader.get_config_str(module, 'subscribe')
|
|
if queue_name in subscribers:
|
|
subscribers[queue_name].add(module)
|
|
self.subscribers_modules = subscribers
|
|
|
|
def get_out_queues(self):
|
|
return list(self.subscribers_modules.keys())
|
|
|
|
def get_nb_messages(self):
|
|
return r_queues.llen(f'queue:{self.name}:in')
|
|
|
|
def get_message(self):
|
|
# Update queues stats
|
|
r_queues.hset('queues', self.name, self.get_nb_messages())
|
|
r_queues.hset(f'modules', f'{self.pid}:{self.name}', int(time.time()))
|
|
|
|
# Get Message
|
|
message = r_queues.lpop(f'queue:{self.name}:in')
|
|
if not message:
|
|
return None
|
|
else:
|
|
row_mess = message.split(';', 1)
|
|
if len(row_mess) != 2:
|
|
return None, None, message
|
|
# raise Exception(f'Error: queue {self.name}, no AIL object provided')
|
|
else:
|
|
obj_global_id, mess = row_mess
|
|
m_hash = xxhash.xxh3_64_hexdigest(message)
|
|
add_processed_obj(obj_global_id, m_hash, module=self.name)
|
|
return obj_global_id, m_hash, mess
|
|
|
|
def rename_message_obj(self, new_id, old_id):
|
|
# restrict rename function
|
|
if self.name == 'Mixer' or self.name == 'Global':
|
|
rename_processed_obj(new_id, old_id)
|
|
else:
|
|
raise ModuleQueueError('This Module can\'t rename an object ID')
|
|
|
|
# condition -> not in any queue
|
|
# TODO EDIT meta
|
|
|
|
|
|
|
|
def end_message(self, obj_global_id, m_hash):
|
|
end_processed_obj(obj_global_id, m_hash, module=self.name)
|
|
|
|
def send_message(self, obj_global_id, message='', queue_name=None):
|
|
if not self.subscribers_modules:
|
|
raise ModuleQueueError('This Module don\'t have any subscriber')
|
|
if queue_name:
|
|
if queue_name not in self.subscribers_modules:
|
|
raise ModuleQueueError(f'send_message: Unknown queue_name {queue_name}')
|
|
else:
|
|
if len(self.subscribers_modules) > 1:
|
|
raise ModuleQueueError('Queue name required. This module push to multiple queues')
|
|
queue_name = list(self.subscribers_modules)[0]
|
|
|
|
message = f'{obj_global_id};{message}'
|
|
if obj_global_id != '::':
|
|
m_hash = xxhash.xxh3_64_hexdigest(message)
|
|
else:
|
|
m_hash = None
|
|
|
|
# Add message to all modules
|
|
for module_name in self.subscribers_modules[queue_name]:
|
|
if m_hash:
|
|
add_processed_obj(obj_global_id, m_hash, queue=module_name)
|
|
|
|
r_queues.rpush(f'queue:{module_name}:in', message)
|
|
# stats
|
|
nb_mess = r_queues.llen(f'queue:{module_name}:in')
|
|
r_queues.hset('queues', module_name, nb_mess)
|
|
|
|
# TODO
|
|
def refresh(self):
|
|
# TODO check cache
|
|
self._set_subscriber()
|
|
|
|
def clear(self):
|
|
r_queues.delete(f'queue:{self.name}:in')
|
|
|
|
def error(self):
|
|
r_queues.hdel(f'modules', f'{self.pid}:{self.name}')
|
|
|
|
def end(self):
|
|
self.clear()
|
|
r_queues.hdel(f'modules', f'{self.pid}:{self.name}')
|
|
|
|
|
|
def get_queues_modules():
|
|
return r_queues.hkeys('queues')
|
|
|
|
def get_nb_queues_modules():
|
|
return r_queues.hgetall('queues')
|
|
|
|
def get_nb_sorted_queues_modules():
|
|
res = r_queues.hgetall('queues')
|
|
res = sorted(res.items())
|
|
return res
|
|
|
|
def get_modules_pid_last_mess():
|
|
return r_queues.hgetall('modules')
|
|
|
|
def get_modules_queues_stats():
|
|
modules_queues_stats = []
|
|
nb_queues_modules = get_nb_queues_modules()
|
|
modules_pid_last_mess = get_modules_pid_last_mess()
|
|
added_modules = set()
|
|
for row_module in modules_pid_last_mess:
|
|
pid, module = row_module.split(':', 1)
|
|
last_time = modules_pid_last_mess[row_module]
|
|
last_time = datetime.datetime.fromtimestamp(int(last_time))
|
|
seconds = int((datetime.datetime.now() - last_time).total_seconds())
|
|
modules_queues_stats.append((module, nb_queues_modules[module], seconds, pid))
|
|
added_modules.add(module)
|
|
for module in nb_queues_modules:
|
|
if module not in added_modules:
|
|
modules_queues_stats.append((module, nb_queues_modules[module], -1, 'Not Launched'))
|
|
return sorted(modules_queues_stats)
|
|
|
|
def clear_modules_queues_stats():
|
|
r_queues.delete('modules')
|
|
|
|
|
|
# # # # # # # # #
|
|
# #
|
|
# OBJ QUEUES # PROCESS ??
|
|
# #
|
|
# # # # # # # # #
|
|
|
|
|
|
def get_processed_objs():
|
|
return r_obj_process.smembers(f'objs:process')
|
|
|
|
def get_processed_end_objs():
|
|
return r_obj_process.smembers(f'objs:processed')
|
|
|
|
def get_processed_end_obj():
|
|
return r_obj_process.spop(f'objs:processed')
|
|
|
|
def is_obj_in_process(obj_gid):
|
|
return r_obj_process.sismember(f'objs:process', obj_gid)
|
|
|
|
def get_processed_objs_by_type(obj_type):
|
|
return r_obj_process.zrange(f'objs:process:{obj_type}', 0, -1)
|
|
|
|
def is_processed_obj_queued(obj_global_id):
|
|
return r_obj_process.exists(f'obj:queues:{obj_global_id}')
|
|
|
|
def is_processed_obj_moduled(obj_global_id):
|
|
return r_obj_process.exists(f'obj:modules:{obj_global_id}')
|
|
|
|
def is_processed_obj(obj_global_id):
|
|
return is_processed_obj_queued(obj_global_id) or is_processed_obj_moduled(obj_global_id)
|
|
|
|
def get_processed_obj_modules(obj_global_id):
|
|
return r_obj_process.zrange(f'obj:modules:{obj_global_id}', 0, -1)
|
|
|
|
def get_processed_obj_queues(obj_global_id):
|
|
return r_obj_process.zrange(f'obj:queues:{obj_global_id}', 0, -1)
|
|
|
|
def get_processed_obj(obj_global_id):
|
|
return {'modules': get_processed_obj_modules(obj_global_id), 'queues': get_processed_obj_queues(obj_global_id)}
|
|
|
|
def add_processed_obj(obj_global_id, m_hash, module=None, queue=None):
|
|
obj_type = obj_global_id.split(':', 1)[0]
|
|
new_obj = r_obj_process.sadd(f'objs:process', obj_global_id)
|
|
# first process:
|
|
if new_obj:
|
|
r_obj_process.zadd(f'objs:process:{obj_type}', {obj_global_id: int(time.time())})
|
|
if queue:
|
|
r_obj_process.zadd(f'obj:queues:{obj_global_id}', {f'{queue}:{m_hash}': int(time.time())})
|
|
if module:
|
|
r_obj_process.zadd(f'obj:modules:{obj_global_id}', {f'{module}:{m_hash}': int(time.time())})
|
|
r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{module}:{m_hash}')
|
|
|
|
def end_processed_obj(obj_global_id, m_hash, module=None, queue=None):
|
|
if queue:
|
|
r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{m_hash}')
|
|
if module:
|
|
r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{m_hash}')
|
|
|
|
# TODO HANDLE QUEUE DELETE
|
|
# process completed
|
|
if not is_processed_obj(obj_global_id):
|
|
obj_type = obj_global_id.split(':', 1)[0]
|
|
r_obj_process.zrem(f'objs:process:{obj_type}', obj_global_id)
|
|
r_obj_process.srem(f'objs:process', obj_global_id)
|
|
|
|
r_obj_process.sadd(f'objs:processed', obj_global_id) # TODO use list ??????
|
|
|
|
def rename_processed_obj(new_id, old_id):
|
|
module = get_processed_obj_modules(old_id)
|
|
# currently in a module
|
|
if len(module) == 1:
|
|
module, x_hash = module[0].split(':', 1)
|
|
obj_type = old_id.split(':', 1)[0]
|
|
r_obj_process.zrem(f'obj:modules:{old_id}', f'{module}:{x_hash}')
|
|
r_obj_process.zrem(f'objs:process:{obj_type}', old_id)
|
|
r_obj_process.srem(f'objs:process', old_id)
|
|
add_processed_obj(new_id, x_hash, module=module)
|
|
|
|
def get_last_queue_timeout():
|
|
epoch_update = r_obj_process.get('queue:obj:timeout:last')
|
|
if not epoch_update:
|
|
epoch_update = 0
|
|
return float(epoch_update)
|
|
|
|
def timeout_process_obj(obj_global_id):
|
|
for q in get_processed_obj_queues(obj_global_id):
|
|
queue, x_hash = q.split(':', 1)
|
|
r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{x_hash}')
|
|
for m in get_processed_obj_modules(obj_global_id):
|
|
module, x_hash = m.split(':', 1)
|
|
r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{x_hash}')
|
|
|
|
obj_type = obj_global_id.split(':', 1)[0]
|
|
r_obj_process.zrem(f'objs:process:{obj_type}', obj_global_id)
|
|
r_obj_process.srem(f'objs:process', obj_global_id)
|
|
|
|
r_obj_process.sadd(f'objs:processed', obj_global_id)
|
|
print(f'timeout: {obj_global_id}')
|
|
|
|
|
|
def timeout_processed_objs():
|
|
curr_time = int(time.time())
|
|
time_limit = curr_time - timeout_queue_obj
|
|
for obj_type in ail_core.get_obj_queued():
|
|
for obj_global_id in r_obj_process.zrangebyscore(f'objs:process:{obj_type}', 0, time_limit):
|
|
timeout_process_obj(obj_global_id)
|
|
r_obj_process.set('queue:obj:timeout:last', time.time())
|
|
|
|
def delete_processed_obj(obj_global_id):
|
|
for q in get_processed_obj_queues(obj_global_id):
|
|
queue, x_hash = q.split(':', 1)
|
|
r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{x_hash}')
|
|
for m in get_processed_obj_modules(obj_global_id):
|
|
module, x_hash = m.split(':', 1)
|
|
r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{x_hash}')
|
|
obj_type = obj_global_id.split(':', 1)[0]
|
|
r_obj_process.zrem(f'objs:process:{obj_type}', obj_global_id)
|
|
r_obj_process.srem(f'objs:process', obj_global_id)
|
|
|
|
###################################################################################
|
|
|
|
|
|
# # # # # # # #
|
|
# #
|
|
# GRAPH #
|
|
# #
|
|
# # # # # # # #
|
|
|
|
def get_queue_digraph():
|
|
queues_ail = {}
|
|
modules = {}
|
|
module_config_loader = ConfigLoader(config_file=MODULES_FILE)
|
|
for module in module_config_loader.get_config_sections():
|
|
if module_config_loader.has_option(module, 'subscribe'):
|
|
if module not in modules:
|
|
modules[module] = {'in': set(), 'out': set()}
|
|
queue = module_config_loader.get_config_str(module, 'subscribe')
|
|
modules[module]['in'].add(queue)
|
|
if queue not in queues_ail:
|
|
queues_ail[queue] = []
|
|
queues_ail[queue].append(module)
|
|
|
|
if module_config_loader.has_option(module, 'publish'):
|
|
if module not in modules:
|
|
modules[module] = {'in': set(), 'out': set()}
|
|
queues = module_config_loader.get_config_str(module, 'publish')
|
|
for queue in queues.split(','):
|
|
modules[module]['out'].add(queue)
|
|
|
|
# print(modules)
|
|
# print(queues_ail)
|
|
|
|
mapped = set()
|
|
import_modules = set()
|
|
edges = '# Define edges between nodes\n'
|
|
for module in modules:
|
|
for queue_name in modules[module]['out']:
|
|
if queue_name == 'Importers':
|
|
import_modules.add(module)
|
|
if queue_name in queues_ail:
|
|
for module2 in queues_ail[queue_name]:
|
|
to_break = False
|
|
new_edge = None
|
|
cluster_out = f'cluster_{queue_name.lower()}'
|
|
queue_in = modules[module]['in']
|
|
if queue_in:
|
|
queue_in = next(iter(queue_in))
|
|
if len(queues_ail.get(queue_in, [])) == 1:
|
|
cluster_in = f'cluster_{queue_in.lower()}'
|
|
new_edge = f'{module} -> {module2} [ltail="{cluster_in}" lhead="{cluster_out}"];\n'
|
|
to_break = True
|
|
if not new_edge:
|
|
new_edge = f'{module} -> {module2} [lhead="{cluster_out}"];\n'
|
|
to_map = f'{module}:{cluster_out}'
|
|
if to_map not in mapped:
|
|
mapped.add(to_map)
|
|
edges = f'{edges}{new_edge}'
|
|
if to_break:
|
|
break
|
|
|
|
subgraph = '# Define subgraphs for each queue\n'
|
|
for queue_name in queues_ail:
|
|
cluster_name = f'cluster_{queue_name.lower()}'
|
|
subgraph = f'{subgraph} subgraph {cluster_name} {{\n label="Queue {queue_name}";\n color=blue;\n'
|
|
for module in queues_ail[queue_name]:
|
|
subgraph = f'{subgraph} {module};\n'
|
|
subgraph = f'{subgraph}}}\n\n'
|
|
|
|
cluster_name = f'cluster_importers'
|
|
subgraph = f'{subgraph} subgraph {cluster_name} {{\n label="AIL Importers";\n color=red;\n'
|
|
for module in import_modules:
|
|
subgraph = f'{subgraph} {module};\n'
|
|
subgraph = f'{subgraph}}}\n\n'
|
|
|
|
digraph = 'digraph Modules {\ngraph [rankdir=LR splines=ortho];\nnode [shape=rectangle]\ncompound=true;\n'
|
|
digraph = f'{digraph}edge[arrowhead=open color=salmon]\n\n'
|
|
digraph = f'{digraph}{subgraph}{edges}\n}}\n'
|
|
return digraph
|
|
|
|
def save_queue_digraph():
|
|
import subprocess
|
|
digraph = get_queue_digraph()
|
|
dot_file = os.path.join(os.environ['AIL_HOME'], 'doc/ail_queues.dot')
|
|
svg_file = os.path.join(os.environ['AIL_HOME'], 'doc/ail_queues.svg')
|
|
svg_static = os.path.join(os.environ['AIL_HOME'], 'var/www/static/image/ail_queues.svg')
|
|
with open(dot_file, 'w') as f:
|
|
f.write(digraph)
|
|
|
|
print('dot', '-Tsvg', dot_file, '-o', svg_file)
|
|
process = subprocess.run(['dot', '-Tsvg', dot_file, '-o', svg_file, '-o', svg_static], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
if process.returncode == 0:
|
|
# modified_files = process.stdout
|
|
# print(process.stdout)
|
|
return True
|
|
else:
|
|
print(process.stderr.decode())
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# clear_modules_queues_stats()
|
|
# save_queue_digraph()
|
|
oobj_global_id = 'item::submitted/2023/10/11/submitted_b5440009-05d5-4494-a807-a6d8e4a900cf.gz'
|
|
# print(get_processed_obj(oobj_global_id))
|
|
# delete_processed_obj(oobj_global_id)
|
|
# while True:
|
|
# print(get_processed_obj(oobj_global_id))
|
|
# time.sleep(0.5)
|
|
print(get_processed_end_objs())
|