diff --git a/bin/Mixer.py b/bin/Mixer.py
deleted file mode 100755
index b15b4d62..00000000
--- a/bin/Mixer.py
+++ /dev/null
@@ -1,224 +0,0 @@
-#!/usr/bin/env python3
-# -*-coding:UTF-8 -*
-"""
-The Mixer Module
-================
-
-This module is consuming the Redis-list created by the ZMQ_Feed_Q Module.
-
-This module take all the feeds provided in the config.
-Depending on the configuration, this module will process the feed as follow:
-    operation_mode 1: "Avoid any duplicate from any sources"
-        - The module maintain a list of content for each paste
-            - If the content is new, process it
-            - Else, do not process it but keep track for statistics on duplicate
-
-    operation_mode 2: "Keep duplicate coming from different sources"
-        - The module maintain a list of name given to the paste by the feeder
-            - If the name has not yet been seen, process it
-            - Elseif, the saved content associated with the paste is not the same, process it
-            - Else, do not process it but keep track for statistics on duplicate
-
-    operation_mode 3: "Don't look if duplicated content"
-        - SImply do not bother to check if it is a duplicate
-        - Simply do not bother to check if it is a duplicate
-
-Note that the hash of the content is defined as the sha1(gzip64encoded).
-
-Every data coming from a named feed can be sent to a pre-processing module before going to the global module.
-The mapping can be done via the variable FEED_QUEUE_MAPPING
-
-"""
-import os
-import sys
-
-import base64
-import hashlib
-import time
-from pubsublogger import publisher
-import redis
-
-from Helper import Process
-
-sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
-import ConfigLoader
-
-
-# CONFIG #
-refresh_time = 30
-FEED_QUEUE_MAPPING = { "feeder2": "preProcess1" } # Map a feeder name to a pre-processing module
-
-if __name__ == '__main__':
-    publisher.port = 6380
-    publisher.channel = 'Script'
-
-    config_section = 'Mixer'
-
-    p = Process(config_section)
-
-    config_loader = ConfigLoader.ConfigLoader()
-
-    # REDIS #
-    server = config_loader.get_redis_conn("Redis_Mixer_Cache")
-    server_cache = config_loader.get_redis_conn("Redis_Log_submit")
-
-    # LOGGING #
-    publisher.info("Feed Script started to receive & publish.")
-
-    # OTHER CONFIG #
-    operation_mode = config_loader.get_config_int("Module_Mixer", "operation_mode")
-    ttl_key = config_loader.get_config_int("Module_Mixer", "ttl_duplicate")
-    default_unnamed_feed_name = config_loader.get_config_str("Module_Mixer", "default_unnamed_feed_name")
-
-    PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) + '/'
-    config_loader = None
-
-    # STATS #
-    processed_paste = 0
-    processed_paste_per_feeder = {}
-    duplicated_paste_per_feeder = {}
-    time_1 = time.time()
-
-    print('Operation mode ' + str(operation_mode))
-
-    while True:
-
-        message = p.get_from_set()
-        if message is not None:
-            print(message)
-            splitted = message.split()
-            if len(splitted) == 2:
-                complete_paste, gzip64encoded = splitted  # NEW: source, item_id, gzip64 source if len==3 ???
-
-                try:
-                    #feeder_name = ( complete_paste.replace("archive/","") ).split("/")[0]
-                    feeder_name, paste_name = complete_paste.split('>>')
-                    feeder_name.replace(" ","")
-                    if 'import_dir' in feeder_name:
-                        feeder_name = feeder_name.split('/')[1]
-
-                except ValueError as e:
-                    feeder_name = default_unnamed_feed_name
-                    paste_name = complete_paste
-
-                # remove absolute path
-                paste_name = paste_name.replace(PASTES_FOLDER, '', 1)
-
-                # Processed paste
-                processed_paste += 1
-                try:
-                    processed_paste_per_feeder[feeder_name] += 1
-                except KeyError as e:
-                    # new feeder
-                    processed_paste_per_feeder[feeder_name] = 1
-                    duplicated_paste_per_feeder[feeder_name] = 0
-
-
-                relay_message = "{0} {1}".format(paste_name, gzip64encoded)
-                #relay_message = b" ".join( [paste_name, gzip64encoded] )
-
-                digest = hashlib.sha1(gzip64encoded.encode('utf8')).hexdigest()
-
-                # Avoid any duplicate coming from any sources
-                if operation_mode == 1:
-                    if server.exists(digest): # Content already exists
-                        #STATS
-                        duplicated_paste_per_feeder[feeder_name] += 1
-                    else: # New content
-
-                        # populate Global OR populate another set based on the feeder_name
-                        if feeder_name in FEED_QUEUE_MAPPING:
-                            p.populate_set_out(relay_message, FEED_QUEUE_MAPPING[feeder_name])
-                        else:
-                            p.populate_set_out(relay_message, 'Mixer')
-
-                        server.sadd(digest, feeder_name)
-                        server.expire(digest, ttl_key)
-
-
-                # Keep duplicate coming from different sources
-                elif operation_mode == 2:
-                    # Filter to avoid duplicate
-                    content = server.get('HASH_'+paste_name)
-                    if content is None:
-                        # New content
-                        # Store in redis for filtering
-                        server.set('HASH_'+paste_name, digest)
-                        server.sadd(paste_name, feeder_name)
-                        server.expire(paste_name, ttl_key)
-                        server.expire('HASH_'+paste_name, ttl_key)
-
-                        # populate Global OR populate another set based on the feeder_name
-                        if feeder_name in FEED_QUEUE_MAPPING:
-                            p.populate_set_out(relay_message, FEED_QUEUE_MAPPING[feeder_name])
-                        else:
-                            p.populate_set_out(relay_message, 'Mixer')
-
-                    else:
-                        if digest != content:
-                            # Same paste name but different content
-                            #STATS
-                            duplicated_paste_per_feeder[feeder_name] += 1
-                            server.sadd(paste_name, feeder_name)
-                            server.expire(paste_name, ttl_key)
-
-                            # populate Global OR populate another set based on the feeder_name
-                            if feeder_name in FEED_QUEUE_MAPPING:
-                                p.populate_set_out(relay_message, FEED_QUEUE_MAPPING[feeder_name])
-                            else:
-                                p.populate_set_out(relay_message, 'Mixer')
-
-                        else:
-                            # Already processed
-                            # Keep track of processed pastes
-                            #STATS
-                            duplicated_paste_per_feeder[feeder_name] += 1
-                            continue
-                else:
-                    # populate Global OR populate another set based on the feeder_name
-                    if feeder_name in FEED_QUEUE_MAPPING:
-                        p.populate_set_out(relay_message, FEED_QUEUE_MAPPING[feeder_name])
-                    else:
-                        p.populate_set_out(relay_message, 'Mixer')
-
-
-            else:
-                # TODO Store the name of the empty paste inside a Redis-list.
-                print("Empty Paste: not processed")
-                publisher.debug("Empty Paste: {0} not processed".format(message))
-        else:
-
-            if int(time.time() - time_1) > refresh_time:
-                # update internal feeder
-                list_feeder = server_cache.hkeys("mixer_cache:list_feeder")
-                if list_feeder:
-                    for feeder in list_feeder:
-                        count = int(server_cache.hget("mixer_cache:list_feeder", feeder))
-                        if count is None:
-                            count = 0
-                        processed_paste_per_feeder[feeder] = processed_paste_per_feeder.get(feeder, 0) + count
-                        processed_paste = processed_paste + count
-                print(processed_paste_per_feeder)
-                to_print = 'Mixer; ; ; ;mixer_all All_feeders Processed {0} paste(s) in {1}sec'.format(processed_paste, refresh_time)
-                print(to_print)
-                publisher.info(to_print)
-                processed_paste = 0
-
-                for feeder, count in processed_paste_per_feeder.items():
-                    to_print = 'Mixer; ; ; ;mixer_{0} {0} Processed {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
-                    print(to_print)
-                    publisher.info(to_print)
-                    processed_paste_per_feeder[feeder] = 0
-
-                for feeder, count in duplicated_paste_per_feeder.items():
-                    to_print = 'Mixer; ; ; ;mixer_{0} {0} Duplicated {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
-                    print(to_print)
-                    publisher.info(to_print)
-                    duplicated_paste_per_feeder[feeder] = 0
-
-                time_1 = time.time()
-
-                # delete internal feeder list
-                server_cache.delete("mixer_cache:list_feeder")
-            time.sleep(0.5)
-            continue
diff --git a/bin/core/Sync_importer.py b/bin/core/Sync_importer.py
index 2999f5e8..38854c15 100755
--- a/bin/core/Sync_importer.py
+++ b/bin/core/Sync_importer.py
@@ -22,33 +22,32 @@ sys.path.append(os.environ['AIL_BIN'])
 # Import Project packages
 ##################################
 from core import ail_2_ail
-from lib.ConfigLoader import ConfigLoader
 from modules.abstract_module import AbstractModule
+# from lib.ConfigLoader import ConfigLoader
 
 #### CONFIG ####
-config_loader = ConfigLoader()
-server_cache = config_loader.get_redis_conn("Redis_Log_submit")
-config_loader = None
+# config_loader = ConfigLoader()
+#
+# config_loader = None
 #### ------ ####
 
 class Sync_importer(AbstractModule):
     """
-    Tags module for AIL framework
+    Sync_importer module for AIL framework
     """
 
     def __init__(self):
         super(Sync_importer, self).__init__()
 
-        # Waiting time in secondes between to message proccessed
+        # Waiting time in seconds between two processed messages
         self.pending_seconds = 10
 
-        #self.dict_ail_sync_filters = ail_2_ail.get_all_sync_queue_dict()
-        #self.last_refresh = time.time()
+        # self.dict_ail_sync_filters = ail_2_ail.get_all_sync_queue_dict()
+        # self.last_refresh = time.time()
 
         # Send module state to logs
         self.redis_logger.info(f'Module {self.module_name} Launched')
 
-
     def run(self):
 
         while self.proceed:
             ### REFRESH DICT
@@ -67,7 +66,6 @@ class Sync_importer(AbstractModule):
             self.redis_logger.debug(f"{self.module_name}, waiting for new message, Idling {self.pending_seconds}s")
             time.sleep(self.pending_seconds)
 
-
     def compute(self, ail_stream):
 
         # # TODO: SANITYZE AIL STREAM
@@ -79,15 +77,11 @@ class Sync_importer(AbstractModule):
         # # TODO: create default id
         item_id = ail_stream['meta']['ail:id']
 
-        message = f'{item_id} {b64_gzip_content}'
+        message = f'sync {item_id} {b64_gzip_content}'
         print(item_id)
         self.send_message_to_queue(message, 'Mixer')
 
-        # increase nb of item by ail sync
-        server_cache.hincrby("mixer_cache:list_feeder", 'AIL_Sync', 1)
-
 
 if __name__ == '__main__':
-
     module = Sync_importer()
     module.run()
diff --git a/bin/crawlers/Crawler.py b/bin/crawlers/Crawler.py
index 3e0ad17d..1c1245a7 100755
--- a/bin/crawlers/Crawler.py
+++ b/bin/crawlers/Crawler.py
@@ -27,7 +27,6 @@ class Crawler(AbstractModule):
         self.pending_seconds = 1
 
         config_loader = ConfigLoader()
-        self.r_log_submit = config_loader.get_redis_conn('Redis_Log_submit')
 
         self.default_har = config_loader.get_config_boolean('Crawler', 'default_har')
         self.default_screenshot = config_loader.get_config_boolean('Crawler', 'default_screenshot')
@@ -228,10 +227,8 @@ class Crawler(AbstractModule):
             print(item_id)
             gzip64encoded = crawlers.get_gzipped_b64_item(item_id, entries['html'])
             # send item to Global
-            relay_message = f'{item_id} {gzip64encoded}'
+            relay_message = f'crawler {item_id} {gzip64encoded}'
             self.send_message_to_queue(relay_message, 'Mixer')
-            # increase nb of paste by feeder name
-            self.r_log_submit.hincrby('mixer_cache:list_feeder', 'crawler', 1)
 
             # Tag
             msg = f'infoleak:submission="crawler";{item_id}'
diff --git a/bin/importer/FeederImporter.py b/bin/importer/FeederImporter.py
index adc2bcf6..f63eea36 100755
--- a/bin/importer/FeederImporter.py
+++ b/bin/importer/FeederImporter.py
@@ -93,7 +93,7 @@ class FeederImporter(AbstractImporter):
         feeder.process_meta()
         gzip64_content = feeder.get_gzip64_content()
 
-        return f'{item_id} {gzip64_content}'
+        return f'{feeder_name} {item_id} {gzip64_content}'
 
 
 class FeederModuleImporter(AbstractModule):
@@ -115,10 +115,6 @@ class FeederModuleImporter(AbstractModule):
         relay_message = self.importer.importer(json_data)
         self.send_message_to_queue(relay_message)
 
-        # TODO IN MIXER
-        # increase nb of paste by feeder name
-        # server_cache.hincrby("mixer_cache:list_feeder", feeder_name, 1)
-
 
 # Launch Importer
 if __name__ == '__main__':
diff --git a/bin/importer/ZMQImporter.py b/bin/importer/ZMQImporter.py
index 710b3ddc..100b166d 100755
--- a/bin/importer/ZMQImporter.py
+++ b/bin/importer/ZMQImporter.py
@@ -56,11 +56,12 @@ class ZMQModuleImporter(AbstractModule):
         super().__init__()
 
         config_loader = ConfigLoader()
-        address = config_loader.get_config_str('ZMQ_Global', 'address')
+        addresses = config_loader.get_config_str('ZMQ_Global', 'address')
+        addresses = [address.strip() for address in addresses.split(',')]
         channel = config_loader.get_config_str('ZMQ_Global', 'channel')
 
         self.zmq_importer = ZMQImporters()
-        # TODO register all Importers
-        self.zmq_importer.add(address, channel)
+        for address in addresses:
+            self.zmq_importer.add(address, channel)
 
     # TODO MESSAGE SOURCE - UI
     def get_message(self):
diff --git a/bin/modules/Mixer.py b/bin/modules/Mixer.py
new file mode 100755
index 00000000..f57aefa3
--- /dev/null
+++ b/bin/modules/Mixer.py
@@ -0,0 +1,217 @@
+#!/usr/bin/env python3
+# -*-coding:UTF-8 -*
+"""
+The Mixer Module
+================
+
+This module consumes the Redis-list created by the ZMQ_Feed_Q Module.
+
+This module takes all the feeds provided in the config.
+
+
+Depending on the configuration, this module will process the feed as follows:
+    operation_mode 1: "Avoid any duplicate from any sources"
+        - The module maintains a list of content for each item
+            - If the content is new, process it
+            - Else, do not process it but keep track for statistics on duplicate
+
+    DISABLED
+    operation_mode 2: "Keep duplicate coming from different sources"
+        - The module maintains a list of the names given to the item by the feeder
+            - If the name has not yet been seen, process it
+            - Else, if the saved content associated with the item is not the same, process it
+            - Else, do not process it but keep track for statistics on duplicate
+
+    operation_mode 3: "Don't look if duplicated content"
+        - Simply do not bother to check if it is a duplicate
+
+Note that the hash of the content is defined as the sha1(gzip64encoded).
+
+"""
+import os
+import sys
+
+import hashlib
+import time
+
+sys.path.append(os.environ['AIL_BIN'])
+##################################
+# Import Project packages
+##################################
+from modules.abstract_module import AbstractModule
+from lib.ConfigLoader import ConfigLoader
+
+
+class Mixer(AbstractModule):
+    """docstring for Mixer module."""
+
+    def __init__(self):
+        super(Mixer, self).__init__()
+
+        config_loader = ConfigLoader()
+        self.r_cache = config_loader.get_redis_conn("Redis_Mixer_Cache")
+        # self.r_cache_s = config_loader.get_redis_conn("Redis_Log_submit")
+
+        self.pending_seconds = 5
+
+        self.refresh_time = 30
+        self.last_refresh = time.time()
+
+        self.operation_mode = config_loader.get_config_int("Module_Mixer", "operation_mode")
+        print(f'Operation mode {self.operation_mode}')
+
+        self.ttl_key = config_loader.get_config_int("Module_Mixer", "ttl_duplicate")
+        self.default_feeder_name = config_loader.get_config_str("Module_Mixer", "default_unnamed_feed_name")
+
+        self.ITEMS_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) + '/'
+        self.ITEMS_FOLDER = os.path.join(os.path.realpath(self.ITEMS_FOLDER), '')
+
+        self.nb_processed_items = 0
+        self.feeders_processed = {}
+        self.feeders_duplicate = {}
+
+        self.redis_logger.info(f"Module: {self.module_name} Launched")
+
+    # TODO Save stats in cache
+    # def get_feeders(self):
+    #     return self.r_cache_s.smembers("mixer_cache:feeders")
+    #
+    # def get_feeder_nb_last_processed(self, feeder):
+    #     nb = self.r_cache_s.hget("mixer_cache:feeders:last_processed", feeder)
+    #     if nb:
+    #         return int(nb)
+    #     else:
+    #         return 0
+    #
+    # def get_cache_feeders_nb_last_processed(self):
+    #     feeders = {}
+    #     for feeder in self.get_feeders():
+    #         feeders[feeder] = self.get_feeder_nb_last_processed(feeder)
+    #     return feeders
+
+    def clear_feeders_stat(self):
+        pass
+        # self.r_cache_s.delete("mixer_cache:feeders:last_processed")
+
+    def increase_stat_processed(self, feeder):
+        self.nb_processed_items += 1
+        try:
+            self.feeders_processed[feeder] += 1
+        except KeyError:
+            self.feeders_processed[feeder] = 1
+
+    def increase_stat_duplicate(self, feeder):
+        self.nb_processed_items += 1
+        try:
+            self.feeders_duplicate[feeder] += 1
+        except KeyError:
+            self.feeders_duplicate[feeder] = 1
+
+    # TODO Save stats in cache
+    def refresh_stats(self):
+        if int(time.time() - self.last_refresh) > self.refresh_time:
+            # update internal feeder
+            to_print = f'Mixer; ; ; ;mixer_all All_feeders Processed {self.nb_processed_items} item(s) in {self.refresh_time}sec'
+            print(to_print)
+            self.redis_logger.info(to_print)
+            self.nb_processed_items = 0
+
+            for feeder in self.feeders_processed:
+                to_print = f'Mixer; ; ; ;mixer_{feeder} {feeder} Processed {self.feeders_processed[feeder]} item(s) in {self.refresh_time}sec'
+                print(to_print)
+                self.redis_logger.info(to_print)
+                self.feeders_processed[feeder] = 0
+
+            for feeder in self.feeders_duplicate:
+                to_print = f'Mixer; ; ; ;mixer_{feeder} {feeder} Duplicated {self.feeders_duplicate[feeder]} item(s) in {self.refresh_time}sec'
+                print(to_print)
+                self.redis_logger.info(to_print)
+                self.feeders_duplicate[feeder] = 0
+
+            self.last_refresh = time.time()
+            self.clear_feeders_stat()
+            time.sleep(0.5)
+
+    def computeNone(self):
+        self.refresh_stats()
+
+    def compute(self, message):
+        self.refresh_stats()
+        splitted = message.split()
+        # Old Feeder name "feeder>>item_id gzip64encoded"
+        if len(splitted) == 2:
+            item_id, gzip64encoded = splitted
+            try:
+                feeder_name, item_id = item_id.split('>>')
+                feeder_name = feeder_name.replace(" ", "")
+                if 'import_dir' in feeder_name:
+                    feeder_name = feeder_name.split('/')[1]
+            except ValueError:
+                feeder_name = self.default_feeder_name
+        # Feeder name in message: "feeder item_id gzip64encoded"
+        elif len(splitted) == 3:
+            feeder_name, item_id, gzip64encoded = splitted
+        else:
+            print('Invalid message: not processed')
+            self.redis_logger.debug(f'Invalid Item: {message} not processed')
+            return None
+
+        # remove absolute path
+        item_id = item_id.replace(self.ITEMS_FOLDER, '', 1)
+
+        relay_message = f'{item_id} {gzip64encoded}'
+
+        # Avoid any duplicate coming from any sources
+        if self.operation_mode == 1:
+            digest = hashlib.sha1(gzip64encoded.encode('utf8')).hexdigest()
+            if self.r_cache.exists(digest):  # Content already exists
+                # STATS
+                self.increase_stat_duplicate(feeder_name)
+            else:  # New content
+                self.r_cache.sadd(digest, feeder_name)
+                self.r_cache.expire(digest, self.ttl_key)
+
+                self.increase_stat_processed(feeder_name)
+                self.send_message_to_queue(relay_message)
+
+        # Need To Be Fixed, Currently doesn't check the source (-> same as operation 1)
+        # # Keep duplicate coming from different sources
+        # elif self.operation_mode == 2:
+        #     digest = hashlib.sha1(gzip64encoded.encode('utf8')).hexdigest()
+        #     # Filter to avoid duplicate
+        #     older_digest = self.r_cache.get(f'HASH_{item_id}')
+        #     if older_digest is None:
+        #         # New content
+        #         # Store in redis for filtering
+        #         self.r_cache.set(f'HASH_{item_id}', digest)
+        #         self.r_cache.sadd(item_id, feeder_name)
+        #         self.r_cache.expire(item_id, self.ttl_key)
+        #         self.r_cache.expire(f'HASH_{item_id}', self.ttl_key)
+        #
+        #         self.send_message_to_queue(relay_message)
+        #
+        #     else:
+        #         if digest != older_digest:
+        #             # Same item name but different content
+        #             # STATS
+        #             self.increase_stat_duplicate(feeder_name)
+        #             self.r_cache.sadd(item_id, feeder_name)
+        #             self.r_cache.expire(item_id, self.ttl_key)
+        #
+        #             self.send_message_to_queue(relay_message)
+        #
+        #         else:
+        #             # Already processed
+        #             # Keep track of processed items
+        #             # STATS
+        #             self.increase_stat_duplicate(feeder_name)
+
+        # No Filtering
+        else:
+            self.increase_stat_processed(feeder_name)
+            self.send_message_to_queue(relay_message)
+
+
+if __name__ == "__main__":
+    module = Mixer()
+    module.run()
diff --git a/bin/modules/submit_paste.py b/bin/modules/submit_paste.py
index 9afc55b2..b483a99d 100755
--- a/bin/modules/submit_paste.py
+++ b/bin/modules/submit_paste.py
@@ -284,12 +284,9 @@ class SubmitPaste(AbstractModule):
         self.redis_logger.debug(f"relative path {rel_item_path}")
 
         # send paste to Global module
-        relay_message = f"{rel_item_path} {gzip64encoded}"
+        relay_message = f"submitted {rel_item_path} {gzip64encoded}"
         self.process.populate_set_out(relay_message, 'Mixer')
 
-        # increase nb of paste by feeder name
-        self.r_serv_log_submit.hincrby("mixer_cache:list_feeder", source, 1)
-
         # add tags
         for tag in ltags:
             Tag.add_object_tag(tag, 'item', rel_item_path)
diff --git a/bin/packages/modules.cfg b/bin/packages/modules.cfg
index c3243abc..ca9b9560 100644
--- a/bin/packages/modules.cfg
+++ b/bin/packages/modules.cfg
@@ -10,7 +10,6 @@ publish = Redis_Import
 # subscribe = ZMQ_Global
 subscribe = Redis_Import
 publish = Redis_Mixer
-#publish = Redis_Mixer,Redis_preProcess1
 
 [Sync_importer]
 publish = Redis_Mixer,Redis_Tags
@@ -168,10 +167,6 @@ publish = Redis_Tags
 [Zerobins]
 subscribe = Redis_Url
 
-#[PreProcessFeed]
-#subscribe = Redis_preProcess1
-#publish = Redis_Mixer
-
 # [My_Module]
 # subscribe = Redis_Global
 # publish = Redis_Tags
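
For reference, a minimal standalone sketch of the relay-message contract this patch introduces (feeders now prepend their name: "<feeder_name> <item_id> <gzip64encoded>"). The helper names below are illustrative only and are not part of AIL:

import base64
import gzip
import hashlib


def build_relay_message(feeder_name, item_id, content):
    """Gzip then base64-encode the content and prepend the feeder name and item id."""
    gzip64encoded = base64.b64encode(gzip.compress(content.encode())).decode()
    return f'{feeder_name} {item_id} {gzip64encoded}'


def parse_relay_message(message, default_feeder_name='unnamed_feeder'):
    """Mirror Mixer.compute(): accept the old 2-field and the new 3-field formats."""
    splitted = message.split()
    if len(splitted) == 2:  # old format: "feeder>>item_id gzip64" or "item_id gzip64"
        item_id, gzip64encoded = splitted
        try:
            feeder_name, item_id = item_id.split('>>')
        except ValueError:
            feeder_name = default_feeder_name
    elif len(splitted) == 3:  # new format: "feeder item_id gzip64"
        feeder_name, item_id, gzip64encoded = splitted
    else:
        raise ValueError('invalid relay message')
    # deduplication key used by operation_mode 1: sha1 of the gzip64-encoded payload
    digest = hashlib.sha1(gzip64encoded.encode('utf8')).hexdigest()
    return feeder_name, item_id, gzip64encoded, digest


if __name__ == '__main__':
    msg = build_relay_message('crawler', 'crawled/2023/example', 'some item content')
    print(parse_relay_message(msg)[:2])  # ('crawler', 'crawled/2023/example')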
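
The ZMQImporter change reads a comma-separated list from the ZMQ_Global address option instead of a single endpoint. A small sketch of that parsing, with purely illustrative values (the real ones live in the AIL config):

# Example config entry (values are hypothetical):
#   [ZMQ_Global]
#   address = tcp://crf.circl.lu:5556,tcp://127.0.0.1:5556
#   channel = 102464
addresses = 'tcp://crf.circl.lu:5556,tcp://127.0.0.1:5556'
addresses = [address.strip() for address in addresses.split(',')]
for address in addresses:
    # in the module this would be self.zmq_importer.add(address, channel)
    print(f'subscribing to {address}')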
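
Operation mode 1 keeps a sliding deduplication window in Redis: the sha1 of the gzip64 payload becomes a key holding the feeder names that sent it, and the key expires after ttl_duplicate seconds. A sketch under assumed values (local Redis, ttl of 86400s), not the module itself:

import hashlib

import redis

r_cache = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
ttl_key = 86400  # Module_Mixer ttl_duplicate, value assumed


def is_duplicate(feeder_name, gzip64encoded):
    """Return True if this exact payload was already seen within the TTL window."""
    digest = hashlib.sha1(gzip64encoded.encode('utf8')).hexdigest()
    if r_cache.exists(digest):
        return True
    # remember which feeder sent this content first; drop the key after ttl_key seconds
    r_cache.sadd(digest, feeder_name)
    r_cache.expire(digest, ttl_key)
    return False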