From bb3dad2873e3f68079a65fbdbad5ccc28597d612 Mon Sep 17 00:00:00 2001 From: terrtia Date: Thu, 7 Sep 2023 10:38:03 +0200 Subject: [PATCH] chg: [objs processed] xxhash messages --- bin/lib/ail_queues.py | 36 ++++++++++++++++++------------------ configs/core.cfg.sample | 5 +++++ requirements.txt | 1 + 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/bin/lib/ail_queues.py b/bin/lib/ail_queues.py index 3ab68708..38c5a42d 100755 --- a/bin/lib/ail_queues.py +++ b/bin/lib/ail_queues.py @@ -6,7 +6,7 @@ import sys import datetime import time -from hashlib import sha256 +import xxhash sys.path.append(os.environ['AIL_BIN']) ################################## @@ -80,12 +80,12 @@ class AILQueue: # raise Exception(f'Error: queue {self.name}, no AIL object provided') else: obj_global_id, mess = row_mess - sha256_mess = sha256(message.encode()).hexdigest() - add_processed_obj(obj_global_id, sha256_mess, module=self.name) - return obj_global_id, sha256_mess, mess + m_hash = xxhash.xxh3_64_hexdigest(message) + add_processed_obj(obj_global_id, m_hash, module=self.name) + return obj_global_id, m_hash, mess - def end_message(self, obj_global_id, sha256_mess): - end_processed_obj(obj_global_id, sha256_mess, module=self.name) + def end_message(self, obj_global_id, m_hash): + end_processed_obj(obj_global_id, m_hash, module=self.name) def send_message(self, obj_global_id, message='', queue_name=None): if not self.subscribers_modules: @@ -100,14 +100,14 @@ class AILQueue: message = f'{obj_global_id};{message}' if obj_global_id != '::': - sha256_mess = sha256(message.encode()).hexdigest() + m_hash = xxhash.xxh3_64_hexdigest(message) else: - sha256_mess = None + m_hash = None # Add message to all modules for module_name in self.subscribers_modules[queue_name]: - if sha256_mess: - add_processed_obj(obj_global_id, sha256_mess, queue=module_name) + if m_hash: + add_processed_obj(obj_global_id, m_hash, queue=module_name) r_queues.rpush(f'queue:{module_name}:in', message) # stats @@ -192,23 +192,23 @@ def get_processed_obj_queues(obj_global_id): def get_processed_obj(obj_global_id): return {'modules': get_processed_obj_modules(obj_global_id), 'queues': get_processed_obj_queues(obj_global_id)} -def add_processed_obj(obj_global_id, sha256_mess, module=None, queue=None): +def add_processed_obj(obj_global_id, m_hash, module=None, queue=None): obj_type = obj_global_id.split(':', 1)[0] new_obj = r_obj_process.sadd(f'objs:process', obj_global_id) # first process: if new_obj: r_obj_process.zadd(f'objs:process:{obj_type}', {obj_global_id: int(time.time())}) if queue: - r_obj_process.zadd(f'obj:queues:{obj_global_id}', {f'{queue}:{sha256_mess}': int(time.time())}) + r_obj_process.zadd(f'obj:queues:{obj_global_id}', {f'{queue}:{m_hash}': int(time.time())}) if module: - r_obj_process.zadd(f'obj:modules:{obj_global_id}', {f'{module}:{sha256_mess}': int(time.time())}) - r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{module}:{sha256_mess}') + r_obj_process.zadd(f'obj:modules:{obj_global_id}', {f'{module}:{m_hash}': int(time.time())}) + r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{module}:{m_hash}') -def end_processed_obj(obj_global_id, sha256_mess, module=None, queue=None): +def end_processed_obj(obj_global_id, m_hash, module=None, queue=None): if queue: - r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{sha256_mess}') + r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{m_hash}') if module: - r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{sha256_mess}') + r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{m_hash}') # TODO HANDLE QUEUE DELETE # process completed @@ -322,7 +322,7 @@ def save_queue_digraph(): if __name__ == '__main__': # clear_modules_queues_stats() # save_queue_digraph() - oobj_global_id = 'item::submitted/2023/06/22/submitted_f656119e-f2ea-49d7-9beb-fb97077f8fe5.gz' + oobj_global_id = 'item::submitted/2023/09/06/submitted_75fb9ff2-8c91-409d-8bd6-31769d73db8f.gz' while True: print(get_processed_obj(oobj_global_id)) time.sleep(0.5) diff --git a/configs/core.cfg.sample b/configs/core.cfg.sample index bb9054fc..62e9efc3 100644 --- a/configs/core.cfg.sample +++ b/configs/core.cfg.sample @@ -154,6 +154,11 @@ host = localhost port = 6381 db = 0 +[Redis_Process] +host = localhost +port = 6381 +db = 2 + [Redis_Mixer_Cache] host = localhost port = 6381 diff --git a/requirements.txt b/requirements.txt index 8bb16553..30cd6c3f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,6 +17,7 @@ websockets>9.0 crcmod mmh3>2.5 ssdeep>3.3 +xxhash>3.1.0 # ZMQ zmq