fix: [queues] fix ended duplicate + sync queue

dev
terrtia 2023-10-11 14:31:13 +02:00
parent 676b0f84ef
commit 623ba455ff
No known key found for this signature in database
GPG Key ID: 1E1B1F50D84613D0
3 changed files with 9 additions and 17 deletions

View File

@ -33,8 +33,8 @@ class Sync_module(AbstractModule): # TODO KEEP A QUEUE ?????????????????????????
Sync_module module for AIL framework Sync_module module for AIL framework
""" """
def __init__(self): def __init__(self, queue=False): # FIXME MODIFY/ADD QUEUE
super(Sync_module, self).__init__() super(Sync_module, self).__init__(queue=queue)
# Waiting time in seconds between to message processed # Waiting time in seconds between to message processed
self.pending_seconds = 10 self.pending_seconds = 10
@ -102,13 +102,6 @@ class Sync_module(AbstractModule): # TODO KEEP A QUEUE ?????????????????????????
if isinstance(err, ModuleQueueError): if isinstance(err, ModuleQueueError):
self.queue.error() self.queue.error()
raise err raise err
# remove from set_module
## check if item process == completed
if self.obj:
self.queue.end_message(self.obj.get_global_id(), self.sha256_mess)
self.obj = None
self.sha256_mess = None
else: else:
self.computeNone() self.computeNone()
@ -119,5 +112,5 @@ class Sync_module(AbstractModule): # TODO KEEP A QUEUE ?????????????????????????
if __name__ == '__main__': if __name__ == '__main__':
module = Sync_module() module = Sync_module(queue=False) # FIXME MODIFY/ADD QUEUE
module.run() module.run()

View File

@ -199,7 +199,7 @@ def is_processed_obj_moduled(obj_global_id):
return r_obj_process.exists(f'obj:modules:{obj_global_id}') return r_obj_process.exists(f'obj:modules:{obj_global_id}')
def is_processed_obj(obj_global_id): def is_processed_obj(obj_global_id):
return is_processed_obj_queued(obj_global_id) and is_processed_obj_moduled(obj_global_id) return is_processed_obj_queued(obj_global_id) or is_processed_obj_moduled(obj_global_id)
def get_processed_obj_modules(obj_global_id): def get_processed_obj_modules(obj_global_id):
return r_obj_process.zrange(f'obj:modules:{obj_global_id}', 0, -1) return r_obj_process.zrange(f'obj:modules:{obj_global_id}', 0, -1)

View File

@ -24,7 +24,7 @@ publish = Importers,Tags
[Global] [Global]
subscribe = SaveObj subscribe = SaveObj
publish = Item,Sync publish = Item
[Duplicates] [Duplicates]
subscribe = Duplicate subscribe = Duplicate
@ -108,11 +108,7 @@ publish = Tags
[Tags] [Tags]
subscribe = Tags subscribe = Tags
publish = Tag_feed,Sync publish = Tag_feed
# dirty fix
[Sync_module]
subscribe = Sync
[MISP_Thehive_Auto_Push] [MISP_Thehive_Auto_Push]
subscribe = Tag_feed subscribe = Tag_feed
@ -165,6 +161,9 @@ publish = Tags
[Zerobins] [Zerobins]
subscribe = Url subscribe = Url
#[Sync_module]
#publish = Sync
# [My_Module_Name] # [My_Module_Name]
# subscribe = Global # Queue name # subscribe = Global # Queue name
# publish = Tags # Queue name # publish = Tags # Queue name