diff --git a/bin/Mail.py b/bin/Mail.py
index 732e048a..9af5b215 100755
--- a/bin/Mail.py
+++ b/bin/Mail.py
@@ -14,6 +14,7 @@ It apply mail regexes on item content and warn if above a threshold.
 import os
 import re
 import sys
+import uuid
 import redis
 import time
 import datetime
@@ -22,7 +23,6 @@ import dns.resolver
 import dns.exception
 
 from multiprocessing import Process as Proc
-from multiprocessing import Queue
 
 from pubsublogger import publisher
 from Helper import Process
@@ -49,7 +49,7 @@ def is_mxdomain_in_cache(mxdomain):
     return r_serv_cache.exists('mxdomain:{}'.format(mxdomain))
 
 def save_mxdomain_in_cache(mxdomain):
-    r_serv_cache.setex(mxdomain, 1, datetime.timedelta(days=1))
+    r_serv_cache.setex('mxdomain:{}'.format(mxdomain), 1, datetime.timedelta(days=1))
 
 def check_mx_record(set_mxdomains, dns_server):
     """Check if emails MX domains are responding.
@@ -110,8 +110,14 @@ def check_mx_record(set_mxdomains, dns_server):
         print(e)
     return valid_mxdomain
 
-def extract_all_emails(queue, item_content):
-    queue.put(re.findall(email_regex, item_content))
+def extract_all_emails(redis_key, item_content):
+    all_emails = re.findall(email_regex, item_content)
+    if len(all_emails) > 1:
+        r_serv_cache.sadd(redis_key, *all_emails)
+        r_serv_cache.expire(redis_key, 360)
+    elif all_emails:
+        r_serv_cache.sadd(redis_key, all_emails[0])
+        r_serv_cache.expire(redis_key, 360)
 
 if __name__ == "__main__":
     publisher.port = 6380
@@ -132,10 +138,10 @@ if __name__ == "__main__":
 
     email_regex = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}"
 
-    q = Queue()
+    redis_key = 'mail_extracted:{}'.format(str(uuid.uuid4()))
 
     while True:
         message = p.get_from_set()
 
         if message is not None:
             item_id, score = message.split()
@@ -144,9 +150,9 @@ if __name__ == "__main__":
 
             item_content = Item.get_item_content(item_id)
 
-            proc = Proc(target=extract_all_emails, args=(q, item_content))
-            proc.start()
+            proc = Proc(target=extract_all_emails, args=(redis_key, item_content, ))
             try:
+                proc.start()
                 proc.join(max_execution_time)
                 if proc.is_alive():
                     proc.terminate()
@@ -156,15 +162,14 @@ if __name__ == "__main__":
                     publisher.info(err_mess)
                     continue
                 else:
-                    all_emails = q.get()
+                    all_emails = r_serv_cache.smembers(redis_key)
+                    r_serv_cache.delete(redis_key)
+                    proc.terminate()
             except KeyboardInterrupt:
                 print("Caught KeyboardInterrupt, terminating workers")
                 proc.terminate()
                 sys.exit(0)
 
-            # filtering duplicate
-            all_emails = set(all_emails)
-
             # get MXdomains
             set_mxdomains = set()
             dict_mxdomains_email = {}