diff --git a/bin/Mail.py b/bin/Mail.py index ffb23944..31b4d267 100755 --- a/bin/Mail.py +++ b/bin/Mail.py @@ -21,25 +21,14 @@ import datetime import dns.resolver import dns.exception +from multiprocessing import Process as Proc +from multiprocessing import Queue + from pubsublogger import publisher from Helper import Process from pyfaup.faup import Faup -## REGEX TIMEOUT ## -import signal - -def timeout_handler(signum, frame): - raise TimeoutException() - -class TimeoutException(Exception): - pass - - -signal.signal(signal.SIGALRM, timeout_handler) -max_execution_time = 20 -## -- ## - sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages')) import Item @@ -55,6 +44,7 @@ dns_server = config_loader.get_config_str('Mail', 'dns') config_loader = None ## -- ## + def is_mxdomain_in_cache(mxdomain): return r_serv_cache.exists('mxdomain:{}'.format(mxdomain)) @@ -120,6 +110,9 @@ def check_mx_record(set_mxdomains, dns_server): print(e) return valid_mxdomain +def extract_all_emails(queue, item_content): + queue.put(re.findall(email_regex, item_content)) + if __name__ == "__main__": publisher.port = 6380 publisher.channel = "Script" @@ -135,8 +128,12 @@ if __name__ == "__main__": # Numbers of Mails needed to Tags mail_threshold = 10 + max_execution_time = 30 + email_regex = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}" + q = Queue() + while True: message = p.get_from_set() @@ -144,23 +141,24 @@ if __name__ == "__main__": item_id, score = message.split() item_content = Item.get_item_content(item_id) - item_date = Item.get_item_date(item_id) - #print(item_id) - - # Get all emails address - signal.alarm(30) + proc = Proc(target=extract_all_emails, args=(q, item_content)) + proc.start() try: - all_emails = re.findall(email_regex, item_content) - except TimeoutException: - p.incr_module_timeout_statistic() - err_mess = "Mails: processing timeout: {}".format(item_id) - print(err_mess) - publisher.info(err_mess) - signal.signal(signal.SIGALRM, timeout_handler) - continue - finally: - signal.alarm(0) + proc.join(max_execution_time) + if proc.is_alive(): + proc.terminate() + p.incr_module_timeout_statistic() + err_mess = "Mails: processing timeout: {}".format(item_id) + print(err_mess) + publisher.info(err_mess) + continue + else: + all_emails = q.get() + except KeyboardInterrupt: + print("Caught KeyboardInterrupt, terminating workers") + proc.terminate() + sys.exit(0) # filtering duplicate all_emails = set(all_emails) @@ -179,6 +177,8 @@ if __name__ == "__main__": valid_mx = check_mx_record(set_mxdomains, dns_server) + item_date = Item.get_item_date(item_id) + num_valid_email = 0 for domain_mx in valid_mx: num_valid_email += len(dict_mxdomains_email[domain_mx])