From 676b0f84effea7e08196591ca0f2e56ca2865c76 Mon Sep 17 00:00:00 2001 From: terrtia Date: Wed, 11 Oct 2023 12:06:01 +0200 Subject: [PATCH] chg: [module + queues] track + rename object global ID by module --- bin/core/Sync_importer.py | 9 +-- bin/core/Sync_module.py | 53 ++++++++++++++-- bin/crawlers/Crawler.py | 19 +++--- bin/importer/FeederImporter.py | 10 +-- bin/importer/FileImporter.py | 10 ++- bin/importer/PystemonImporter.py | 14 ++++- bin/importer/ZMQImporter.py | 15 ++--- bin/importer/abstract_importer.py | 40 +++++++----- bin/importer/feeders/Telegram.py | 1 + bin/lib/ail_core.py | 4 ++ bin/lib/ail_queues.py | 51 +++++++++++++-- bin/lib/objects/Items.py | 21 ++++++- bin/lib/objects/ail_objects.py | 2 + bin/modules/Global.py | 101 +++++++++++++----------------- bin/modules/Mixer.py | 43 ++++++++----- bin/modules/SubmitPaste.py | 8 ++- bin/modules/Tags.py | 3 - bin/modules/abstract_module.py | 3 +- 18 files changed, 267 insertions(+), 140 deletions(-) diff --git a/bin/core/Sync_importer.py b/bin/core/Sync_importer.py index bf70b67e..8bb11669 100755 --- a/bin/core/Sync_importer.py +++ b/bin/core/Sync_importer.py @@ -23,7 +23,7 @@ sys.path.append(os.environ['AIL_BIN']) ################################## from core import ail_2_ail from modules.abstract_module import AbstractModule -# from lib.ConfigLoader import ConfigLoader +from lib.objects.Items import Item #### CONFIG #### # config_loader = ConfigLoader() @@ -76,10 +76,11 @@ class Sync_importer(AbstractModule): # # TODO: create default id item_id = ail_stream['meta']['ail:id'] + item = Item(item_id) - message = f'sync {item_id} {b64_gzip_content}' - print(item_id) - self.add_message_to_queue(message, 'Importers') + message = f'sync {b64_gzip_content}' + print(item.id) + self.add_message_to_queue(obj=item, message=message, queue='Importers') if __name__ == '__main__': diff --git a/bin/core/Sync_module.py b/bin/core/Sync_module.py index 22f814f4..857efaa3 100755 --- a/bin/core/Sync_module.py +++ b/bin/core/Sync_module.py @@ -15,17 +15,20 @@ This module . import os import sys import time +import traceback sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages ################################## from core import ail_2_ail -from lib.objects.Items import Item +from lib.ail_queues import get_processed_end_obj +from lib.exceptions import ModuleQueueError +from lib.objects import ail_objects from modules.abstract_module import AbstractModule -class Sync_module(AbstractModule): +class Sync_module(AbstractModule): # TODO KEEP A QUEUE ??????????????????????????????????????????????? """ Sync_module module for AIL framework """ @@ -53,7 +56,7 @@ class Sync_module(AbstractModule): print('sync queues refreshed') print(self.dict_sync_queues) - obj = self.get_obj() + obj = ail_objects.get_obj_from_global_id(message) tags = obj.get_tags() @@ -67,10 +70,52 @@ class Sync_module(AbstractModule): obj_dict = obj.get_default_meta() # send to queue push and/or pull for dict_ail in self.dict_sync_queues[queue_uuid]['ail_instances']: - print(f'ail_uuid: {dict_ail["ail_uuid"]} obj: {message}') + print(f'ail_uuid: {dict_ail["ail_uuid"]} obj: {obj.type}:{obj.get_subtype(r_str=True)}:{obj.id}') ail_2_ail.add_object_to_sync_queue(queue_uuid, dict_ail['ail_uuid'], obj_dict, push=dict_ail['push'], pull=dict_ail['pull']) + def run(self): + """ + Run Module endless process + """ + + # Endless loop processing messages from the input queue + while self.proceed: + # Get one message (paste) from the QueueIn (copy of Redis_Global publish) + global_id = get_processed_end_obj() + if global_id: + try: + # Module processing with the message from the queue + self.compute(global_id) + except Exception as err: + if self.debug: + self.queue.error() + raise err + + # LOG ERROR + trace = traceback.format_tb(err.__traceback__) + trace = ''.join(trace) + self.logger.critical(f"Error in module {self.module_name}: {__name__} : {err}") + self.logger.critical(f"Module {self.module_name} input message: {global_id}") + self.logger.critical(trace) + + if isinstance(err, ModuleQueueError): + self.queue.error() + raise err + # remove from set_module + ## check if item process == completed + + if self.obj: + self.queue.end_message(self.obj.get_global_id(), self.sha256_mess) + self.obj = None + self.sha256_mess = None + + else: + self.computeNone() + # Wait before next process + self.logger.debug(f"{self.module_name}, waiting for new message, Idling {self.pending_seconds}s") + time.sleep(self.pending_seconds) + if __name__ == '__main__': diff --git a/bin/crawlers/Crawler.py b/bin/crawlers/Crawler.py index adaf7bbf..3332299d 100755 --- a/bin/crawlers/Crawler.py +++ b/bin/crawlers/Crawler.py @@ -262,23 +262,24 @@ class Crawler(AbstractModule): if 'html' in entries and entries.get('html'): item_id = crawlers.create_item_id(self.items_dir, self.domain.id) - print(item_id) - gzip64encoded = crawlers.get_gzipped_b64_item(item_id, entries['html']) + item = Item(item_id) + print(item.id) + + gzip64encoded = crawlers.get_gzipped_b64_item(item.id, entries['html']) # send item to Global - relay_message = f'crawler item::{item_id} {gzip64encoded}' - self.add_message_to_queue(relay_message, 'Importers') + relay_message = f'crawler {gzip64encoded}' + self.add_message_to_queue(obj=item, message=relay_message, queue='Importers') - # Tag - msg = f'infoleak:submission="crawler";{item_id}' # TODO FIXME - self.add_message_to_queue(msg, 'Tags') + # Tag # TODO replace me with metadata to tags + msg = f'infoleak:submission="crawler"' # TODO FIXME + self.add_message_to_queue(obj=item, message=msg, queue='Tags') + # TODO replace me with metadata to add crawlers.create_item_metadata(item_id, last_url, parent_id) if self.root_item is None: self.root_item = item_id parent_id = item_id - item = Item(item_id) - title_content = crawlers.extract_title_from_html(entries['html']) if title_content: title = Titles.create_title(title_content) diff --git a/bin/importer/FeederImporter.py b/bin/importer/FeederImporter.py index 8c8b08cb..24b9bcb8 100755 --- a/bin/importer/FeederImporter.py +++ b/bin/importer/FeederImporter.py @@ -94,9 +94,9 @@ class FeederImporter(AbstractImporter): if obj.type == 'item': # object save on disk as file (Items) gzip64_content = feeder.get_gzip64_content() - return f'{feeder_name} {obj.get_global_id()} {gzip64_content}' + return obj, f'{feeder_name} {gzip64_content}' else: # Messages save on DB - return f'{feeder_name} {obj.get_global_id()}' + return obj, f'{feeder_name}' class FeederModuleImporter(AbstractModule): @@ -115,8 +115,10 @@ class FeederModuleImporter(AbstractModule): def compute(self, message): # TODO HANDLE Invalid JSON json_data = json.loads(message) - relay_message = self.importer.importer(json_data) - self.add_message_to_queue(message=relay_message) + # TODO multiple objs + messages + obj, relay_message = self.importer.importer(json_data) + #### + self.add_message_to_queue(obj=obj, message=relay_message) # Launch Importer diff --git a/bin/importer/FileImporter.py b/bin/importer/FileImporter.py index 820e7f53..d9a31ba2 100755 --- a/bin/importer/FileImporter.py +++ b/bin/importer/FileImporter.py @@ -22,6 +22,8 @@ from lib import ail_logger # from lib.ail_queues import AILQueue from lib import ail_files # TODO RENAME ME +from lib.objects.Items import Item + logging.config.dictConfig(ail_logger.get_config(name='modules')) class FileImporter(AbstractImporter): @@ -44,10 +46,12 @@ class FileImporter(AbstractImporter): elif not ail_files.is_text(mimetype): # # # # return None - # TODO handle multiple objects - message = self.create_message(item_id, content, gzipped=gzipped, source='dir_import') + source = 'dir_import' + message = self.create_message(content, gzipped=gzipped, source=source) + self.logger.info(f'{source} {item_id}') + obj = Item(item_id) if message: - self.add_message_to_queue(message=message) + self.add_message_to_queue(obj, message=message) class DirImporter(AbstractImporter): def __init__(self): diff --git a/bin/importer/PystemonImporter.py b/bin/importer/PystemonImporter.py index 69733ed0..1c0692b9 100755 --- a/bin/importer/PystemonImporter.py +++ b/bin/importer/PystemonImporter.py @@ -22,6 +22,8 @@ from importer.abstract_importer import AbstractImporter from modules.abstract_module import AbstractModule from lib.ConfigLoader import ConfigLoader +from lib.objects.Items import Item + class PystemonImporter(AbstractImporter): def __init__(self, pystemon_dir, host='localhost', port=6379, db=10): super().__init__() @@ -53,10 +55,13 @@ class PystemonImporter(AbstractImporter): gzipped = False # TODO handle multiple objects - return self.create_message(item_id, content, gzipped=gzipped, source='pystemon') + source = 'pystemon' + message = self.create_message(content, gzipped=gzipped, source=source) + self.logger.info(f'{source} {item_id}') + return item_id, message except IOError as e: - print(f'Error: {full_item_path}, IOError') + self.logger.error(f'Error {e}: {full_item_path}, IOError') return None @@ -80,7 +85,10 @@ class PystemonModuleImporter(AbstractModule): return self.importer.importer() def compute(self, message): - self.add_message_to_queue(message=message) + if message: + item_id, message = message + item = Item(item_id) + self.add_message_to_queue(obj=item, message=message) if __name__ == '__main__': diff --git a/bin/importer/ZMQImporter.py b/bin/importer/ZMQImporter.py index bb86880f..509b136e 100755 --- a/bin/importer/ZMQImporter.py +++ b/bin/importer/ZMQImporter.py @@ -4,15 +4,13 @@ Importer Class ================ -Import Content +ZMQ Importer """ import os import sys - import zmq - sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages @@ -21,6 +19,8 @@ from importer.abstract_importer import AbstractImporter from modules.abstract_module import AbstractModule from lib.ConfigLoader import ConfigLoader +from lib.objects.Items import Item + class ZMQImporters(AbstractImporter): def __init__(self): super().__init__() @@ -74,18 +74,19 @@ class ZMQModuleImporter(AbstractModule): for message in messages: message = message.decode() - obj_id, gzip64encoded = message.split(' ', 1) # TODO ADD LOGS + obj_id, gzip64encoded = message.split(' ', 1) # TODO ADD LOGS splitted = obj_id.split('>>', 1) if splitted == 2: feeder_name, obj_id = splitted else: feeder_name = self.default_feeder_name - # f'{source} item::{obj_id} {content}' - relay_message = f'{feeder_name} item::{obj_id} {gzip64encoded}' + obj = Item(obj_id) + # f'{source} {content}' + relay_message = f'{feeder_name} {gzip64encoded}' print(f'feeder_name item::{obj_id}') - self.add_message_to_queue(message=relay_message) + self.add_message_to_queue(obj=obj, message=relay_message) if __name__ == '__main__': diff --git a/bin/importer/abstract_importer.py b/bin/importer/abstract_importer.py index ebe32c63..11cb0f67 100755 --- a/bin/importer/abstract_importer.py +++ b/bin/importer/abstract_importer.py @@ -54,16 +54,22 @@ class AbstractImporter(ABC): # TODO ail queues """ return self.__class__.__name__ - def add_message_to_queue(self, message, queue_name=None): + def add_message_to_queue(self, obj, message='', queue=None): """ Add message to queue + :param obj: AILObject :param message: message to send in queue - :param queue_name: queue or module name + :param queue: queue name or module name ex: add_message_to_queue(item_id, 'Mail') """ - if message: - self.queue.send_message(message, queue_name) + if not obj: + raise Exception(f'Invalid AIL object, {obj}') + obj_global_id = obj.get_global_id() + self.queue.send_message(obj_global_id, message, queue) + + def get_available_queues(self): + return self.queue.get_out_queues() @staticmethod def b64(content): @@ -85,20 +91,20 @@ class AbstractImporter(ABC): # TODO ail queues self.logger.warning(e) return '' - def create_message(self, obj_id, content, b64=False, gzipped=False, source=None): - if not gzipped: - content = self.b64_gzip(content) - elif not b64: - content = self.b64(content) - if not content: - return None - if isinstance(content, bytes): - content = content.decode() + def create_message(self, content, b64=False, gzipped=False, source=None): if not source: source = self.name - self.logger.info(f'{source} {obj_id}') - # self.logger.debug(f'{source} {obj_id} {content}') - # TODO handle multiple objects - return f'{source} item::{obj_id} {content}' + if content: + if not gzipped: + content = self.b64_gzip(content) + elif not b64: + content = self.b64(content) + if not content: + return None + if isinstance(content, bytes): + content = content.decode() + return f'{source} {content}' + else: + return f'{source}' diff --git a/bin/importer/feeders/Telegram.py b/bin/importer/feeders/Telegram.py index 4eea63da..5ef58b5f 100755 --- a/bin/importer/feeders/Telegram.py +++ b/bin/importer/feeders/Telegram.py @@ -27,6 +27,7 @@ import base64 import io import gzip +# TODO remove compression ??? def gunzip_bytes_obj(bytes_obj): gunzipped_bytes_obj = None try: diff --git a/bin/lib/ail_core.py b/bin/lib/ail_core.py index eeb83a98..9eaaca97 100755 --- a/bin/lib/ail_core.py +++ b/bin/lib/ail_core.py @@ -93,6 +93,10 @@ def zscan_iter(r_redis, name): # count ??? ## -- Redis -- ## +def rreplace(s, old, new, occurrence): + li = s.rsplit(old, occurrence) + return new.join(li) + def paginate_iterator(iter_elems, nb_obj=50, page=1): dict_page = {'nb_all_elem': len(iter_elems)} nb_pages = dict_page['nb_all_elem'] / nb_obj diff --git a/bin/lib/ail_queues.py b/bin/lib/ail_queues.py index 38c5a42d..06ad6b0d 100755 --- a/bin/lib/ail_queues.py +++ b/bin/lib/ail_queues.py @@ -84,6 +84,18 @@ class AILQueue: add_processed_obj(obj_global_id, m_hash, module=self.name) return obj_global_id, m_hash, mess + def rename_message_obj(self, new_id, old_id): + # restrict rename function + if self.name == 'Mixer' or self.name == 'Global': + rename_processed_obj(new_id, old_id) + else: + raise ModuleQueueError('This Module can\'t rename an object ID') + + # condition -> not in any queue + # TODO EDIT meta + + + def end_message(self, obj_global_id, m_hash): end_processed_obj(obj_global_id, m_hash, module=self.name) @@ -171,6 +183,12 @@ def clear_modules_queues_stats(): def get_processed_objs(): return r_obj_process.smembers(f'objs:process') +def get_processed_end_objs(): + return r_obj_process.smembers(f'objs:processed') + +def get_processed_end_obj(): + return r_obj_process.spop(f'objs:processed') + def get_processed_objs_by_type(obj_type): return r_obj_process.zrange(f'objs:process:{obj_type}', 0, -1) @@ -219,6 +237,28 @@ def end_processed_obj(obj_global_id, m_hash, module=None, queue=None): r_obj_process.sadd(f'objs:processed', obj_global_id) # TODO use list ?????? +def rename_processed_obj(new_id, old_id): + module = get_processed_obj_modules(old_id) + # currently in a module + if len(module) == 1: + module, x_hash = module[0].split(':', 1) + obj_type = old_id.split(':', 1)[0] + r_obj_process.zrem(f'obj:modules:{old_id}', f'{module}:{x_hash}') + r_obj_process.zrem(f'objs:process:{obj_type}', old_id) + r_obj_process.srem(f'objs:process', old_id) + add_processed_obj(new_id, x_hash, module=module) + +def delete_processed_obj(obj_global_id): + for q in get_processed_obj_queues(obj_global_id): + queue, x_hash = q.split(':', 1) + r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{x_hash}') + for m in get_processed_obj_modules(obj_global_id): + module, x_hash = m.split(':', 1) + r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{x_hash}') + obj_type = obj_global_id.split(':', 1)[0] + r_obj_process.zrem(f'objs:process:{obj_type}', obj_global_id) + r_obj_process.srem(f'objs:process', obj_global_id) + ################################################################################### @@ -322,7 +362,10 @@ def save_queue_digraph(): if __name__ == '__main__': # clear_modules_queues_stats() # save_queue_digraph() - oobj_global_id = 'item::submitted/2023/09/06/submitted_75fb9ff2-8c91-409d-8bd6-31769d73db8f.gz' - while True: - print(get_processed_obj(oobj_global_id)) - time.sleep(0.5) + oobj_global_id = 'item::submitted/2023/10/11/submitted_b5440009-05d5-4494-a807-a6d8e4a900cf.gz' + # print(get_processed_obj(oobj_global_id)) + # delete_processed_obj(oobj_global_id) + # while True: + # print(get_processed_obj(oobj_global_id)) + # time.sleep(0.5) + print(get_processed_end_objs()) diff --git a/bin/lib/objects/Items.py b/bin/lib/objects/Items.py index c2edbb40..8aaab0b2 100755 --- a/bin/lib/objects/Items.py +++ b/bin/lib/objects/Items.py @@ -11,6 +11,7 @@ import cld3 import html2text from io import BytesIO +from uuid import uuid4 from pymisp import MISPObject @@ -18,7 +19,7 @@ sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages ################################## -from lib.ail_core import get_ail_uuid +from lib.ail_core import get_ail_uuid, rreplace from lib.objects.abstract_object import AbstractObject from lib.ConfigLoader import ConfigLoader from lib import item_basic @@ -137,9 +138,23 @@ class Item(AbstractObject): #################################################################################### #################################################################################### - def sanitize_id(self): - pass + # TODO ADD function to check if ITEM (content + file) already exists + def sanitize_id(self): + if ITEMS_FOLDER in self.id: + self.id = self.id.replace(ITEMS_FOLDER, '', 1) + + # limit filename length + basename = self.get_basename() + if len(basename) > 255: + new_basename = f'{basename[:215]}{str(uuid4())}.gz' + self.id = rreplace(self.id, basename, new_basename, 1) + + + + + + return self.id # # TODO: sanitize_id # # TODO: check if already exists ? diff --git a/bin/lib/objects/ail_objects.py b/bin/lib/objects/ail_objects.py index 89be336f..0c29d668 100755 --- a/bin/lib/objects/ail_objects.py +++ b/bin/lib/objects/ail_objects.py @@ -84,6 +84,8 @@ def get_object(obj_type, subtype, obj_id): return UserAccount(obj_id, subtype) elif obj_type == 'username': return Usernames.Username(obj_id, subtype) + else: + raise Exception(f'Unknown AIL object: {obj_type} {subtype} {obj_id}') def get_objects(objects): objs = set() diff --git a/bin/modules/Global.py b/bin/modules/Global.py index 1c05fcf3..dd3cd900 100755 --- a/bin/modules/Global.py +++ b/bin/modules/Global.py @@ -79,73 +79,56 @@ class Global(AbstractModule): self.time_last_stats = time.time() self.processed_item = 0 - def compute(self, message, r_result=False): - # Recovering the streamed message informations - splitted = message.split() + def compute(self, message, r_result=False): # TODO move OBJ ID sanitization to importer + # Recovering the streamed message infos + gzip64encoded = message - if len(splitted) == 2: - item, gzip64encoded = splitted + if self.obj.type == 'item': + if gzip64encoded: - # Remove ITEMS_FOLDER from item path (crawled item + submitted) - if self.ITEMS_FOLDER in item: - item = item.replace(self.ITEMS_FOLDER, '', 1) + # Creating the full filepath + filename = os.path.join(self.ITEMS_FOLDER, self.obj.id) + filename = os.path.realpath(filename) - file_name_item = item.split('/')[-1] - if len(file_name_item) > 255: - new_file_name_item = '{}{}.gz'.format(file_name_item[:215], str(uuid4())) - item = self.rreplace(item, file_name_item, new_file_name_item, 1) + # Incorrect filename + if not os.path.commonprefix([filename, self.ITEMS_FOLDER]) == self.ITEMS_FOLDER: + self.logger.warning(f'Global; Path traversal detected {filename}') + print(f'Global; Path traversal detected {filename}') - # Creating the full filepath - filename = os.path.join(self.ITEMS_FOLDER, item) - filename = os.path.realpath(filename) + else: + # Decode compressed base64 + decoded = base64.standard_b64decode(gzip64encoded) + new_file_content = self.gunzip_bytes_obj(filename, decoded) - # Incorrect filename - if not os.path.commonprefix([filename, self.ITEMS_FOLDER]) == self.ITEMS_FOLDER: - self.logger.warning(f'Global; Path traversal detected {filename}') - print(f'Global; Path traversal detected {filename}') + # TODO REWRITE ME + if new_file_content: + filename = self.check_filename(filename, new_file_content) + + if filename: + # create subdir + dirname = os.path.dirname(filename) + if not os.path.exists(dirname): + os.makedirs(dirname) + + with open(filename, 'wb') as f: + f.write(decoded) + + update_obj_date(self.obj.get_date(), 'item') + + self.add_message_to_queue(obj=self.obj, queue='Item') + self.processed_item += 1 + + print(self.obj.id) + if r_result: + return self.obj.id else: - # Decode compressed base64 - decoded = base64.standard_b64decode(gzip64encoded) - new_file_content = self.gunzip_bytes_obj(filename, decoded) - - if new_file_content: - filename = self.check_filename(filename, new_file_content) - - if filename: - # create subdir - dirname = os.path.dirname(filename) - if not os.path.exists(dirname): - os.makedirs(dirname) - - with open(filename, 'wb') as f: - f.write(decoded) - - item_id = filename - # remove self.ITEMS_FOLDER from - if self.ITEMS_FOLDER in item_id: - item_id = item_id.replace(self.ITEMS_FOLDER, '', 1) - - item = Item(item_id) - - update_obj_date(item.get_date(), 'item') - - self.add_message_to_queue(obj=item, queue='Item') - self.processed_item += 1 - - # DIRTY FIX AIL SYNC - SEND TO SYNC MODULE - # # FIXME: DIRTY FIX - message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}' - print(message) - self.add_message_to_queue(obj=item, queue='Sync') - - print(item_id) - if r_result: - return item_id - + self.logger.info(f"Empty Item: {message} not processed") + elif self.obj: + # TODO send to specific object queue => image, ... + self.add_message_to_queue(obj=self.obj, queue='Item') else: - self.logger.debug(f"Empty Item: {message} not processed") - print(f"Empty Item: {message} not processed") + self.logger.critical(f"Empty obj: {self.obj} {message} not processed") def check_filename(self, filename, new_file_content): """ diff --git a/bin/modules/Mixer.py b/bin/modules/Mixer.py index 7ca0985d..8d9d513c 100755 --- a/bin/modules/Mixer.py +++ b/bin/modules/Mixer.py @@ -9,7 +9,7 @@ This module is consuming the Redis-list created by the ZMQ_Feed_Q Module. This module take all the feeds provided in the config. -Depending on the configuration, this module will process the feed as follow: +Depending on the configuration, this module will process the feed as follows: operation_mode 1: "Avoid any duplicate from any sources" - The module maintain a list of content for each item - If the content is new, process it @@ -64,9 +64,6 @@ class Mixer(AbstractModule): self.ttl_key = config_loader.get_config_int("Module_Mixer", "ttl_duplicate") self.default_feeder_name = config_loader.get_config_str("Module_Mixer", "default_unnamed_feed_name") - self.ITEMS_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) + '/' - self.ITEMS_FOLDER = os.path.join(os.path.realpath(self.ITEMS_FOLDER), '') - self.nb_processed_items = 0 self.feeders_processed = {} self.feeders_duplicate = {} @@ -138,27 +135,38 @@ class Mixer(AbstractModule): def compute(self, message): self.refresh_stats() + # obj = self.obj + # TODO CHECK IF NOT self.object -> get object global ID from message + splitted = message.split() - # message -> # feeder_name - object - content - # or # message -> # feeder_name - object + # message -> feeder_name - content + # or message -> feeder_name # feeder_name - object - if len(splitted) == 2: # feeder_name - object (content already saved) - feeder_name, obj_id = splitted + if len(splitted) == 1: # feeder_name - object (content already saved) + feeder_name = message + gzip64encoded = None # Feeder name in message: "feeder obj_id gzip64encoded" - elif len(splitted) == 3: # gzip64encoded content - feeder_name, obj_id, gzip64encoded = splitted + elif len(splitted) == 2: # gzip64encoded content + feeder_name, gzip64encoded = splitted else: - print('Invalid message: not processed') - self.logger.debug(f'Invalid Item: {splitted[0]} not processed') # TODO + self.logger.warning(f'Invalid Message: {splitted} not processed') return None - # remove absolute path - item_id = item_id.replace(self.ITEMS_FOLDER, '', 1) + if self.obj.type == 'item': + # Remove ITEMS_FOLDER from item path (crawled item + submitted) + # Limit basename length + obj_id = self.obj.id + self.obj.sanitize_id() + if self.obj.id != obj_id: + self.queue.rename_message_obj(self.obj.id, obj_id) - relay_message = f'{item_id} {gzip64encoded}' + relay_message = gzip64encoded + # print(relay_message) + + # TODO only work for item object # Avoid any duplicate coming from any sources if self.operation_mode == 1: digest = hashlib.sha1(gzip64encoded.encode('utf8')).hexdigest() @@ -207,7 +215,10 @@ class Mixer(AbstractModule): # No Filtering else: self.increase_stat_processed(feeder_name) - self.add_message_to_queue(relay_message) + if self.obj.type == 'item': + self.add_message_to_queue(obj=self.obj, message=gzip64encoded) + else: + self.add_message_to_queue(obj=self.obj) if __name__ == "__main__": diff --git a/bin/modules/SubmitPaste.py b/bin/modules/SubmitPaste.py index e83a0856..4e264a73 100755 --- a/bin/modules/SubmitPaste.py +++ b/bin/modules/SubmitPaste.py @@ -25,7 +25,7 @@ from modules.abstract_module import AbstractModule from lib.objects.Items import ITEMS_FOLDER from lib import ConfigLoader from lib import Tag - +from lib.objects.Items import Item class SubmitPaste(AbstractModule): """ @@ -276,9 +276,11 @@ class SubmitPaste(AbstractModule): rel_item_path = save_path.replace(self.PASTES_FOLDER, '', 1) self.redis_logger.debug(f"relative path {rel_item_path}") + item = Item(rel_item_path) + # send paste to Global module - relay_message = f"submitted item::{rel_item_path} {gzip64encoded}" - self.add_message_to_queue(message=relay_message) + relay_message = f"submitted {gzip64encoded}" + self.add_message_to_queue(obj=item, message=relay_message) # add tags for tag in ltags: diff --git a/bin/modules/Tags.py b/bin/modules/Tags.py index 760cb138..33ea1c80 100755 --- a/bin/modules/Tags.py +++ b/bin/modules/Tags.py @@ -46,9 +46,6 @@ class Tags(AbstractModule): # Forward message to channel self.add_message_to_queue(message=tag, queue='Tag_feed') - self.add_message_to_queue(queue='Sync') - - if __name__ == '__main__': module = Tags() module.run() diff --git a/bin/modules/abstract_module.py b/bin/modules/abstract_module.py index 05e253d5..ed2fe7d3 100644 --- a/bin/modules/abstract_module.py +++ b/bin/modules/abstract_module.py @@ -96,7 +96,8 @@ class AbstractModule(ABC): self.obj = None return None - def add_message_to_queue(self, message='', obj=None, queue=None): + # TODO ADD META OBJ ???? + def add_message_to_queue(self, obj=None, message='', queue=None): """ Add message to queue :param obj: AILObject