mirror of https://github.com/CIRCL/AIL-framework
				
				
				
			
		
			
				
	
	
		
			161 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			161 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
#!/usr/bin/python3
 | 
						|
 | 
						|
import re
 | 
						|
import os
 | 
						|
import configparser
 | 
						|
import dns.resolver
 | 
						|
 | 
						|
from pubsublogger import publisher
 | 
						|
 | 
						|
from datetime import timedelta
 | 
						|
 | 
						|
 | 
						|
def is_luhn_valid(card_number):
    """Apply the Luhn algorithm to validate credit card.

    :param card_number: -- (int or str) card number, digits only
    :return: (bool) True when the number passes the Luhn checksum.
        An empty value is never considered valid.
    :raises ValueError: if the string form of card_number contains a
        character that int() cannot convert

    """
    # Digits in reverse order, so index 0 is the rightmost (check) digit.
    digits = [int(ch) for ch in str(card_number)][::-1]
    # With no digits both sums are 0 and 0 % 10 == 0 would wrongly pass.
    if not digits:
        return False
    # Counting from the right: 1st, 3rd, ... digits are taken as they are.
    plain_sum = sum(digits[0::2])
    # 2nd, 4th, ... digits are doubled; divmod splits a two-digit product
    # into its tens and units so that they are added separately.
    doubled_sum = sum(sum(divmod(d * 2, 10)) for d in digits[1::2])
    return (plain_sum + doubled_sum) % 10 == 0
 | 
						|
 | 
						|
 | 
						|
def checking_MX_record(r_serv, adress_set, addr_dns):
    """Check if emails MX domains are responding.

    :param r_serv: -- Redis connexion database
    :param adress_set: -- (set) This is a set of emails adress
    :param addr_dns: -- (str) This is a server dns address
    :return: (tuple) (num, validMX) where num is the number of unique
        adress received and validMX is a dict mapping each domain counted
        as valid to the number of adress found for it

    This function will split the email adress and try to resolve their domains
    names: on example@gmail.com it will try to resolve gmail.com

    A domain is counted as valid when its name is already a key in Redis or
    when the MX query returns an answer; in the second case the name is
    stored in Redis for one day.

    """
    # remove duplicate
    adress_set = list(set(adress_set))

    score = 0
    num = len(adress_set)
    validMX = {}
    # Transforming the set into a string
    # Raw string: "\." in a normal literal is an invalid escape sequence.
    MXdomains = re.findall(r"@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,20}", str(adress_set).lower())
    resolver = dns.resolver.Resolver()
    resolver.nameservers = [addr_dns]
    resolver.timeout = 5
    resolver.lifetime = 2

    for MXmatch in MXdomains:
        # Drop the leading '@' captured by the regex.
        MXdomain = MXmatch[1:]
        try:
            # Already in Redis living.
            if r_serv.exists(MXdomain):
                score += 1
                validMX[MXdomain] = validMX.get(MXdomain, 0) + 1
            # Not already in Redis: if I'm a valid MX domain
            elif resolver.query(MXdomain, rdtype=dns.rdatatype.MX):
                # Gonna be added in redis.
                r_serv.setex(MXdomain, 1, timedelta(days=1))
                score += 1
                validMX[MXdomain] = validMX.get(MXdomain, 0) + 1

        except dns.resolver.NoNameservers:
            publisher.debug('NoNameserver, No non-broken nameservers are available to answer the query.')
            print('NoNameserver, No non-broken nameservers are available to answer the query.')

        except dns.resolver.NoAnswer:
            publisher.debug('NoAnswer, The response did not contain an answer to the question.')
            print('NoAnswer, The response did not contain an answer to the question.')

        except dns.name.EmptyLabel:
            publisher.debug('SyntaxError: EmptyLabel')
            print('SyntaxError: EmptyLabel')

        except dns.resolver.NXDOMAIN:
            # Nothing is written to Redis here: the '@' is already stripped,
            # so slicing the name again would store a mangled key, and
            # storing the real name would make the exists() check above
            # count a non-existent domain as valid.
            publisher.debug('The query name does not exist.')
            print('The query name does not exist.')

        except dns.name.LabelTooLong:
            publisher.debug('The Label is too long')
            print('The Label is too long')

        except dns.resolver.Timeout:
            print('timeout')
            # NOTE(review): a timed-out domain is stored like a valid one, so
            # it will be counted as valid on later calls -- confirm intent.
            r_serv.setex(MXdomain, 1, timedelta(days=1))

        except Exception as e:
            # Best effort: one failing domain must not stop the others.
            print(e)

    publisher.debug("emails before: {0} after: {1} (valid)".format(num, score))
    return (num, validMX)
 | 
						|
 | 
						|
 | 
						|
def checking_A_record(r_serv, domains_set):
    """Check if domains have a responding A record.

    :param r_serv: -- Redis connexion database
    :param domains_set: -- (set) This is a set of domains names
    :return: (tuple) (num, WalidA) where num is the number of domains
        received and WalidA is the set of domains counted as valid
    :raises KeyError: if the AIL_BIN environment variable is not set
    :raises Exception: if the configuration file cannot be found

    The dns server address is read from the "dns" option of the "Web"
    section of packages/config.cfg. A domain is counted as valid when its
    name is already a key in Redis or when the A query returns an answer;
    in the second case the name is stored in Redis for one day.

    """
    configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
    if not os.path.exists(configfile):
        raise Exception('Unable to find the configuration file. \
                        Did you set environment variables? \
                        Or activate the virtualenv.')
    cfg = configparser.ConfigParser()
    cfg.read(configfile)
    dns_server = cfg.get("Web", "dns")

    score = 0
    num = len(domains_set)
    WalidA = set()
    resolver = dns.resolver.Resolver()
    resolver.nameservers = [dns_server]
    resolver.timeout = 5
    resolver.lifetime = 2

    for Adomain in domains_set:
        try:
            # Already in Redis living.
            if r_serv.exists(Adomain):
                score += 1
                WalidA.add(Adomain)
            # Not already in Redis: if I'm a valid domain
            elif resolver.query(Adomain, rdtype=dns.rdatatype.A):
                # Gonna be added in redis.
                r_serv.setex(Adomain, 1, timedelta(days=1))
                score += 1
                WalidA.add(Adomain)

        except dns.resolver.NoNameservers:
            publisher.debug('NoNameserver, No non-broken nameservers are available to answer the query.')

        except dns.resolver.NoAnswer:
            publisher.debug('NoAnswer, The response did not contain an answer to the question.')

        except dns.name.EmptyLabel:
            publisher.debug('SyntaxError: EmptyLabel')

        except dns.resolver.NXDOMAIN:
            # Nothing is written to Redis here: slicing off the first
            # character would store a mangled key, and storing the real name
            # would make the exists() check above count a non-existent
            # domain as valid.
            publisher.debug('The query name does not exist.')

        except dns.name.LabelTooLong:
            publisher.debug('The Label is too long')

        except Exception as e:
            # Best effort: one failing domain must not stop the others.
            print(e)

    publisher.debug("URLs before: {0} after: {1} (valid)".format(num, score))
    return (num, WalidA)
 |