From b459498db23f3b908bfc1b30793a298d786fdd0e Mon Sep 17 00:00:00 2001 From: Terrtia Date: Thu, 22 Jun 2023 15:38:04 +0200 Subject: [PATCH] chg: [queues] track object + check if object processed --- bin/core/Sync_module.py | 44 +++---- bin/importer/FeederImporter.py | 2 +- bin/importer/FileImporter.py | 2 +- bin/importer/PystemonImporter.py | 2 +- bin/importer/ZMQImporter.py | 2 +- bin/lib/ail_queues.py | 166 ++++++++++++++++--------- bin/lib/objects/ail_objects.py | 24 ++-- bin/modules/ApiKey.py | 16 +-- bin/modules/Categ.py | 6 +- bin/modules/Credential.py | 9 +- bin/modules/CreditCards.py | 8 +- bin/modules/Cryptocurrencies.py | 10 +- bin/modules/CveModule.py | 9 +- bin/modules/Decoder.py | 7 +- bin/modules/DomClassifier.py | 6 +- bin/modules/Duplicates.py | 2 +- bin/modules/Global.py | 4 +- bin/modules/Hosts.py | 6 +- bin/modules/IPAddress.py | 6 +- bin/modules/Iban.py | 6 +- bin/modules/Indexer.py | 6 +- bin/modules/Keys.py | 60 ++++----- bin/modules/Languages.py | 2 +- bin/modules/LibInjection.py | 8 +- bin/modules/MISP_Thehive_Auto_Push.py | 5 +- bin/modules/Mail.py | 8 +- bin/modules/Mixer.py | 2 +- bin/modules/Onion.py | 8 +- bin/modules/PgpDump.py | 6 +- bin/modules/Phone.py | 6 +- bin/modules/SQLInjectionDetection.py | 9 +- bin/modules/SubmitPaste.py | 5 +- bin/modules/Tags.py | 27 ++-- bin/modules/Telegram.py | 6 +- bin/modules/Tools.py | 6 +- bin/modules/Urls.py | 12 +- bin/modules/Zerobins.py | 5 +- bin/modules/abstract_module.py | 41 +++++- bin/trackers/Tracker_Regex.py | 7 +- bin/trackers/Tracker_Term.py | 7 +- bin/trackers/Tracker_Typo_Squatting.py | 8 +- bin/trackers/Tracker_Yara.py | 7 +- 42 files changed, 311 insertions(+), 277 deletions(-) diff --git a/bin/core/Sync_module.py b/bin/core/Sync_module.py index e37d48d6..22f814f4 100755 --- a/bin/core/Sync_module.py +++ b/bin/core/Sync_module.py @@ -53,37 +53,23 @@ class Sync_module(AbstractModule): print('sync queues refreshed') print(self.dict_sync_queues) - # Extract object from message - # # 
TODO: USE JSON DICT ???? - mess_split = message.split(';') - if len(mess_split) == 3: - obj_type = mess_split[0] - obj_subtype = mess_split[1] - obj_id = mess_split[2] + obj = self.get_obj() - # OBJECT => Item - # if obj_type == 'item': - obj = Item(obj_id) + tags = obj.get_tags() - tags = obj.get_tags() - - # check filter + tags - # print(message) - for queue_uuid in self.dict_sync_queues: - filter_tags = self.dict_sync_queues[queue_uuid]['filter'] - if filter_tags and tags: - # print('tags: {tags} filter: {filter_tags}') - if filter_tags.issubset(tags): - obj_dict = obj.get_default_meta() - # send to queue push and/or pull - for dict_ail in self.dict_sync_queues[queue_uuid]['ail_instances']: - print(f'ail_uuid: {dict_ail["ail_uuid"]} obj: {message}') - ail_2_ail.add_object_to_sync_queue(queue_uuid, dict_ail['ail_uuid'], obj_dict, - push=dict_ail['push'], pull=dict_ail['pull']) - - else: - # Malformed message - raise Exception(f'too many values to unpack (expected 3) given {len(mess_split)} with message {message}') + # check filter + tags + # print(message) + for queue_uuid in self.dict_sync_queues: + filter_tags = self.dict_sync_queues[queue_uuid]['filter'] + if filter_tags and tags: + # print('tags: {tags} filter: {filter_tags}') + if filter_tags.issubset(tags): + obj_dict = obj.get_default_meta() + # send to queue push and/or pull + for dict_ail in self.dict_sync_queues[queue_uuid]['ail_instances']: + print(f'ail_uuid: {dict_ail["ail_uuid"]} obj: {message}') + ail_2_ail.add_object_to_sync_queue(queue_uuid, dict_ail['ail_uuid'], obj_dict, + push=dict_ail['push'], pull=dict_ail['pull']) if __name__ == '__main__': diff --git a/bin/importer/FeederImporter.py b/bin/importer/FeederImporter.py index e7a06132..7acd6ae9 100755 --- a/bin/importer/FeederImporter.py +++ b/bin/importer/FeederImporter.py @@ -113,7 +113,7 @@ class FeederModuleImporter(AbstractModule): # TODO HANDLE Invalid JSON json_data = json.loads(message) relay_message = self.importer.importer(json_data) 
- self.add_message_to_queue(relay_message) + self.add_message_to_queue(message=relay_message) # Launch Importer diff --git a/bin/importer/FileImporter.py b/bin/importer/FileImporter.py index 410cc65d..4a926a41 100755 --- a/bin/importer/FileImporter.py +++ b/bin/importer/FileImporter.py @@ -46,7 +46,7 @@ class FileImporter(AbstractImporter): message = self.create_message(item_id, content, gzipped=gzipped, source='dir_import') if message: - self.add_message_to_queue(message) + self.add_message_to_queue(message=message) class DirImporter(AbstractImporter): def __init__(self): diff --git a/bin/importer/PystemonImporter.py b/bin/importer/PystemonImporter.py index 536801ba..df7d8d3a 100755 --- a/bin/importer/PystemonImporter.py +++ b/bin/importer/PystemonImporter.py @@ -74,7 +74,7 @@ class PystemonModuleImporter(AbstractModule): return self.importer.importer() def compute(self, message): - self.add_message_to_queue(message) + self.add_message_to_queue(message=message) if __name__ == '__main__': diff --git a/bin/importer/ZMQImporter.py b/bin/importer/ZMQImporter.py index dd9986d2..91728cf9 100755 --- a/bin/importer/ZMQImporter.py +++ b/bin/importer/ZMQImporter.py @@ -73,7 +73,7 @@ class ZMQModuleImporter(AbstractModule): for message in messages: message = message.decode() print(message.split(' ', 1)[0]) - self.add_message_to_queue(message) + self.add_message_to_queue(message=message) if __name__ == '__main__': diff --git a/bin/lib/ail_queues.py b/bin/lib/ail_queues.py index 81014024..3ab68708 100755 --- a/bin/lib/ail_queues.py +++ b/bin/lib/ail_queues.py @@ -6,6 +6,8 @@ import sys import datetime import time +from hashlib import sha256 + sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages @@ -15,10 +17,16 @@ from lib.ConfigLoader import ConfigLoader config_loader = ConfigLoader() r_queues = config_loader.get_redis_conn("Redis_Queues") +r_obj_process = config_loader.get_redis_conn("Redis_Process") config_loader = None 
MODULES_FILE = os.path.join(os.environ['AIL_HOME'], 'configs', 'modules.cfg') +# # # # # # # # +# # +# AIL QUEUE # +# # +# # # # # # # # class AILQueue: @@ -60,16 +68,26 @@ class AILQueue: # Update queues stats r_queues.hset('queues', self.name, self.get_nb_messages()) r_queues.hset(f'modules', f'{self.pid}:{self.name}', int(time.time())) + # Get Message message = r_queues.lpop(f'queue:{self.name}:in') if not message: return None else: - # TODO SAVE CURRENT ITEMS (OLD Module information) + row_mess = message.split(';', 1) + if len(row_mess) != 2: + return None, None, message + # raise Exception(f'Error: queue {self.name}, no AIL object provided') + else: + obj_global_id, mess = row_mess + sha256_mess = sha256(message.encode()).hexdigest() + add_processed_obj(obj_global_id, sha256_mess, module=self.name) + return obj_global_id, sha256_mess, mess - return message + def end_message(self, obj_global_id, sha256_mess): + end_processed_obj(obj_global_id, sha256_mess, module=self.name) - def send_message(self, message, queue_name=None): + def send_message(self, obj_global_id, message='', queue_name=None): if not self.subscribers_modules: raise ModuleQueueError('This Module don\'t have any subscriber') if queue_name: @@ -80,8 +98,17 @@ class AILQueue: raise ModuleQueueError('Queue name required. 
This module push to multiple queues') queue_name = list(self.subscribers_modules)[0] + message = f'{obj_global_id};{message}' + if obj_global_id != '::': + sha256_mess = sha256(message.encode()).hexdigest() + else: + sha256_mess = None + # Add message to all modules for module_name in self.subscribers_modules[queue_name]: + if sha256_mess: + add_processed_obj(obj_global_id, sha256_mess, queue=module_name) + r_queues.rpush(f'queue:{module_name}:in', message) # stats nb_mess = r_queues.llen(f'queue:{module_name}:in') @@ -98,6 +125,7 @@ class AILQueue: def error(self): r_queues.hdel(f'modules', f'{self.pid}:{self.name}') + def get_queues_modules(): return r_queues.hkeys('queues') @@ -132,6 +160,74 @@ def get_modules_queues_stats(): def clear_modules_queues_stats(): r_queues.delete('modules') + +# # # # # # # # # +# # +# OBJ QUEUES # PROCESS ?? +# # +# # # # # # # # # + + +def get_processed_objs(): + return r_obj_process.smembers(f'objs:process') + +def get_processed_objs_by_type(obj_type): + return r_obj_process.zrange(f'objs:process:{obj_type}', 0, -1) + +def is_processed_obj_queued(obj_global_id): + return r_obj_process.exists(f'obj:queues:{obj_global_id}') + +def is_processed_obj_moduled(obj_global_id): + return r_obj_process.exists(f'obj:modules:{obj_global_id}') + +def is_processed_obj(obj_global_id): + return is_processed_obj_queued(obj_global_id) and is_processed_obj_moduled(obj_global_id) + +def get_processed_obj_modules(obj_global_id): + return r_obj_process.zrange(f'obj:modules:{obj_global_id}', 0, -1) + +def get_processed_obj_queues(obj_global_id): + return r_obj_process.zrange(f'obj:queues:{obj_global_id}', 0, -1) + +def get_processed_obj(obj_global_id): + return {'modules': get_processed_obj_modules(obj_global_id), 'queues': get_processed_obj_queues(obj_global_id)} + +def add_processed_obj(obj_global_id, sha256_mess, module=None, queue=None): + obj_type = obj_global_id.split(':', 1)[0] + new_obj = r_obj_process.sadd(f'objs:process', obj_global_id) + # 
first process: + if new_obj: + r_obj_process.zadd(f'objs:process:{obj_type}', {obj_global_id: int(time.time())}) + if queue: + r_obj_process.zadd(f'obj:queues:{obj_global_id}', {f'{queue}:{sha256_mess}': int(time.time())}) + if module: + r_obj_process.zadd(f'obj:modules:{obj_global_id}', {f'{module}:{sha256_mess}': int(time.time())}) + r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{module}:{sha256_mess}') + +def end_processed_obj(obj_global_id, sha256_mess, module=None, queue=None): + if queue: + r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{sha256_mess}') + if module: + r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{sha256_mess}') + + # TODO HANDLE QUEUE DELETE + # process completed + if not is_processed_obj(obj_global_id): + obj_type = obj_global_id.split(':', 1)[0] + r_obj_process.zrem(f'objs:process:{obj_type}', obj_global_id) + r_obj_process.srem(f'objs:process', obj_global_id) + + r_obj_process.sadd(f'objs:processed', obj_global_id) # TODO use list ?????? 
+ +################################################################################### + + +# # # # # # # # +# # +# GRAPH # +# # +# # # # # # # # + def get_queue_digraph(): queues_ail = {} modules = {} @@ -223,64 +319,10 @@ def save_queue_digraph(): sys.exit(1) -########################################################################################### -########################################################################################### -########################################################################################### -########################################################################################### -########################################################################################### - -# def get_all_queues_name(): -# return r_queues.hkeys('queues') -# -# def get_all_queues_dict_with_nb_elem(): -# return r_queues.hgetall('queues') -# -# def get_all_queues_with_sorted_nb_elem(): -# res = r_queues.hgetall('queues') -# res = sorted(res.items()) -# return res -# -# def get_module_pid_by_queue_name(queue_name): -# return r_queues.smembers('MODULE_TYPE_{}'.format(queue_name)) -# -# # # TODO: remove last msg part -# def get_module_last_process_start_time(queue_name, module_pid): -# res = r_queues.get('MODULE_{}_{}'.format(queue_name, module_pid)) -# if res: -# return res.split(',')[0] -# return None -# -# def get_module_last_msg(queue_name, module_pid): -# return r_queues.get('MODULE_{}_{}_PATH'.format(queue_name, module_pid)) -# -# def get_all_modules_queues_stats(): -# all_modules_queues_stats = [] -# for queue_name, nb_elem_queue in get_all_queues_with_sorted_nb_elem(): -# l_module_pid = get_module_pid_by_queue_name(queue_name) -# for module_pid in l_module_pid: -# last_process_start_time = get_module_last_process_start_time(queue_name, module_pid) -# if last_process_start_time: -# last_process_start_time = datetime.datetime.fromtimestamp(int(last_process_start_time)) -# seconds = int((datetime.datetime.now() - 
last_process_start_time).total_seconds()) -# else: -# seconds = 0 -# all_modules_queues_stats.append((queue_name, nb_elem_queue, seconds, module_pid)) -# return all_modules_queues_stats -# -# -# def _get_all_messages_from_queue(queue_name): -# #self.r_temp.hset('queues', self.subscriber_name, int(self.r_temp.scard(in_set))) -# return r_queues.smembers(f'queue:{queue_name}:in') -# -# # def is_message_in queue(queue_name): -# # pass -# -# def remove_message_from_queue(queue_name, message): -# queue_key = f'queue:{queue_name}:in' -# r_queues.srem(queue_key, message) -# r_queues.hset('queues', queue_name, int(r_queues.scard(queue_key))) - - if __name__ == '__main__': # clear_modules_queues_stats() - save_queue_digraph() + # save_queue_digraph() + oobj_global_id = 'item::submitted/2023/06/22/submitted_f656119e-f2ea-49d7-9beb-fb97077f8fe5.gz' + while True: + print(get_processed_obj(oobj_global_id)) + time.sleep(0.5) diff --git a/bin/lib/objects/ail_objects.py b/bin/lib/objects/ail_objects.py index df598a70..01990996 100755 --- a/bin/lib/objects/ail_objects.py +++ b/bin/lib/objects/ail_objects.py @@ -46,29 +46,29 @@ def sanitize_objs_types(objs): return l_types -def get_object(obj_type, subtype, id): +def get_object(obj_type, subtype, obj_id): if obj_type == 'item': - return Item(id) + return Item(obj_id) elif obj_type == 'domain': - return Domain(id) + return Domain(obj_id) elif obj_type == 'decoded': - return Decoded(id) + return Decoded(obj_id) elif obj_type == 'cookie-name': - return CookiesNames.CookieName(id) + return CookiesNames.CookieName(obj_id) elif obj_type == 'cve': - return Cve(id) + return Cve(obj_id) elif obj_type == 'favicon': - return Favicon(id) + return Favicon(obj_id) elif obj_type == 'screenshot': - return Screenshot(id) + return Screenshot(obj_id) elif obj_type == 'cryptocurrency': - return CryptoCurrencies.CryptoCurrency(id, subtype) + return CryptoCurrencies.CryptoCurrency(obj_id, subtype) elif obj_type == 'pgp': - return Pgps.Pgp(id, subtype) + 
return Pgps.Pgp(obj_id, subtype) elif obj_type == 'title': - return Titles.Title(id) + return Titles.Title(obj_id) elif obj_type == 'username': - return Usernames.Username(id, subtype) + return Usernames.Username(obj_id, subtype) def get_objects(objects): objs = set() diff --git a/bin/modules/ApiKey.py b/bin/modules/ApiKey.py index b9324e7b..bf54c095 100755 --- a/bin/modules/ApiKey.py +++ b/bin/modules/ApiKey.py @@ -47,8 +47,8 @@ class ApiKey(AbstractModule): self.logger.info(f"Module {self.module_name} initialized") def compute(self, message, r_result=False): - item_id, score = message.split() - item = Item(item_id) + score = message + item = self.get_obj() item_content = item.get_content() google_api_key = self.regex_findall(self.re_google_api_key, item.get_id(), item_content, r_set=True) @@ -63,8 +63,8 @@ class ApiKey(AbstractModule): print(f'found google api key: {to_print}') self.redis_logger.warning(f'{to_print}Checked {len(google_api_key)} found Google API Key;{item.get_id()}') - msg = f'infoleak:automatic-detection="google-api-key";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="google-api-key"' + self.add_message_to_queue(message=tag, queue='Tags') # # TODO: # FIXME: AWS regex/validate/sanitize KEY + SECRET KEY if aws_access_key: @@ -74,12 +74,12 @@ class ApiKey(AbstractModule): print(f'found AWS secret key') self.redis_logger.warning(f'{to_print}Checked {len(aws_secret_key)} found AWS secret Key;{item.get_id()}') - msg = f'infoleak:automatic-detection="aws-key";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="aws-key"' + self.add_message_to_queue(message=tag, queue='Tags') # Tags - msg = f'infoleak:automatic-detection="api-key";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="api-key"' + self.add_message_to_queue(message=tag, queue='Tags') if r_result: return google_api_key, aws_access_key, aws_secret_key diff 
--git a/bin/modules/Categ.py b/bin/modules/Categ.py index 60517d64..124f92bc 100755 --- a/bin/modules/Categ.py +++ b/bin/modules/Categ.py @@ -82,7 +82,7 @@ class Categ(AbstractModule): def compute(self, message, r_result=False): # Create Item Object - item = Item(message) + item = self.get_obj() # Get item content content = item.get_content() categ_found = [] @@ -94,11 +94,11 @@ class Categ(AbstractModule): lenfound = len(found) if lenfound >= self.matchingThreshold: categ_found.append(categ) - msg = f'{item.get_id()} {lenfound}' + msg = str(lenfound) # Export message to categ queue print(msg, categ) - self.add_message_to_queue(msg, categ) + self.add_message_to_queue(message=msg, queue=categ) self.redis_logger.debug( f'Categ;{item.get_source()};{item.get_date()};{item.get_basename()};Detected {lenfound} as {categ};{item.get_id()}') diff --git a/bin/modules/Credential.py b/bin/modules/Credential.py index 7fcbafb0..f20f2ba9 100755 --- a/bin/modules/Credential.py +++ b/bin/modules/Credential.py @@ -29,7 +29,6 @@ Redis organization: import os import sys import time -import re from datetime import datetime from pyfaup.faup import Faup @@ -85,8 +84,8 @@ class Credential(AbstractModule): def compute(self, message): - item_id, count = message.split() - item = Item(item_id) + count = message + item = self.get_obj() item_content = item.get_content() @@ -111,8 +110,8 @@ class Credential(AbstractModule): print(f"========> Found more than 10 credentials in this file : {item.get_id()}") self.redis_logger.warning(to_print) - msg = f'infoleak:automatic-detection="credential";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="credential"' + self.add_message_to_queue(message=tag, queue='Tags') site_occurrence = self.regex_findall(self.regex_site_for_stats, item.get_id(), item_content) diff --git a/bin/modules/CreditCards.py b/bin/modules/CreditCards.py index e29e42a1..1d8411bb 100755 --- a/bin/modules/CreditCards.py +++ 
b/bin/modules/CreditCards.py @@ -68,8 +68,8 @@ class CreditCards(AbstractModule): return extracted def compute(self, message, r_result=False): - item_id, score = message.split() - item = Item(item_id) + score = message + item = self.get_obj() content = item.get_content() all_cards = self.regex_findall(self.regex, item.id, content) @@ -90,8 +90,8 @@ class CreditCards(AbstractModule): print(mess) self.redis_logger.warning(mess) - msg = f'infoleak:automatic-detection="credit-card";{item.id}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="credit-card"' + self.add_message_to_queue(message=tag, queue='Tags') if r_result: return creditcard_set diff --git a/bin/modules/Cryptocurrencies.py b/bin/modules/Cryptocurrencies.py index 6197f8a1..fd5c5402 100755 --- a/bin/modules/Cryptocurrencies.py +++ b/bin/modules/Cryptocurrencies.py @@ -114,7 +114,7 @@ class Cryptocurrencies(AbstractModule, ABC): self.logger.info(f'Module {self.module_name} initialized') def compute(self, message): - item = Item(message) + item = self.get_obj() item_id = item.get_id() date = item.get_date() content = item.get_content() @@ -134,14 +134,14 @@ class Cryptocurrencies(AbstractModule, ABC): # Check private key if is_valid_address: - msg = f'{currency["tag"]};{item_id}' - self.add_message_to_queue(msg, 'Tags') + msg = f'{currency["tag"]}' + self.add_message_to_queue(message=msg, queue='Tags') if currency.get('private_key'): private_keys = self.regex_findall(currency['private_key']['regex'], item_id, content) if private_keys: - msg = f'{currency["private_key"]["tag"]};{item_id}' - self.add_message_to_queue(msg, 'Tags') + msg = f'{currency["private_key"]["tag"]}' + self.add_message_to_queue(message=msg, queue='Tags') # debug print(private_keys) diff --git a/bin/modules/CveModule.py b/bin/modules/CveModule.py index 8df75c6a..55fa0c91 100755 --- a/bin/modules/CveModule.py +++ b/bin/modules/CveModule.py @@ -44,9 +44,8 @@ class CveModule(AbstractModule): 
self.logger.info(f'Module {self.module_name} initialized') def compute(self, message): - - item_id, count = message.split() - item = Item(item_id) + count = message + item = self.get_obj() item_id = item.get_id() cves = self.regex_findall(self.reg_cve, item_id, item.get_content()) @@ -61,9 +60,9 @@ class CveModule(AbstractModule): print(warning) self.redis_logger.warning(warning) - msg = f'infoleak:automatic-detection="cve";{item_id}' + tag = 'infoleak:automatic-detection="cve"' # Send to Tags Queue - self.add_message_to_queue(msg, 'Tags') + self.add_message_to_queue(message=tag, queue='Tags') if __name__ == '__main__': diff --git a/bin/modules/Decoder.py b/bin/modules/Decoder.py index c7627480..a8ba07af 100755 --- a/bin/modules/Decoder.py +++ b/bin/modules/Decoder.py @@ -87,8 +87,7 @@ class Decoder(AbstractModule): self.logger.info(f'Module {self.module_name} initialized') def compute(self, message): - - item = Item(message) + item = self.get_obj() content = item.get_content() date = item.get_date() new_decodeds = [] @@ -129,8 +128,8 @@ class Decoder(AbstractModule): self.logger.info(f'{item.id} - {dname}') # Send to Tags - msg = f'infoleak:automatic-detection="{dname}";{item.id}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="{dname}"' + self.add_message_to_queue(message=tag, queue='Tags') #################### # TRACKERS DECODED diff --git a/bin/modules/DomClassifier.py b/bin/modules/DomClassifier.py index fd16939d..e7b77d26 100755 --- a/bin/modules/DomClassifier.py +++ b/bin/modules/DomClassifier.py @@ -51,9 +51,9 @@ class DomClassifier(AbstractModule): self.logger.info(f"Module: {self.module_name} Launched") def compute(self, message, r_result=False): - host, item_id = message.split() + host = message - item = Item(item_id) + item = self.get_obj() item_basename = item.get_basename() item_date = item.get_date() item_source = item.get_source() @@ -69,7 +69,7 @@ class DomClassifier(AbstractModule): if self.c.vdomain and 
d4.is_passive_dns_enabled(): for dns_record in self.c.vdomain: - self.add_message_to_queue(dns_record) + self.add_message_to_queue(obj=None, message=dns_record) localizeddomains = self.c.include(expression=self.cc_tld) if localizeddomains: diff --git a/bin/modules/Duplicates.py b/bin/modules/Duplicates.py index d0a39c0d..57641979 100755 --- a/bin/modules/Duplicates.py +++ b/bin/modules/Duplicates.py @@ -52,7 +52,7 @@ class Duplicates(AbstractModule): def compute(self, message): # IOError: "CRC Checksum Failed on : {id}" - item = Item(message) + item = self.get_obj() # Check file size if item.get_size() < self.min_item_size: diff --git a/bin/modules/Global.py b/bin/modules/Global.py index f24a8603..1c05fcf3 100755 --- a/bin/modules/Global.py +++ b/bin/modules/Global.py @@ -130,14 +130,14 @@ class Global(AbstractModule): update_obj_date(item.get_date(), 'item') - self.add_message_to_queue(item_id, 'Item') + self.add_message_to_queue(obj=item, queue='Item') self.processed_item += 1 # DIRTY FIX AIL SYNC - SEND TO SYNC MODULE # # FIXME: DIRTY FIX message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}' print(message) - self.add_message_to_queue(message, 'Sync') + self.add_message_to_queue(obj=item, queue='Sync') print(item_id) if r_result: diff --git a/bin/modules/Hosts.py b/bin/modules/Hosts.py index 37f278a5..fce5595e 100755 --- a/bin/modules/Hosts.py +++ b/bin/modules/Hosts.py @@ -49,7 +49,7 @@ class Hosts(AbstractModule): self.logger.info(f"Module: {self.module_name} Launched") def compute(self, message): - item = Item(message) + item = self.get_obj() # mimetype = item_basic.get_item_mimetype(item.get_id()) # if mimetype.split('/')[0] == "text": @@ -60,9 +60,7 @@ class Hosts(AbstractModule): print(f'{len(hosts)} host {item.get_id()}') for host in hosts: # print(host) - - msg = f'{host} {item.get_id()}' - self.add_message_to_queue(msg, 'Host') + self.add_message_to_queue(message=str(host), queue='Host') if __name__ == '__main__': diff --git 
a/bin/modules/IPAddress.py b/bin/modules/IPAddress.py index 1a5d53c7..31a0ff68 100755 --- a/bin/modules/IPAddress.py +++ b/bin/modules/IPAddress.py @@ -66,7 +66,7 @@ class IPAddress(AbstractModule): if not self.ip_networks: return None - item = Item(message) + item = self.get_obj() content = item.get_content() # list of the regex results in the Item @@ -86,8 +86,8 @@ class IPAddress(AbstractModule): self.redis_logger.warning(f'{item.get_id()} contains {item.get_id()} IPs') # Tag message with IP - msg = f'infoleak:automatic-detection="ip";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="ip"' + self.add_message_to_queue(message=tag, queue='Tags') if __name__ == "__main__": diff --git a/bin/modules/Iban.py b/bin/modules/Iban.py index 091a85a5..aa1aa5d6 100755 --- a/bin/modules/Iban.py +++ b/bin/modules/Iban.py @@ -73,7 +73,7 @@ class Iban(AbstractModule): return extracted def compute(self, message): - item = Item(message) + item = self.get_obj() item_id = item.get_id() ibans = self.regex_findall(self.iban_regex, item_id, item.get_content()) @@ -97,8 +97,8 @@ class Iban(AbstractModule): to_print = f'Iban;{item.get_source()};{item.get_date()};{item.get_basename()};' self.redis_logger.warning(f'{to_print}Checked found {len(valid_ibans)} IBAN;{item_id}') # Tags - msg = f'infoleak:automatic-detection="iban";{item_id}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="iban"' + self.add_message_to_queue(message=tag, queue='Tags') if __name__ == '__main__': diff --git a/bin/modules/Indexer.py b/bin/modules/Indexer.py index 03fededa..48378bdd 100755 --- a/bin/modules/Indexer.py +++ b/bin/modules/Indexer.py @@ -93,12 +93,12 @@ class Indexer(AbstractModule): self.last_refresh = time_now def compute(self, message): - docpath = message.split(" ", -1)[-1] - - item = Item(message) + item = self.get_obj() item_id = item.get_id() item_content = item.get_content() + docpath = item_id + 
self.logger.debug(f"Indexing - {self.indexname}: {docpath}") print(f"Indexing - {self.indexname}: {docpath}") diff --git a/bin/modules/Keys.py b/bin/modules/Keys.py index f86b6223..e14523cf 100755 --- a/bin/modules/Keys.py +++ b/bin/modules/Keys.py @@ -56,7 +56,7 @@ class Keys(AbstractModule): self.pending_seconds = 1 def compute(self, message): - item = Item(message) + item = self.get_obj() content = item.get_content() # find = False @@ -65,107 +65,107 @@ class Keys(AbstractModule): if KeyEnum.PGP_MESSAGE.value in content: self.redis_logger.warning(f'{item.get_basename()} has a PGP enc message') - msg = f'infoleak:automatic-detection="pgp-message";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="pgp-message"' + self.add_message_to_queue(message=tag, queue='Tags') get_pgp_content = True # find = True if KeyEnum.PGP_PUBLIC_KEY_BLOCK.value in content: - msg = f'infoleak:automatic-detection="pgp-public-key-block";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="pgp-public-key-block";{item.get_id()}' + self.add_message_to_queue(message=tag, queue='Tags') get_pgp_content = True if KeyEnum.PGP_SIGNATURE.value in content: - msg = f'infoleak:automatic-detection="pgp-signature";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="pgp-signature";{item.get_id()}' + self.add_message_to_queue(message=tag, queue='Tags') get_pgp_content = True if KeyEnum.PGP_PRIVATE_KEY_BLOCK.value in content: self.redis_logger.warning(f'{item.get_basename()} has a pgp private key block message') - msg = f'infoleak:automatic-detection="pgp-private-key";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="pgp-private-key";{item.get_id()}' + self.add_message_to_queue(message=tag, queue='Tags') get_pgp_content = True if KeyEnum.CERTIFICATE.value in content: self.redis_logger.warning(f'{item.get_basename()} has a 
certificate message') - msg = f'infoleak:automatic-detection="certificate";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="certificate";{item.get_id()}' + self.add_message_to_queue(message=tag, queue='Tags') # find = True if KeyEnum.RSA_PRIVATE_KEY.value in content: self.redis_logger.warning(f'{item.get_basename()} has a RSA private key message') print('rsa private key message found') - msg = f'infoleak:automatic-detection="rsa-private-key";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="rsa-private-key";{item.get_id()}' + self.add_message_to_queue(message=tag, queue='Tags') # find = True if KeyEnum.PRIVATE_KEY.value in content: self.redis_logger.warning(f'{item.get_basename()} has a private key message') print('private key message found') - msg = f'infoleak:automatic-detection="private-key";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="private-key";{item.get_id()}' + self.add_message_to_queue(message=tag, queue='Tags') # find = True if KeyEnum.ENCRYPTED_PRIVATE_KEY.value in content: self.redis_logger.warning(f'{item.get_basename()} has an encrypted private key message') print('encrypted private key message found') - msg = f'infoleak:automatic-detection="encrypted-private-key";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="encrypted-private-key";{item.get_id()}' + self.add_message_to_queue(message=tag, queue='Tags') # find = True if KeyEnum.OPENSSH_PRIVATE_KEY.value in content: self.redis_logger.warning(f'{item.get_basename()} has an openssh private key message') print('openssh private key message found') - msg = f'infoleak:automatic-detection="private-ssh-key";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="private-ssh-key";{item.get_id()}' + self.add_message_to_queue(message=tag, queue='Tags') # find = True if 
KeyEnum.SSH2_ENCRYPTED_PRIVATE_KEY.value in content: self.redis_logger.warning(f'{item.get_basename()} has an ssh2 private key message') print('SSH2 private key message found') - msg = f'infoleak:automatic-detection="private-ssh-key";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="private-ssh-key";{item.get_id()}' + self.add_message_to_queue(message=tag, queue='Tags') # find = True if KeyEnum.OPENVPN_STATIC_KEY_V1.value in content: self.redis_logger.warning(f'{item.get_basename()} has an openssh private key message') print('OpenVPN Static key message found') - msg = f'infoleak:automatic-detection="vpn-static-key";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="vpn-static-key";{item.get_id()}' + self.add_message_to_queue(message=tag, queue='Tags') # find = True if KeyEnum.DSA_PRIVATE_KEY.value in content: self.redis_logger.warning(f'{item.get_basename()} has a dsa private key message') - msg = f'infoleak:automatic-detection="dsa-private-key";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="dsa-private-key";{item.get_id()}' + self.add_message_to_queue(message=tag, queue='Tags') # find = True if KeyEnum.EC_PRIVATE_KEY.value in content: self.redis_logger.warning(f'{item.get_basename()} has an ec private key message') - msg = f'infoleak:automatic-detection="ec-private-key";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="ec-private-key";{item.get_id()}' + self.add_message_to_queue(message=tag, queue='Tags') # find = True if KeyEnum.PUBLIC_KEY.value in content: self.redis_logger.warning(f'{item.get_basename()} has a public key message') - msg = f'infoleak:automatic-detection="public-key";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = f'infoleak:automatic-detection="public-key";{item.get_id()}' + self.add_message_to_queue(message=tag, queue='Tags') # find = 
True # pgp content if get_pgp_content: - self.add_message_to_queue(item.get_id(), 'PgpDump') + self.add_message_to_queue(queue='PgpDump') # if find : # # Send to duplicate diff --git a/bin/modules/Languages.py b/bin/modules/Languages.py index 4924b379..b0b87230 100755 --- a/bin/modules/Languages.py +++ b/bin/modules/Languages.py @@ -25,7 +25,7 @@ class Languages(AbstractModule): self.logger.info(f'Module {self.module_name} initialized') def compute(self, message): - item = Item(message) + item = self.get_obj() if item.is_crawled(): domain = Domain(item.get_domain()) for lang in item.get_languages(min_probability=0.8): diff --git a/bin/modules/LibInjection.py b/bin/modules/LibInjection.py index 9f357441..eb1174f6 100755 --- a/bin/modules/LibInjection.py +++ b/bin/modules/LibInjection.py @@ -40,7 +40,8 @@ class LibInjection(AbstractModule): self.redis_logger.info(f"Module: {self.module_name} Launched") def compute(self, message): - url, item_id = message.split() + item = self.get_obj() + url = message self.faup.decode(url) url_parsed = self.faup.get() @@ -68,7 +69,6 @@ class LibInjection(AbstractModule): # print(f'query is sqli : {result_query}') if result_path['sqli'] is True or result_query['sqli'] is True: - item = Item(item_id) item_id = item.get_id() print(f"Detected (libinjection) SQL in URL: {item_id}") print(unquote(url)) @@ -77,8 +77,8 @@ class LibInjection(AbstractModule): self.redis_logger.warning(to_print) # Add tag - msg = f'infoleak:automatic-detection="sql-injection";{item_id}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="sql-injection"' + self.add_message_to_queue(message=tag, queue='Tags') # statistics # # # TODO: # FIXME: remove me diff --git a/bin/modules/MISP_Thehive_Auto_Push.py b/bin/modules/MISP_Thehive_Auto_Push.py index a9b27db9..9747a69b 100755 --- a/bin/modules/MISP_Thehive_Auto_Push.py +++ b/bin/modules/MISP_Thehive_Auto_Push.py @@ -45,8 +45,9 @@ class MISP_Thehive_Auto_Push(AbstractModule): 
self.last_refresh = time.time() self.redis_logger.info('Tags Auto Push refreshed') - item_id, tag = message.split(';', 1) - item = Item(item_id) + tag = message + item = self.get_obj() + item_id = item.get_id() # enabled if 'misp' in self.tags: diff --git a/bin/modules/Mail.py b/bin/modules/Mail.py index 7eb25084..bbdbcfce 100755 --- a/bin/modules/Mail.py +++ b/bin/modules/Mail.py @@ -135,8 +135,9 @@ class Mail(AbstractModule): # # TODO: sanitize mails def compute(self, message): - item_id, score = message.split() - item = Item(item_id) + score = message + item = self.get_obj() + item_id = item.get_id() item_date = item.get_date() mails = self.regex_findall(self.email_regex, item_id, item.get_content()) @@ -177,8 +177,8 @@ class Mail(AbstractModule): print(f'{item_id} Checked {num_valid_email} e-mail(s)') self.redis_logger.warning(msg) # Tags - msg = f'infoleak:automatic-detection="mail";{item_id}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="mail"' + self.add_message_to_queue(message=tag, queue='Tags') elif num_valid_email > 0: self.redis_logger.info(msg) diff --git a/bin/modules/Mixer.py b/bin/modules/Mixer.py index 49cd3046..b8f2bedf 100755 --- a/bin/modules/Mixer.py +++ b/bin/modules/Mixer.py @@ -173,7 +173,7 @@ class Mixer(AbstractModule): self.r_cache.expire(digest, self.ttl_key) self.increase_stat_processed(feeder_name) - self.add_message_to_queue(relay_message) + self.add_message_to_queue(message=relay_message) # Need To Be Fixed, Currently doesn't check the source (-> same as operation 1) # # Keep duplicate coming from different sources diff --git a/bin/modules/Onion.py b/bin/modules/Onion.py index 5e76d0fa..2066e9a3 100755 --- a/bin/modules/Onion.py +++ b/bin/modules/Onion.py @@ -69,8 +69,8 @@ class Onion(AbstractModule): onion_urls = [] domains = [] - item_id, score = message.split() - item = Item(item_id) + score = message + item = self.get_obj() item_content = item.get_content() # max execution time on regex @@ -100,8 +100,8 @@ class
Onion(AbstractModule): self.redis_logger.warning(f'{to_print}Detected {len(domains)} .onion(s);{item.get_id()}') # TAG Item - msg = f'infoleak:automatic-detection="onion";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="onion"' + self.add_message_to_queue(message=tag, queue='Tags') if __name__ == "__main__": diff --git a/bin/modules/PgpDump.py b/bin/modules/PgpDump.py index 0647e897..1e8a27a7 100755 --- a/bin/modules/PgpDump.py +++ b/bin/modules/PgpDump.py @@ -171,7 +171,7 @@ class PgpDump(AbstractModule): print('symmetrically encrypted') def compute(self, message): - item = Item(message) + item = self.get_obj() self.item_id = item.get_id() content = item.get_content() @@ -234,8 +234,8 @@ class PgpDump(AbstractModule): print(f' private key: {key}') if self.symmetrically_encrypted: - msg = f'infoleak:automatic-detection="pgp-symmetric";{self.item_id}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="pgp-symmetric"' + self.add_message_to_queue(message=tag, queue='Tags') if __name__ == '__main__': diff --git a/bin/modules/Phone.py b/bin/modules/Phone.py index 16af8303..0c01a40f 100755 --- a/bin/modules/Phone.py +++ b/bin/modules/Phone.py @@ -49,7 +49,7 @@ class Phone(AbstractModule): return extracted def compute(self, message): - item = Item(message) + item = self.get_obj() content = item.get_content() # TODO use language detection to choose the country code ? 
@@ -59,8 +59,8 @@ class Phone(AbstractModule): if results: # TAGS - msg = f'infoleak:automatic-detection="phone-number";{item.get_id()}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="phone-number"' + self.add_message_to_queue(message=tag, queue='Tags') self.redis_logger.warning(f'{item.get_id()} contains {len(phone)} Phone numbers') diff --git a/bin/modules/SQLInjectionDetection.py b/bin/modules/SQLInjectionDetection.py index ed6dc89d..dad6a15c 100755 --- a/bin/modules/SQLInjectionDetection.py +++ b/bin/modules/SQLInjectionDetection.py @@ -44,22 +44,22 @@ class SQLInjectionDetection(AbstractModule): self.logger.info(f"Module: {self.module_name} Launched") def compute(self, message): - url, item_id = message.split() + url = message + item = self.get_obj() if self.is_sql_injection(url): self.faup.decode(url) url_parsed = self.faup.get() - item = Item(item_id) item_id = item.get_id() print(f"Detected SQL in URL: {item_id}") print(urllib.request.unquote(url)) to_print = f'SQLInjection;{item.get_source()};{item.get_date()};{item.get_basename()};Detected SQL in URL;{item_id}' self.redis_logger.warning(to_print) # Tag - msg = f'infoleak:automatic-detection="sql-injection";{item_id}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="sql-injection"' + self.add_message_to_queue(message=tag, queue='Tags') # statistics # tld = url_parsed['tld'] diff --git a/bin/modules/SubmitPaste.py b/bin/modules/SubmitPaste.py index 782323ef..740090ea 100755 --- a/bin/modules/SubmitPaste.py +++ b/bin/modules/SubmitPaste.py @@ -16,8 +16,6 @@ import gzip import base64 import datetime import time -# from sflock.main import unpack -# import sflock sys.path.append(os.environ['AIL_BIN']) ################################## @@ -48,7 +46,6 @@ class SubmitPaste(AbstractModule): """ super(SubmitPaste, self).__init__() - # TODO KVROCKS self.r_serv_db = ConfigLoader.ConfigLoader().get_db_conn("Kvrocks_DB") self.r_serv_log_submit
= ConfigLoader.ConfigLoader().get_redis_conn("Redis_Log_submit") @@ -281,7 +278,7 @@ class SubmitPaste(AbstractModule): # send paste to Global module relay_message = f"submitted {rel_item_path} {gzip64encoded}" - self.add_message_to_queue(relay_message) + self.add_message_to_queue(message=relay_message) # add tags for tag in ltags: diff --git a/bin/modules/Tags.py b/bin/modules/Tags.py index 6c2fb0fb..760cb138 100755 --- a/bin/modules/Tags.py +++ b/bin/modules/Tags.py @@ -20,9 +20,6 @@ sys.path.append(os.environ['AIL_BIN']) # Import Project packages ################################## from modules.abstract_module import AbstractModule -from lib.objects.Items import Item -from lib import Tag - class Tags(AbstractModule): """ @@ -39,25 +36,17 @@ class Tags(AbstractModule): self.logger.info(f'Module {self.module_name} initialized') def compute(self, message): - # Extract item ID and tag from message - mess_split = message.split(';') - if len(mess_split) == 2: - tag = mess_split[0] - item = Item(mess_split[1]) + item = self.obj + tag = message - # Create a new tag - Tag.add_object_tag(tag, 'item', item.get_id()) - print(f'{item.get_id()}: Tagged {tag}') + # Create a new tag + item.add_tag(tag) + print(f'{item.get_id()}: Tagged {tag}') - # Forward message to channel - self.add_message_to_queue(message, 'Tag_feed') + # Forward message to channel + self.add_message_to_queue(message=tag, queue='Tag_feed') - message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}' - self.add_message_to_queue(message, 'Sync') - - else: - # Malformed message - raise Exception(f'too many values to unpack (expected 2) given {len(mess_split)} with message {message}') + self.add_message_to_queue(queue='Sync') if __name__ == '__main__': diff --git a/bin/modules/Telegram.py b/bin/modules/Telegram.py index 31d90878..273d20b9 100755 --- a/bin/modules/Telegram.py +++ b/bin/modules/Telegram.py @@ -41,7 +41,7 @@ class Telegram(AbstractModule): self.logger.info(f"Module 
{self.module_name} initialized") def compute(self, message, r_result=False): - item = Item(message) + item = self.get_obj() item_content = item.get_content() item_date = item.get_date() @@ -86,8 +86,8 @@ class Telegram(AbstractModule): # CREATE TAG if invite_code_found: # tags - msg = f'infoleak:automatic-detection="telegram-invite-hash";{item.id}' - self.add_message_to_queue(msg, 'Tags') + tag = 'infoleak:automatic-detection="telegram-invite-hash"' + self.add_message_to_queue(message=tag, queue='Tags') if __name__ == "__main__": diff --git a/bin/modules/Tools.py b/bin/modules/Tools.py index 06b2d53a..1a9025c8 100755 --- a/bin/modules/Tools.py +++ b/bin/modules/Tools.py @@ -416,7 +416,7 @@ class Tools(AbstractModule): return extracted def compute(self, message): - item = Item(message) + item = self.get_obj() content = item.get_content() for tool_name in TOOLS: @@ -425,8 +425,8 @@ class Tools(AbstractModule): if match: print(f'{item.id} found: {tool_name}') # Tag Item - msg = f"{tool['tag']};{item.id}" - self.add_message_to_queue(msg, 'Tags') + tag = tool['tag'] + self.add_message_to_queue(message=tag, queue='Tags') # TODO ADD LOGS diff --git a/bin/modules/Urls.py b/bin/modules/Urls.py index 2dc97226..c5a3dfe4 100755 --- a/bin/modules/Urls.py +++ b/bin/modules/Urls.py @@ -62,10 +62,9 @@ class Urls(AbstractModule): """ Search for Web links from given message """ - # Extract item - item_id, score = message.split() + score = message - item = Item(item_id) + item = self.get_obj() item_content = item.get_content() # TODO Handle invalid URL @@ -79,10 +78,9 @@ class Urls(AbstractModule): except AttributeError: url = url_decoded['url'] - to_send = f"{url} {item.get_id()}" - print(to_send) - self.add_message_to_queue(to_send, 'Url') - self.logger.debug(f"url_parsed: {to_send}") + print(url, item.get_id()) + self.add_message_to_queue(message=str(url), queue='Url') + self.logger.debug(f"url_parsed: {url}") if len(l_urls) > 0: to_print = 
f'Urls;{item.get_source()};{item.get_date()};{item.get_basename()};' diff --git a/bin/modules/Zerobins.py b/bin/modules/Zerobins.py index f3fcea5a..f81bf9f8 100755 --- a/bin/modules/Zerobins.py +++ b/bin/modules/Zerobins.py @@ -51,10 +51,11 @@ class Zerobins(AbstractModule): """ Compute a message in queue """ - url, item_id = message.split() + url = message + item = self.get_obj() # Extract zerobins addresses - matching_binz = self.regex_findall(self.regex, item_id, url) + matching_binz = self.regex_findall(self.regex, item.get_id(), url) if len(matching_binz) > 0: for bin_url in matching_binz: diff --git a/bin/modules/abstract_module.py b/bin/modules/abstract_module.py index 0a1a12cd..05e253d5 100644 --- a/bin/modules/abstract_module.py +++ b/bin/modules/abstract_module.py @@ -23,6 +23,7 @@ from lib import ail_logger from lib.ail_queues import AILQueue from lib import regex_helper from lib.exceptions import ModuleQueueError +from lib.objects.ail_objects import get_obj_from_global_id logging.config.dictConfig(ail_logger.get_config(name='modules')) @@ -47,6 +48,8 @@ class AbstractModule(ABC): # Setup the I/O queues if queue: self.queue = AILQueue(self.module_name, self.pid) + self.obj = None + self.sha256_mess = None # Init Redis Logger self.redis_logger = publisher @@ -70,24 +73,45 @@ class AbstractModule(ABC): # Debug Mode self.debug = False + def get_obj(self): + return self.obj + def get_message(self): """ Get message from the Redis Queue (QueueIn) Input message can change between modules ex: '' """ - return self.queue.get_message() + message = self.queue.get_message() + if message: + obj_global_id, sha256_mess, mess = message + if obj_global_id: + self.sha256_mess = sha256_mess + self.obj = get_obj_from_global_id(obj_global_id) + else: + self.sha256_mess = None + self.obj = None + return mess + self.sha256_mess = None + self.obj = None + return None - def add_message_to_queue(self, message, queue_name=None): + def add_message_to_queue(self, message='', 
obj=None, queue=None): """ Add message to queue + :param obj: AILObject :param message: message to send in queue - :param queue_name: queue or module name + :param queue: queue name or module name ex: add_message_to_queue(item_id, 'Mail') """ - self.queue.send_message(message, queue_name) - # add to new set_module + if obj: + obj_global_id = obj.get_global_id() + elif self.obj: + obj_global_id = self.obj.get_global_id() + else: + obj_global_id = '::' + self.queue.send_message(obj_global_id, message, queue) def get_available_queues(self): return self.queue.get_out_queues() @@ -130,7 +154,7 @@ class AbstractModule(ABC): # Get one message (ex:item id) from the Redis Queue (QueueIn) message = self.get_message() - if message: + if message or self.obj: try: # Module processing with the message from the queue self.compute(message) @@ -152,6 +176,11 @@ class AbstractModule(ABC): # remove from set_module ## check if item process == completed + if self.obj: + self.queue.end_message(self.obj.get_global_id(), self.sha256_mess) + self.obj = None + self.sha256_mess = None + else: self.computeNone() # Wait before next process diff --git a/bin/trackers/Tracker_Regex.py b/bin/trackers/Tracker_Regex.py index 5cc06410..bf756f49 100755 --- a/bin/trackers/Tracker_Regex.py +++ b/bin/trackers/Tracker_Regex.py @@ -47,7 +47,7 @@ class Tracker_Regex(AbstractModule): self.redis_logger.info(f"Module: {self.module_name} Launched") - def compute(self, obj_id, obj_type='item', subtype=''): + def compute(self, message): # refresh Tracked regex if self.last_refresh < Tracker.get_tracker_last_updated_by_type('regex'): self.tracked_regexs = Tracker.get_tracked_regexs() @@ -55,7 +55,7 @@ class Tracker_Regex(AbstractModule): self.redis_logger.debug('Tracked regex refreshed') print('Tracked regex refreshed') - obj = ail_objects.get_object(obj_type, subtype, obj_id) + obj = self.get_obj() obj_id = obj.get_id() obj_type = obj.get_type() @@ -87,8 +87,7 @@ class Tracker_Regex(AbstractModule): for tag in 
tracker.get_tags(): if obj.get_type() == 'item': - msg = f'{tag};{obj_id}' - self.add_message_to_queue(msg, 'Tags') + self.add_message_to_queue(message=tag, queue='Tags') else: obj.add_tag(tag) diff --git a/bin/trackers/Tracker_Term.py b/bin/trackers/Tracker_Term.py index 3e0ba43b..a3067674 100755 --- a/bin/trackers/Tracker_Term.py +++ b/bin/trackers/Tracker_Term.py @@ -64,7 +64,7 @@ class Tracker_Term(AbstractModule): self.redis_logger.info(f"Module: {self.module_name} Launched") - def compute(self, obj_id, obj_type='item', subtype=''): + def compute(self, message): # refresh Tracked term if self.last_refresh_word < Tracker.get_tracker_last_updated_by_type('word'): self.tracked_words = Tracker.get_tracked_words() @@ -78,7 +78,7 @@ class Tracker_Term(AbstractModule): self.redis_logger.debug('Tracked set refreshed') print('Tracked set refreshed') - obj = ail_objects.get_object(obj_type, subtype, obj_id) + obj = self.get_obj() obj_type = obj.get_type() # Object Filter @@ -132,8 +132,7 @@ class Tracker_Term(AbstractModule): # Tags for tag in tracker.get_tags(): if obj.get_type() == 'item': - msg = f'{tag};{obj_id}' - self.add_message_to_queue(msg, 'Tags') + self.add_message_to_queue(message=tag, queue='Tags') else: obj.add_tag(tag) diff --git a/bin/trackers/Tracker_Typo_Squatting.py b/bin/trackers/Tracker_Typo_Squatting.py index 99cacee3..9e093b3e 100755 --- a/bin/trackers/Tracker_Typo_Squatting.py +++ b/bin/trackers/Tracker_Typo_Squatting.py @@ -45,7 +45,7 @@ class Tracker_Typo_Squatting(AbstractModule): self.redis_logger.info(f"Module: {self.module_name} Launched") - def compute(self, message, obj_type='item', subtype=''): + def compute(self, message): # refresh Tracked typo if self.last_refresh_typosquatting < Tracker.get_tracker_last_updated_by_type('typosquatting'): self.tracked_typosquattings = Tracker.get_tracked_typosquatting() @@ -53,8 +53,8 @@ class Tracker_Typo_Squatting(AbstractModule): self.redis_logger.debug('Tracked typosquatting refreshed') 
print('Tracked typosquatting refreshed') - host, obj_id = message.split() - obj = ail_objects.get_object(obj_type, subtype, obj_id) + host = message + obj = self.get_obj() obj_type = obj.get_type() # Object Filter @@ -84,7 +84,6 @@ class Tracker_Typo_Squatting(AbstractModule): for tag in tracker.get_tags(): if obj.get_type() == 'item': - msg = f'{tag};{obj_id}' - self.add_message_to_queue(msg, 'Tags') + self.add_message_to_queue(message=tag, queue='Tags') else: obj.add_tag(tag) diff --git a/bin/trackers/Tracker_Yara.py b/bin/trackers/Tracker_Yara.py index fab397d1..7bf13dfd 100755 --- a/bin/trackers/Tracker_Yara.py +++ b/bin/trackers/Tracker_Yara.py @@ -46,7 +46,7 @@ class Tracker_Yara(AbstractModule): self.redis_logger.info(f"Module: {self.module_name} Launched") - def compute(self, obj_id, obj_type='item', subtype=''): + def compute(self, message): # refresh YARA list if self.last_refresh < Tracker.get_tracker_last_updated_by_type('yara'): self.rules = Tracker.get_tracked_yara_rules() @@ -54,7 +54,7 @@ class Tracker_Yara(AbstractModule): self.redis_logger.debug('Tracked set refreshed') print('Tracked set refreshed') - self.obj = ail_objects.get_object(obj_type, subtype, obj_id) + self.obj = self.get_obj() obj_type = self.obj.get_type() # Object Filter @@ -89,8 +89,7 @@ class Tracker_Yara(AbstractModule): # Tags for tag in tracker.get_tags(): if self.obj.get_type() == 'item': - msg = f'{tag};{obj_id}' - self.add_message_to_queue(msg, 'Tags') + self.add_message_to_queue(message=tag, queue='Tags') else: self.obj.add_tag(tag)