AIL-framework/bin/packages/lib_refine.py

161 lines
5.6 KiB
Python
Raw Normal View History

2018-05-04 13:53:29 +02:00
#!/usr/bin/python3
import re
import os
import configparser
import dns.resolver
from pubsublogger import publisher
from datetime import timedelta
def is_luhn_valid(card_number):
"""Apply the Luhn algorithm to validate credit card.
:param card_number: -- (int) card number
"""
r = [int(ch) for ch in str(card_number)][::-1]
return (sum(r[0::2]) + sum(sum(divmod(d*2, 10)) for d in r[1::2])) % 10 == 0
def checking_MX_record(r_serv, adress_set, addr_dns):
"""Check if emails MX domains are responding.
:param r_serv: -- Redis connexion database
:param adress_set: -- (set) This is a set of emails adress
:param adress_set: -- (str) This is a server dns address
:return: (int) Number of adress with a responding and valid MX domains
This function will split the email adress and try to resolve their domains
names: on example@gmail.com it will try to resolve gmail.com
"""
#remove duplicate
adress_set = list(set(adress_set))
score = 0
num = len(adress_set)
WalidMX = set([])
2018-07-30 09:21:22 +02:00
validMX = {}
# Transforming the set into a string
MXdomains = re.findall("@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,20}", str(adress_set).lower())
2014-09-04 11:46:07 +02:00
resolver = dns.resolver.Resolver()
resolver.nameservers = [addr_dns]
2014-09-04 11:46:07 +02:00
resolver.timeout = 5
resolver.lifetime = 2
if MXdomains != []:
2018-07-30 09:21:22 +02:00
for MXdomain in MXdomains:
try:
2018-07-30 09:21:22 +02:00
MXdomain = MXdomain[1:]
# Already in Redis living.
2018-07-30 09:21:22 +02:00
if r_serv.exists(MXdomain):
score += 1
2018-07-30 09:21:22 +02:00
WalidMX.add(MXdomain)
validMX[MXdomain] = validMX.get(MXdomain, 0) + 1
# Not already in Redis
else:
# If I'm Walid MX domain
2018-07-30 09:21:22 +02:00
if resolver.query(MXdomain, rdtype=dns.rdatatype.MX):
# Gonna be added in redis.
2018-07-30 09:21:22 +02:00
r_serv.setex(MXdomain, 1, timedelta(days=1))
score += 1
2018-07-30 09:21:22 +02:00
WalidMX.add(MXdomain)
validMX[MXdomain] = validMX.get(MXdomain, 0) + 1
else:
pass
except dns.resolver.NoNameservers:
publisher.debug('NoNameserver, No non-broken nameservers are available to answer the query.')
print('NoNameserver, No non-broken nameservers are available to answer the query.')
except dns.resolver.NoAnswer:
publisher.debug('NoAnswer, The response did not contain an answer to the question.')
print('NoAnswer, The response did not contain an answer to the question.')
except dns.name.EmptyLabel:
publisher.debug('SyntaxError: EmptyLabel')
print('SyntaxError: EmptyLabel')
except dns.resolver.NXDOMAIN:
2014-09-04 11:46:07 +02:00
r_serv.setex(MXdomain[1:], 1, timedelta(days=1))
publisher.debug('The query name does not exist.')
print('The query name does not exist.')
except dns.name.LabelTooLong:
publisher.debug('The Label is too long')
print('The Label is too long')
2014-09-04 11:46:07 +02:00
except dns.resolver.Timeout:
print('timeout')
2018-07-30 09:21:22 +02:00
r_serv.setex(MXdomain, 1, timedelta(days=1))
2014-09-04 11:46:07 +02:00
except Exception as e:
2018-04-16 14:50:04 +02:00
print(e)
publisher.debug("emails before: {0} after: {1} (valid)".format(num, score))
2018-07-30 09:21:22 +02:00
#return (num, WalidMX)
return (num, validMX)
def checking_A_record(r_serv, domains_set):
configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
if not os.path.exists(configfile):
raise Exception('Unable to find the configuration file. \
Did you set environment variables? \
Or activate the virtualenv.')
cfg = configparser.ConfigParser()
cfg.read(configfile)
dns_server = cfg.get("Web", "dns")
score = 0
num = len(domains_set)
WalidA = set([])
2014-09-04 11:46:07 +02:00
resolver = dns.resolver.Resolver()
resolver.nameservers = [dns_server]
2014-09-04 11:46:07 +02:00
resolver.timeout = 5
resolver.lifetime = 2
for Adomain in domains_set:
try:
# Already in Redis living.
if r_serv.exists(Adomain):
score += 1
WalidA.add(Adomain)
# Not already in Redis
else:
# If I'm Walid domain
2014-09-04 11:46:07 +02:00
if resolver.query(Adomain, rdtype=dns.rdatatype.A):
# Gonna be added in redis.
2014-09-04 11:46:07 +02:00
r_serv.setex(Adomain, 1, timedelta(days=1))
score += 1
WalidA.add(Adomain)
else:
pass
except dns.resolver.NoNameservers:
publisher.debug('NoNameserver, No non-broken nameservers are available to answer the query.')
except dns.resolver.NoAnswer:
publisher.debug('NoAnswer, The response did not contain an answer to the question.')
except dns.name.EmptyLabel:
publisher.debug('SyntaxError: EmptyLabel')
except dns.resolver.NXDOMAIN:
2014-09-04 11:46:07 +02:00
r_serv.setex(Adomain[1:], 1, timedelta(days=1))
publisher.debug('The query name does not exist.')
except dns.name.LabelTooLong:
publisher.debug('The Label is too long')
2014-09-04 11:46:07 +02:00
except Exception as e:
2018-04-16 14:50:04 +02:00
print(e)
publisher.debug("URLs before: {0} after: {1} (valid)".format(num, score))
return (num, WalidA)