fix: [Mail module] fix dns caching + use redis queue

pull/519/head
Terrtia 2020-05-14 14:26:52 +02:00
parent 5475660785
commit f8ff232c3e
No known key found for this signature in database
GPG Key ID: 1E1B1F50D84613D0
1 changed files with 17 additions and 11 deletions

View File

@ -14,6 +14,7 @@ It apply mail regexes on item content and warn if above a threshold.
import os
import re
import sys
import uuid
import redis
import time
import datetime
@ -22,7 +23,6 @@ import dns.resolver
import dns.exception
from multiprocessing import Process as Proc
from multiprocessing import Queue
from pubsublogger import publisher
from Helper import Process
@ -49,7 +49,7 @@ def is_mxdomain_in_cache(mxdomain):
return r_serv_cache.exists('mxdomain:{}'.format(mxdomain))
def save_mxdomain_in_cache(mxdomain):
r_serv_cache.setex(mxdomain, 1, datetime.timedelta(days=1))
r_serv_cache.setex('mxdomain:{}'.format(mxdomain), 1, datetime.timedelta(days=1))
def check_mx_record(set_mxdomains, dns_server):
"""Check if emails MX domains are responding.
@ -110,8 +110,14 @@ def check_mx_record(set_mxdomains, dns_server):
print(e)
return valid_mxdomain
def extract_all_emails(queue, item_content):
queue.put(re.findall(email_regex, item_content))
def extract_all_emails(redis_key, item_content):
all_emails = re.findall(email_regex, item_content)
if len(all_emails) > 1:
r_serv_cache.sadd(redis_key, *all_emails)
r_serv_cache.expire(redis_key, 360)
elif all_emails:
r_serv_cache.sadd(redis_key, all_emails[0])
r_serv_cache.expire(redis_key, 360)
if __name__ == "__main__":
publisher.port = 6380
@ -132,10 +138,11 @@ if __name__ == "__main__":
email_regex = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}"
q = Queue()
redis_key = 'mail_extracted:{}'.format(str(uuid.uuid4()))
while True:
message = p.get_from_set()
message = 'urlextract/2020/05/06/throwbin.io_guNTS59b94413e7-a6e8-428a-b561-c9829ec77587.gz 2'
if message is not None:
item_id, score = message.split()
@ -144,9 +151,9 @@ if __name__ == "__main__":
item_content = Item.get_item_content(item_id)
proc = Proc(target=extract_all_emails, args=(q, item_content))
proc.start()
proc = Proc(target=extract_all_emails, args=(redis_key, item_content, ))
try:
proc.start()
proc.join(max_execution_time)
if proc.is_alive():
proc.terminate()
@ -156,15 +163,14 @@ if __name__ == "__main__":
publisher.info(err_mess)
continue
else:
all_emails = q.get()
all_emails = r_serv_cache.smembers(redis_key)
r_serv_cache.delete(redis_key)
proc.terminate()
except KeyboardInterrupt:
print("Caught KeyboardInterrupt, terminating workers")
proc.terminate()
sys.exit(0)
# filtering duplicate
all_emails = set(all_emails)
# get MXdomains
set_mxdomains = set()
dict_mxdomains_email = {}