mirror of https://github.com/CIRCL/AIL-framework
				
				
				
			
		
			
				
	
	
		
			161 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			161 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
| #!/usr/bin/python3
 | |
| 
 | |
| import re
 | |
| import os
 | |
| import configparser
 | |
| import dns.resolver
 | |
| 
 | |
| from pubsublogger import publisher
 | |
| 
 | |
| from datetime import timedelta
 | |
| 
 | |
| 
 | |
| def is_luhn_valid(card_number):
 | |
|     """Apply the Luhn algorithm to validate credit card.
 | |
| 
 | |
|     :param card_number: -- (int) card number
 | |
| 
 | |
| 
 | |
|     """
 | |
|     r = [int(ch) for ch in str(card_number)][::-1]
 | |
|     return (sum(r[0::2]) + sum(sum(divmod(d*2, 10)) for d in r[1::2])) % 10 == 0
 | |
| 
 | |
| 
 | |
| def checking_MX_record(r_serv, adress_set, addr_dns):
 | |
|     """Check if emails MX domains are responding.
 | |
| 
 | |
|     :param r_serv: -- Redis connexion database
 | |
|     :param adress_set: -- (set) This is a set of emails adress
 | |
|     :param adress_set: -- (str) This is a server dns address
 | |
|     :return: (int) Number of adress with a responding and valid MX domains
 | |
| 
 | |
|     This function will split the email adress and try to resolve their domains
 | |
|     names: on example@gmail.com it will try to resolve gmail.com
 | |
| 
 | |
|     """
 | |
| 
 | |
|     #remove duplicate
 | |
|     adress_set = list(set(adress_set))
 | |
| 
 | |
|     score = 0
 | |
|     num = len(adress_set)
 | |
|     WalidMX = set([])
 | |
|     validMX = {}
 | |
|     # Transforming the set into a string
 | |
|     MXdomains = re.findall("@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,20}", str(adress_set).lower())
 | |
|     resolver = dns.resolver.Resolver()
 | |
|     resolver.nameservers = [addr_dns]
 | |
|     resolver.timeout = 5
 | |
|     resolver.lifetime = 2
 | |
|     if MXdomains != []:
 | |
| 
 | |
|             for MXdomain in MXdomains:
 | |
|                 try:
 | |
|                     MXdomain = MXdomain[1:]
 | |
|                     # Already in Redis living.
 | |
|                     if r_serv.exists(MXdomain):
 | |
|                         score += 1
 | |
|                         WalidMX.add(MXdomain)
 | |
|                         validMX[MXdomain] = validMX.get(MXdomain, 0) + 1
 | |
|                     # Not already in Redis
 | |
|                     else:
 | |
|                         # If I'm Walid MX domain
 | |
|                         if resolver.query(MXdomain, rdtype=dns.rdatatype.MX):
 | |
|                             # Gonna be added in redis.
 | |
|                             r_serv.setex(MXdomain, 1, timedelta(days=1))
 | |
|                             score += 1
 | |
|                             WalidMX.add(MXdomain)
 | |
|                             validMX[MXdomain] = validMX.get(MXdomain, 0) + 1
 | |
|                         else:
 | |
|                             pass
 | |
| 
 | |
|                 except dns.resolver.NoNameservers:
 | |
|                     publisher.debug('NoNameserver, No non-broken nameservers are available to answer the query.')
 | |
|                     print('NoNameserver, No non-broken nameservers are available to answer the query.')
 | |
| 
 | |
|                 except dns.resolver.NoAnswer:
 | |
|                     publisher.debug('NoAnswer, The response did not contain an answer to the question.')
 | |
|                     print('NoAnswer, The response did not contain an answer to the question.')
 | |
| 
 | |
|                 except dns.name.EmptyLabel:
 | |
|                     publisher.debug('SyntaxError: EmptyLabel')
 | |
|                     print('SyntaxError: EmptyLabel')
 | |
| 
 | |
|                 except dns.resolver.NXDOMAIN:
 | |
|                     r_serv.setex(MXdomain[1:], 1, timedelta(days=1))
 | |
|                     publisher.debug('The query name does not exist.')
 | |
|                     print('The query name does not exist.')
 | |
| 
 | |
|                 except dns.name.LabelTooLong:
 | |
|                     publisher.debug('The Label is too long')
 | |
|                     print('The Label is too long')
 | |
| 
 | |
|                 except dns.resolver.Timeout:
 | |
|                     print('timeout')
 | |
|                     r_serv.setex(MXdomain, 1, timedelta(days=1))
 | |
| 
 | |
|                 except Exception as e:
 | |
|                     print(e)
 | |
| 
 | |
|     publisher.debug("emails before: {0} after: {1} (valid)".format(num, score))
 | |
|     #return (num, WalidMX)
 | |
|     return (num, validMX)
 | |
| 
 | |
| 
 | |
| def checking_A_record(r_serv, domains_set):
 | |
|     configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
 | |
|     if not os.path.exists(configfile):
 | |
|         raise Exception('Unable to find the configuration file. \
 | |
|                         Did you set environment variables? \
 | |
|                         Or activate the virtualenv.')
 | |
|     cfg = configparser.ConfigParser()
 | |
|     cfg.read(configfile)
 | |
|     dns_server = cfg.get("Web", "dns")
 | |
| 
 | |
|     score = 0
 | |
|     num = len(domains_set)
 | |
|     WalidA = set([])
 | |
|     resolver = dns.resolver.Resolver()
 | |
|     resolver.nameservers = [dns_server]
 | |
|     resolver.timeout = 5
 | |
|     resolver.lifetime = 2
 | |
| 
 | |
|     for Adomain in domains_set:
 | |
|         try:
 | |
|             # Already in Redis living.
 | |
|             if r_serv.exists(Adomain):
 | |
|                 score += 1
 | |
|                 WalidA.add(Adomain)
 | |
|             # Not already in Redis
 | |
|             else:
 | |
|                 # If I'm Walid domain
 | |
|                 if resolver.query(Adomain, rdtype=dns.rdatatype.A):
 | |
|                     # Gonna be added in redis.
 | |
|                     r_serv.setex(Adomain, 1, timedelta(days=1))
 | |
|                     score += 1
 | |
|                     WalidA.add(Adomain)
 | |
|                 else:
 | |
|                     pass
 | |
| 
 | |
|         except dns.resolver.NoNameservers:
 | |
|             publisher.debug('NoNameserver, No non-broken nameservers are available to answer the query.')
 | |
| 
 | |
|         except dns.resolver.NoAnswer:
 | |
|             publisher.debug('NoAnswer, The response did not contain an answer to the question.')
 | |
| 
 | |
|         except dns.name.EmptyLabel:
 | |
|             publisher.debug('SyntaxError: EmptyLabel')
 | |
| 
 | |
|         except dns.resolver.NXDOMAIN:
 | |
|             r_serv.setex(Adomain[1:], 1, timedelta(days=1))
 | |
|             publisher.debug('The query name does not exist.')
 | |
| 
 | |
|         except dns.name.LabelTooLong:
 | |
|             publisher.debug('The Label is too long')
 | |
| 
 | |
|         except Exception as e:
 | |
|             print(e)
 | |
| 
 | |
|     publisher.debug("URLs before: {0} after: {1} (valid)".format(num, score))
 | |
|     return (num, WalidA)
 |