misp-warninglists/tools/generate-crl-ip-domains.py

114 lines
4.2 KiB
Python
Executable File

#!/usr/bin/env python3
import csv
import logging
import multiprocessing.dummy
import urllib.parse
import sys
from OpenSSL.crypto import FILETYPE_PEM, load_certificate, X509
from pyasn1.codec.der.decoder import decode as asn1_decoder
from pyasn1_modules.rfc2459 import CRLDistPointsSyntax, AuthorityInfoAccessSyntax
from typing import List, Set
from dns.resolver import NoAnswer, NXDOMAIN, NoNameservers
from dns.exception import Timeout
import dns
from generator import download_to_file, get_version, write_to_file, get_abspath_source_file, create_resolver
def get_domain(url: str) -> str:
return urllib.parse.urlparse(url).hostname
def get_crl_ocsp_domains(cert: X509) -> List[str]:
crl_ocsp_domains = []
for i in range(0, cert.get_extension_count()):
extension = cert.get_extension(i)
short_name = extension.get_short_name()
if short_name == b'crlDistributionPoints':
decoded, _ = asn1_decoder(extension.get_data(), asn1Spec=CRLDistPointsSyntax())
for crl in decoded:
for generalName in crl.getComponentByName('distributionPoint').getComponentByName('fullName'):
crl_url = generalName.getComponentByName('uniformResourceIdentifier')
domain = get_domain(str(crl_url))
if domain:
crl_ocsp_domains.append(domain)
elif short_name == b'authorityInfoAccess':
decoded, _ = asn1_decoder(extension.get_data(), asn1Spec=AuthorityInfoAccessSyntax())
for section in decoded:
if str(section.getComponentByName('accessMethod')) == '1.3.6.1.5.5.7.48.1': # ocsp
ocsp_url = section.getComponentByName('accessLocation').getComponentByName(
'uniformResourceIdentifier')
domain = get_domain(str(ocsp_url))
if domain:
crl_ocsp_domains.append(domain)
return crl_ocsp_domains
def get_ips_from_domain(domain: str) -> Set[str]:
resolver = create_resolver()
ips = set()
try:
answers = dns.resolver.resolve(domain, 'A')
for rdata in answers:
ips.add(str(rdata))
except (NoAnswer, NXDOMAIN, NoNameservers, Timeout):
pass
try:
answers = dns.resolver.resolve(domain, 'AAAA')
for rdata in answers:
ips.add(str(rdata))
except (NoAnswer, NXDOMAIN, NoNameservers, Timeout):
pass
return ips
def get_ips_from_domains(domains) -> Set[str]:
p = multiprocessing.dummy.Pool(10)
ips = set()
for ips_for_domain in p.map(get_ips_from_domain, domains):
ips.update(ips_for_domain)
return ips
def process(file):
crl_ocsp_domains = set()
with open(get_abspath_source_file(file), 'r') as f_in:
for obj in csv.DictReader(f_in):
try:
pem = obj['PEM Info'].strip("'").replace('\r', '').replace('\n\n', '\n')
cert = load_certificate(FILETYPE_PEM, pem)
crl_ocsp_domains.update(get_crl_ocsp_domains(cert))
except Exception:
logging.exception("Could not process certificate")
warninglist = {
'name': 'CRL and OCSP domains',
'version': get_version(),
'description': 'Domains that belongs to CRL or OCSP',
'list': crl_ocsp_domains,
'matching_attributes': ["hostname", "domain", "domain|ip"],
'type': 'string',
}
write_to_file(warninglist, "crl-hostname")
warninglist = {
'name': 'CRL and OCSP IP addresses',
'version': get_version(),
'description': 'IP addresses that belongs to CRL or OCSP',
'list': get_ips_from_domains(crl_ocsp_domains),
'matching_attributes': ["ip-src", "ip-dst", "domain|ip", "ip-src|port", "ip-dst|port"],
'type': 'cidr',
}
write_to_file(warninglist, "crl-ip")
if __name__ == '__main__':
CA_known_intermediate_url = 'https://ccadb-public.secure.force.com/mozilla/PublicAllIntermediateCertsWithPEMCSV'
CA_known_intermediate_file = 'PublicAllIntermediateCertsWithPEMCSV.csv'
download_to_file(CA_known_intermediate_url, CA_known_intermediate_file)
process(CA_known_intermediate_file)