mirror of https://github.com/CIRCL/AIL-framework
				
				
				
			
		
			
				
	
	
		
			132 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			132 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
| import re
 | |
| import dns.resolver
 | |
| 
 | |
| from pubsublogger import publisher
 | |
| 
 | |
| from datetime import timedelta
 | |
| 
 | |
| 
 | |
def is_luhn_valid(card_number):
    """Apply the Luhn algorithm to validate credit card.

    :param card_number: -- (int or str) card number, made only of digits
    :return: (bool) True if the number holds at least one digit and its
        Luhn checksum is a multiple of 10, False otherwise
    :raises ValueError: if str(card_number) contains a character that
        int() cannot convert (space, dash, sign, ...)

    """
    # Reverse the digits so index 0 is the rightmost one (the check digit).
    digits = [int(ch) for ch in str(card_number)][::-1]
    # Without this guard an empty input sums to 0 and would be reported valid.
    if not digits:
        return False
    # Every second digit from the right is doubled; divmod(d*2, 10) splits
    # the product into its tens and units so that their sum can be added.
    untouched = sum(digits[0::2])
    doubled = sum(sum(divmod(d * 2, 10)) for d in digits[1::2])
    return (untouched + doubled) % 10 == 0
 | |
| 
 | |
| 
 | |
def checking_MX_record(r_serv, adress_set):
    """Check if emails MX domains are responding.

    :param r_serv: -- Redis connexion database, used here as a cache of the
        domains that already answered an MX query
    :param adress_set: -- (set) This is a set of emails adress
    :return: (tuple) (num, WalidMX) where num is len(adress_set) and WalidMX
        is the set of domains found in the cache or answering an MX query

    This function will split the email adress and try to resolve their domains
    names: on example@gmail.com it will try to resolve gmail.com

    Only domains that answered an MX query are written to Redis. A domain
    that raised NXDOMAIN or timed out is not cached: the cache lookup treats
    any existing key as "valid", so caching a failure would make the domain
    count as valid on the next call.

    """
    score = 0
    num = len(adress_set)
    WalidMX = set()
    # Transforming the set into a string, then extracting every "@domain.tld"
    MXdomains = re.findall(r"@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,20}", str(adress_set).lower())
    resolver = dns.resolver.Resolver()
    resolver.nameservers = ['149.13.33.69']
    # NOTE(review): lifetime (2) is lower than timeout (5) -- confirm that
    # this is the intended pair of values.
    resolver.timeout = 5
    resolver.lifetime = 2

    # set() removes duplicates so each domain is checked (and counted) once.
    for MXdomain in set(MXdomains):
        # Drop the leading '@' captured by the regex.
        domain = MXdomain[1:]
        try:
            # Already in Redis living.
            if r_serv.exists(domain):
                score += 1
                WalidMX.add(domain)
            # Not already in Redis: if I'm Walid MX domain
            elif resolver.query(domain, rdtype=dns.rdatatype.MX):
                # Gonna be added in redis.
                # NOTE(review): the (name, 1, timedelta) argument order is
                # kept from the original -- confirm it matches the setex
                # signature of the Redis client in use.
                r_serv.setex(domain, 1, timedelta(days=1))
                score += 1
                WalidMX.add(domain)

        except dns.resolver.NoNameservers:
            publisher.debug('NoNameserver, No non-broken nameservers are available to answer the query.')

        except dns.resolver.NoAnswer:
            publisher.debug('NoAnswer, The response did not contain an answer to the question.')

        except dns.name.EmptyLabel:
            publisher.debug('SyntaxError: EmptyLabel')

        except dns.resolver.NXDOMAIN:
            # Not cached: see the docstring.
            publisher.debug('The query name does not exist.')

        except dns.name.LabelTooLong:
            publisher.debug('The Label is too long')

        except dns.resolver.Timeout:
            # Not cached: see the docstring.
            publisher.debug('Timeout, The DNS operation timed out.')

        except Exception as e:
            # Best effort: one failing domain must not stop the others.
            print(e)

    publisher.debug("emails before: {0} after: {1} (valid)".format(num, score))
    return (num, WalidMX)
 | |
| 
 | |
| 
 | |
def checking_A_record(r_serv, domains_set):
    """Check if domains are resolving to an A record.

    :param r_serv: -- Redis connexion database, used here as a cache of the
        domains that already answered an A query
    :param domains_set: -- (set) domain names to check
    :return: (tuple) (num, WalidA) where num is len(domains_set) and WalidA
        is the set of domains found in the cache or answering an A query

    Only domains that answered an A query are written to Redis. A domain
    that raised NXDOMAIN is not cached: the cache lookup treats any existing
    key as "valid", so caching a failure would make the domain count as
    valid on the next call.

    """
    score = 0
    num = len(domains_set)
    WalidA = set()
    resolver = dns.resolver.Resolver()
    resolver.nameservers = ['149.13.33.69']
    # NOTE(review): lifetime (2) is lower than timeout (5) -- confirm that
    # this is the intended pair of values.
    resolver.timeout = 5
    resolver.lifetime = 2

    for Adomain in domains_set:
        try:
            # Already in Redis living.
            if r_serv.exists(Adomain):
                score += 1
                WalidA.add(Adomain)
            # Not already in Redis: if I'm Walid domain
            elif resolver.query(Adomain, rdtype=dns.rdatatype.A):
                # Gonna be added in redis.
                # NOTE(review): the (name, 1, timedelta) argument order is
                # kept from the original -- confirm it matches the setex
                # signature of the Redis client in use.
                r_serv.setex(Adomain, 1, timedelta(days=1))
                score += 1
                WalidA.add(Adomain)

        except dns.resolver.NoNameservers:
            publisher.debug('NoNameserver, No non-broken nameservers are available to answer the query.')

        except dns.resolver.NoAnswer:
            publisher.debug('NoAnswer, The response did not contain an answer to the question.')

        except dns.name.EmptyLabel:
            publisher.debug('SyntaxError: EmptyLabel')

        except dns.resolver.NXDOMAIN:
            # Not cached: the original stored Adomain[1:] here, i.e. the
            # name minus its first character, under the "valid" key space.
            publisher.debug('The query name does not exist.')

        except dns.name.LabelTooLong:
            publisher.debug('The Label is too long')

        except dns.resolver.Timeout:
            publisher.debug('Timeout, The DNS operation timed out.')

        except Exception as e:
            # Best effort: one failing domain must not stop the others.
            print(e)

    publisher.debug("URLs before: {0} after: {1} (valid)".format(num, score))
    return (num, WalidA)
 |