From 0a525f369e2cbd46b6e6ba7b8d538d98eac2882c Mon Sep 17 00:00:00 2001
From: Terrtia
Date: Mon, 11 May 2020 11:11:05 +0200
Subject: [PATCH] fix: [Mails] refactor Mail module

---
 bin/Mail.py | 250 ++++++++++++++++++++++++++++++++--------------------
 1 file changed, 153 insertions(+), 97 deletions(-)

diff --git a/bin/Mail.py b/bin/Mail.py
index 634931c9..d25b1b42 100755
--- a/bin/Mail.py
+++ b/bin/Mail.py
@@ -2,12 +2,12 @@
 # -*-coding:UTF-8 -*
 
 """
-The Mail Module
+The Mails Module
 ======================
 
 This module is consuming the Redis-list created by the Categ module.
 
-It apply mail regexes on paste content and warn if above a threshold.
+It applies mail regexes on item content and warns if above a threshold.
 
 """
@@ -17,18 +17,16 @@ import sys
 import redis
 import time
 import datetime
+
+import dns.resolver
 import dns.exception
 
-from packages import Paste
-from packages import lib_refine
+
 from pubsublogger import publisher
+from Helper import Process
 from pyfaup.faup import Faup
 
-from Helper import Process
-
-sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages'))
-import Item
-
+## REGEX TIMEOUT ##
 import signal
 
 class TimeoutException(Exception):
@@ -39,6 +37,87 @@ def timeout_handler(signum, frame):
 signal.signal(signal.SIGALRM, timeout_handler)
 max_execution_time = 30
+## -- ##
+
+sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages'))
+import Item
+
+sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
+import ConfigLoader
+
+## LOAD CONFIG ##
+config_loader = ConfigLoader.ConfigLoader()
+server_statistics = config_loader.get_redis_conn("ARDB_Statistics")
+r_serv_cache = config_loader.get_redis_conn("Redis_Cache")
+
+dns_server = config_loader.get_config_str('Mail', 'dns')
+
+config_loader = None
+## -- ##
+
+def is_mxdomain_in_cache(mxdomain):
+    return r_serv_cache.exists('mxdomain:{}'.format(mxdomain))
+
+def save_mxdomain_in_cache(mxdomain):
+    # key must carry the same 'mxdomain:' prefix checked by is_mxdomain_in_cache();
+    # redis-py 3.x setex() takes (name, time, value): cache the hit for one day
+    r_serv_cache.setex('mxdomain:{}'.format(mxdomain), datetime.timedelta(days=1), 1)
+
+def check_mx_record(set_mxdomains, dns_server):
+    """Check if e-mail MX domains are responding.
+
+    :param set_mxdomains: -- (set) e-mail MX domains to resolve
+    :return: (list) MX domains with a valid, responding MX record
+
+    """
+    resolver = dns.resolver.Resolver()
+    resolver.nameservers = [dns_server]
+    resolver.timeout = 5.0   # per-nameserver, per-try timeout
+    resolver.lifetime = 5.0  # total time allowed per query; must not undercut timeout
+
+    valid_mxdomain = []
+    for mxdomain in set_mxdomains:
+
+        # cached domains were already resolved successfully
+        if is_mxdomain_in_cache(mxdomain):
+            valid_mxdomain.append(mxdomain)
+        else:
+            # DNS resolution
+            try:
+                answers = resolver.query(mxdomain, rdtype=dns.rdatatype.MX)
+                if answers:
+                    save_mxdomain_in_cache(mxdomain)
+                    valid_mxdomain.append(mxdomain)
+            except dns.resolver.NoNameservers:
+                publisher.debug('NoNameservers, no non-broken nameservers are available to answer the query.')
+                print('NoNameservers, no non-broken nameservers are available to answer the query.')
+            except dns.resolver.NoAnswer:
+                publisher.debug('NoAnswer, the response did not contain an answer to the question.')
+                print('NoAnswer, the response did not contain an answer to the question.')
+            except dns.name.EmptyLabel:
+                publisher.debug('SyntaxError: EmptyLabel')
+                print('SyntaxError: EmptyLabel')
+            except dns.resolver.NXDOMAIN:
+                publisher.debug('The query name does not exist.')
+                print('The query name does not exist.')
+            except dns.name.LabelTooLong:
+                publisher.debug('The label is too long.')
+                print('The label is too long.')
+            except dns.exception.Timeout:
+                print('DNS timeout')
+            except Exception as e:
+                print(e)
+    return valid_mxdomain
 
 if __name__ == "__main__":
     publisher.port = 6380
@@ -49,115 +128,92 @@ if __name__ == "__main__":
     faup = Faup()
     p = Process(config_section)
-    addr_dns = p.config.get("Mail", "dns")
 
-    # REDIS #
-    r_serv_cache = redis.StrictRedis(
-        host=p.config.get("Redis_Cache", "host"),
-        port=p.config.getint("Redis_Cache", "port"),
-        db=p.config.getint("Redis_Cache", "db"),
-        decode_responses=True)
-    # ARDB #
-    server_statistics = redis.StrictRedis(
-        host=p.config.get("ARDB_Statistics", "host"),
-        port=p.config.getint("ARDB_Statistics", "port"),
-        db=p.config.getint("ARDB_Statistics", "db"),
-        decode_responses=True)
+    publisher.info("Mails module started")
 
-    # FUNCTIONS #
-    publisher.info("Suscribed to channel mails_categ")
+    # Number of valid e-mails needed to tag an item
+    mail_threshold = 10
 
-    # FIXME For retro compatibility
-    channel = 'mails_categ'
-    prec_item_id = None
-
-    # Log as critical if there are more that that amout of valid emails
-    is_critical = 10
-
-    max_execution_time = 30
     email_regex = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}"
-    MX_values = None
+
     while True:
         message = p.get_from_set()
         if message is not None:
             item_id, score = message.split()
 
-            if prec_item_id is None or item_id != prec_item_id:
-                PST = Paste.Paste(item_id)
+            item_content = Item.get_item_content(item_id)
+            item_date = Item.get_item_date(item_id)
 
-                # max execution time on regex
-                signal.alarm(max_execution_time)
-                try:
-                    l_mails = re.findall(email_regex, Item.get_item_content(item_id))
-                except TimeoutException:
-                    p.incr_module_timeout_statistic() # add encoder type
-                    err_mess = "Mail: processing timeout: {}".format(item_id)
-                    print(err_mess)
-                    publisher.info(err_mess)
-                    continue
-                else:
-                    signal.alarm(0)
+            # Get all e-mail addresses (regex runs under a timeout)
+            signal.alarm(max_execution_time)
+            try:
+                all_emails = re.findall(email_regex, item_content)
+            except TimeoutException:
+                p.incr_module_timeout_statistic()
+                err_mess = "Mails: processing timeout: {}".format(item_id)
+                print(err_mess)
+                publisher.info(err_mess)
+                continue
+            else:
+                signal.alarm(0)
 
-                l_mails = list(set(l_mails))
+            # remove duplicates
+            all_emails = set(all_emails)
 
-                # max execution time on regex
-                signal.alarm(max_execution_time)
-                try:
-                    # Transforming the set into a string
-                    MXdomains = re.findall("@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,20}", str(l_mails).lower())
-                except TimeoutException:
-                    p.incr_module_timeout_statistic() # add encoder type
-                    err_mess = "Mail: processing timeout: {}".format(item_id)
-                    print(err_mess)
-                    publisher.info(err_mess)
-                    continue
-                else:
-                    signal.alarm(0)
+            # group addresses by their domain part
+            set_mxdomains = set()
+            dict_mxdomains_email = {}
+            for email in all_emails:
+                mxdomain = email.split('@')[1].lower()
+                if mxdomain not in dict_mxdomains_email:
+                    dict_mxdomains_email[mxdomain] = []
+                    set_mxdomains.add(mxdomain)
+                dict_mxdomains_email[mxdomain].append(email)
 
-                MX_values = lib_refine.checking_MX_record(r_serv_cache, MXdomains, addr_dns)
+            ## TODO: add MAIL trackers
 
-                if MX_values[0] >= 1:
+            valid_mx = check_mx_record(set_mxdomains, dns_server)
 
-                    PST.__setattr__(channel, MX_values)
-                    PST.save_attribute_redis(channel, (MX_values[0],
-                                             list(MX_values[1])))
+            num_valid_email = 0
+            for domain_mx in valid_mx:
+                num_valid_email += len(dict_mxdomains_email[domain_mx])
 
-                    to_print = 'Mails;{};{};{};Checked {} e-mail(s);{}'.\
-                        format(PST.p_source, PST.p_date, PST.p_name,
-                               MX_values[0], PST.p_rel_path)
-                    if MX_values[0] > is_critical:
-                        publisher.warning(to_print)
-                        #Send to duplicate
-                        p.populate_set_out(item_id, 'Duplicate')
+                for email in dict_mxdomains_email[domain_mx]:
+                    msg = 'mail;{};{};{}'.format(1, email, item_date)
+                    p.populate_set_out(msg, 'ModuleStats')
 
-                        msg = 'infoleak:automatic-detection="mail";{}'.format(item_id)
-                        p.populate_set_out(msg, 'Tags')
+                    # Create country stats (one per-TLD counter per address)
+                    faup.decode(email)
+                    tld = faup.get()['tld']
+                    try:
+                        tld = tld.decode()
+                    except:
+                        pass
+                    server_statistics.hincrby('mail_by_tld:{}'.format(item_date), tld, 1)
 
-                        #create country statistics
-                        date = datetime.datetime.now().strftime("%Y%m")
-                        for mail in MX_values[1]:
-                            print('mail;{};{};{}'.format(MX_values[1][mail], mail, PST.p_date))
-                            p.populate_set_out('mail;{};{};{}'.format(MX_values[1][mail], mail, PST.p_date), 'ModuleStats')
+            msg = 'Mails;{};{};{};Checked {} e-mail(s);{}'.format(Item.get_source(item_id), item_date, Item.get_item_basename(item_id), num_valid_email, item_id)
+            print(msg)
 
-                        faup.decode(mail)
-                        tld = faup.get()['tld']
-                        try:
-                            tld = tld.decode()
-                        except:
-                            pass
-                        server_statistics.hincrby('mail_by_tld:'+date, tld, MX_values[1][mail])
-                    else:
-                        publisher.info(to_print)
-                        #create country statistics
-                        for mail in MX_values[1]:
-                            print('mail;{};{};{}'.format(MX_values[1][mail], mail, PST.p_date))
-                            p.populate_set_out('mail;{};{};{}'.format(MX_values[1][mail], mail, PST.p_date), 'ModuleStats')
-            prec_item_id = item_id
+            if num_valid_email > mail_threshold:
+                publisher.warning(msg)
+                #Send to duplicate
+                p.populate_set_out(item_id, 'Duplicate')
+                #tags
+                msg = 'infoleak:automatic-detection="mail";{}'.format(item_id)
+                p.populate_set_out(msg, 'Tags')
+            else:
+                publisher.info(msg)
+
         else:
-            publisher.debug("Script Mails is Idling 10s")
-            print('Sleeping')
             time.sleep(10)
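
Note: below is a minimal standalone sketch of the MX-validation flow this patch introduces, handy for exercising the logic outside AIL. It assumes dnspython 1.x (the resolver.query() API used in the patch); the function name valid_mx_domains, the nameserver 9.9.9.9 and the sample addresses are illustrative only and not part of the patch.

    import dns.exception
    import dns.resolver

    def valid_mx_domains(emails, nameserver='9.9.9.9'):
        """Return the e-mail domains that publish a responding MX record."""
        resolver = dns.resolver.Resolver()
        resolver.nameservers = [nameserver]
        resolver.timeout = 5.0   # per-nameserver, per-try timeout
        resolver.lifetime = 5.0  # total time budget for one query

        # deduplicate: resolve each domain at most once, as the module does
        domains = {email.rsplit('@', 1)[1].lower() for email in emails}

        valid = []
        for domain in domains:
            try:
                if resolver.query(domain, 'MX'):
                    valid.append(domain)
            except dns.exception.DNSException:
                # NXDOMAIN, NoAnswer, Timeout, ... all derive from DNSException
                pass
        return valid

    if __name__ == '__main__':
        print(valid_mx_domains(['alice@example.com', 'bob@no-such-domain.invalid']))

Grouping addresses by domain before resolving, as the patch does, costs at most one DNS query per domain per item; the one-day Redis cache then spares repeat lookups across items.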