fix: [Mails] refactor Mail module

pull/497/head
Terrtia 2020-05-11 11:11:05 +02:00
parent 8c864fdba0
commit 0a525f369e
No known key found for this signature in database
GPG Key ID: 1E1B1F50D84613D0
1 changed files with 153 additions and 97 deletions

View File

@ -2,12 +2,12 @@
# -*-coding:UTF-8 -* # -*-coding:UTF-8 -*
""" """
The Mail Module The Mails Module
====================== ======================
This module is consuming the Redis-list created by the Categ module. This module is consuming the Redis-list created by the Categ module.
It apply mail regexes on paste content and warn if above a threshold. It apply mail regexes on item content and warn if above a threshold.
""" """
@ -17,18 +17,16 @@ import sys
import redis import redis
import time import time
import datetime import datetime
import dns.resolver
import dns.exception import dns.exception
from packages import Paste
from packages import lib_refine
from pubsublogger import publisher from pubsublogger import publisher
from Helper import Process
from pyfaup.faup import Faup from pyfaup.faup import Faup
from Helper import Process ## REGEX TIMEOUT ##
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages'))
import Item
import signal import signal
class TimeoutException(Exception): class TimeoutException(Exception):
@ -39,6 +37,87 @@ def timeout_handler(signum, frame):
signal.signal(signal.SIGALRM, timeout_handler) signal.signal(signal.SIGALRM, timeout_handler)
max_execution_time = 30 max_execution_time = 30
## -- ##
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages'))
import Item
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
import ConfigLoader
## LOAD CONFIG ##
config_loader = ConfigLoader.ConfigLoader()
server_statistics = config_loader.get_redis_conn("ARDB_Statistics")
r_serv_cache = config_loader.get_redis_conn("Redis_Cache")
dns_server = config_loader.get_config_str('Mail', 'dns')
config_loader = None
## -- ##
def is_mxdomain_in_cache(mxdomain):
return r_serv_cache.exists('mxdomain:{}'.format(mxdomain))
def save_mxdomain_in_cache(mxdomain):
r_serv_cache.setex(mxdomain, 1, datetime.timedelta(days=1))
def check_mx_record(set_mxdomains, dns_server):
"""Check if emails MX domains are responding.
:param adress_set: -- (set) This is a set of emails domains
:return: (int) Number of adress with a responding and valid MX domains
"""
resolver = dns.resolver.Resolver()
resolver.nameservers = [dns_server]
resolver.timeout = 5.0
resolver.lifetime = 2.0
valid_mxdomain = []
for mxdomain in set_mxdomains:
# check if is in cache
# # TODO:
if is_mxdomain_in_cache(mxdomain):
valid_mxdomain.append(mxdomain)
else:
# DNS resolution
try:
answers = resolver.query(mxdomain, rdtype=dns.rdatatype.MX)
if answers:
save_mxdomain_in_cache(mxdomain)
valid_mxdomain.append(mxdomain)
# DEBUG
# print('---')
# print(answers.response)
# print(answers.qname)
# print(answers.rdtype)
# print(answers.rdclass)
# print(answers.nameserver)
# print()
except dns.resolver.NoNameservers:
publisher.debug('NoNameserver, No non-broken nameservers are available to answer the query.')
print('NoNameserver, No non-broken nameservers are available to answer the query.')
except dns.resolver.NoAnswer:
publisher.debug('NoAnswer, The response did not contain an answer to the question.')
print('NoAnswer, The response did not contain an answer to the question.')
except dns.name.EmptyLabel:
publisher.debug('SyntaxError: EmptyLabel')
print('SyntaxError: EmptyLabel')
except dns.resolver.NXDOMAIN:
#save_mxdomain_in_cache(mxdomain)
publisher.debug('The query name does not exist.')
print('The query name does not exist.')
except dns.name.LabelTooLong:
publisher.debug('The Label is too long')
print('The Label is too long')
except dns.exception.Timeout:
print('dns timeout')
#save_mxdomain_in_cache(mxdomain)
except Exception as e:
print(e)
return valid_mxdomain
if __name__ == "__main__": if __name__ == "__main__":
publisher.port = 6380 publisher.port = 6380
@ -49,115 +128,92 @@ if __name__ == "__main__":
faup = Faup() faup = Faup()
p = Process(config_section) p = Process(config_section)
addr_dns = p.config.get("Mail", "dns")
# REDIS # publisher.info("Mails module started")
r_serv_cache = redis.StrictRedis(
host=p.config.get("Redis_Cache", "host"),
port=p.config.getint("Redis_Cache", "port"),
db=p.config.getint("Redis_Cache", "db"),
decode_responses=True)
# ARDB #
server_statistics = redis.StrictRedis(
host=p.config.get("ARDB_Statistics", "host"),
port=p.config.getint("ARDB_Statistics", "port"),
db=p.config.getint("ARDB_Statistics", "db"),
decode_responses=True)
# FUNCTIONS # # Numbers of Mails needed to Tags
publisher.info("Suscribed to channel mails_categ") mail_threshold = 10
# FIXME For retro compatibility
channel = 'mails_categ'
prec_item_id = None
# Log as critical if there are more that that amout of valid emails
is_critical = 10
max_execution_time = 30
email_regex = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}" email_regex = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}"
MX_values = None
while True: while True:
message = p.get_from_set() #message = p.get_from_set()
message = 'archive/pastebin.com_pro/2020/02/07/Fnz1wUim.gz 3'
if message is not None: if message is not None:
item_id, score = message.split() item_id, score = message.split()
if prec_item_id is None or item_id != prec_item_id: item_content = Item.get_item_content(item_id)
PST = Paste.Paste(item_id) item_date = Item.get_item_date(item_id)
# max execution time on regex # Get all emails address
signal.alarm(max_execution_time) signal.alarm(max_execution_time)
try: try:
l_mails = re.findall(email_regex, Item.get_item_content(item_id)) all_emails = re.findall(email_regex, item_content)
except TimeoutException: except TimeoutException:
p.incr_module_timeout_statistic() # add encoder type p.incr_module_timeout_statistic()
err_mess = "Mail: processing timeout: {}".format(item_id) err_mess = "Mails: processing timeout: {}".format(item_id)
print(err_mess) print(err_mess)
publisher.info(err_mess) publisher.info(err_mess)
continue continue
else: else:
signal.alarm(0) signal.alarm(0)
l_mails = list(set(l_mails)) # filtering duplicate
all_emails = set(all_emails)
# max execution time on regex # get MXdomains
signal.alarm(max_execution_time) set_mxdomains = set()
try: dict_mxdomains_email = {}
# Transforming the set into a string for email in all_emails:
MXdomains = re.findall("@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,20}", str(l_mails).lower()) mxdomain = email.split('@')[1].lower()
except TimeoutException: if not mxdomain in dict_mxdomains_email:
p.incr_module_timeout_statistic() # add encoder type dict_mxdomains_email[mxdomain] = []
err_mess = "Mail: processing timeout: {}".format(item_id) set_mxdomains.add(mxdomain)
print(err_mess) dict_mxdomains_email[mxdomain].append(email)
publisher.info(err_mess)
continue
else:
signal.alarm(0)
MX_values = lib_refine.checking_MX_record(r_serv_cache, MXdomains, addr_dns) ## TODO: add MAIL trackers
if MX_values[0] >= 1: print(all_emails)
print()
PST.__setattr__(channel, MX_values) valid_mx = check_mx_record(set_mxdomains, dns_server)
PST.save_attribute_redis(channel, (MX_values[0], print(valid_mx)
list(MX_values[1])))
to_print = 'Mails;{};{};{};Checked {} e-mail(s);{}'.\ num_valid_email = 0
format(PST.p_source, PST.p_date, PST.p_name, for domain_mx in valid_mx:
MX_values[0], PST.p_rel_path) num_valid_email += len(dict_mxdomains_email[domain_mx])
if MX_values[0] > is_critical:
publisher.warning(to_print)
#Send to duplicate
p.populate_set_out(item_id, 'Duplicate')
msg = 'infoleak:automatic-detection="mail";{}'.format(item_id) for email in dict_mxdomains_email[domain_mx]:
p.populate_set_out(msg, 'Tags') msg = 'mail;{};{};{}'.format(1, email, item_date)
p.populate_set_out(msg, 'ModuleStats')
#create country statistics # Create country stats
date = datetime.datetime.now().strftime("%Y%m") faup.decode(email)
for mail in MX_values[1]: tld = faup.get()['tld']
print('mail;{};{};{}'.format(MX_values[1][mail], mail, PST.p_date)) try:
p.populate_set_out('mail;{};{};{}'.format(MX_values[1][mail], mail, PST.p_date), 'ModuleStats') tld = tld.decode()
except:
pass
server_statistics.hincrby('mail_by_tld:{}'.format(item_date), tld, 1)
msg = 'Mails;{};{};{};Checked {} e-mail(s);{}'.format(Item.get_source(item_id), item_date, Item.get_item_basename(item_id), num_valid_email, item_id)
print(msg)
if num_valid_email > mail_threshold:
publisher.warning(msg)
#Send to duplicate
p.populate_set_out(item_id, 'Duplicate')
#tags
msg = 'infoleak:automatic-detection="mail";{}'.format(item_id)
p.populate_set_out(msg, 'Tags')
else:
publisher.info(msg)
time.sleep(30)
faup.decode(mail)
tld = faup.get()['tld']
try:
tld = tld.decode()
except:
pass
server_statistics.hincrby('mail_by_tld:'+date, tld, MX_values[1][mail])
else:
publisher.info(to_print)
#create country statistics
for mail in MX_values[1]:
print('mail;{};{};{}'.format(MX_values[1][mail], mail, PST.p_date))
p.populate_set_out('mail;{};{};{}'.format(MX_values[1][mail], mail, PST.p_date), 'ModuleStats')
prec_item_id = item_id
else: else:
publisher.debug("Script Mails is Idling 10s")
print('Sleeping')
time.sleep(10) time.sleep(10)