new: List of known SMTP sending IP ranges

pull/190/head
Jakub Onderka 2021-06-13 14:49:57 +02:00
parent 133d70b7ce
commit e7401c9cbe
6 changed files with 1212 additions and 49 deletions

View File

@ -30,6 +30,7 @@ python3 generate-wikimedia.py
python3 generate-second-level-tlds.py
python3 generate-google-gcp.py
python3 generate-google-gmail-sending-ips.py
python3 generate-smtp-sending-ips.py
popd
./jq_all_the_things.sh

File diff suppressed because it is too large Load Diff

View File

@ -7,9 +7,9 @@ from OpenSSL.crypto import FILETYPE_PEM, load_certificate, X509
from pyasn1.codec.der.decoder import decode as asn1_decoder
from pyasn1_modules.rfc2459 import CRLDistPointsSyntax, AuthorityInfoAccessSyntax
from typing import List, Set
from dns.resolver import Resolver, NoAnswer, NXDOMAIN
from dns.resolver import NoAnswer, NXDOMAIN
from dns.exception import Timeout
from generator import download_to_file, get_version, write_to_file, get_abspath_source_file
from generator import download_to_file, get_version, write_to_file, get_abspath_source_file, create_resolver
def get_domain(url: str) -> str:
@ -44,9 +44,7 @@ def get_crl_ocsp_domains(cert: X509) -> List[str]:
def get_ips_from_domain(domain: str) -> Set[str]:
resolver = Resolver()
resolver.timeout = 5
resolver.lifetime = 5
resolver = create_resolver()
ips = set()
@ -65,10 +63,6 @@ def get_ips_from_domain(domain: str) -> Set[str]:
def get_ips_from_domains(domains) -> Set[str]:
resolver = Resolver()
resolver.timeout = 5
resolver.lifetime = 5
p = multiprocessing.dummy.Pool(10)
ips = set()
for ips_for_domain in p.map(get_ips_from_domain, domains):

View File

@ -1,48 +1,17 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from ipaddress import ip_network, IPv4Network, IPv6Network
from dns.resolver import Resolver
from typing import List, Union
from generator import get_version, write_to_file
class Spf:
def _parse_spf(self, spf: str) -> dict:
output = {"include": [], "ranges": []}
for part in spf.split(" "):
if part.startswith("include:"):
output["include"].append(part.split(":", 1)[1])
elif part.startswith("ip4:") or part.startswith("ip6:"):
output["ranges"].append(ip_network(part.split(":", 1)[1]))
return output
def _query_spf(self, resolver: Resolver, domain: str) -> List[Union[IPv4Network, IPv6Network]]:
ranges = []
for rdata in resolver.query(domain, "TXT"):
parsed = self._parse_spf(rdata.to_text())
ranges += parsed["ranges"]
for include in parsed["include"]:
ranges += self._query_spf(resolver, include)
return ranges
def get_list(self, domain: str) -> List[Union[IPv4Network, IPv6Network]]:
resolver = Resolver()
return self._query_spf(resolver, domain)
from generator import get_version, write_to_file, Spf, consolidate_networks, create_resolver
if __name__ == '__main__':
spf = Spf()
print()
spf = Spf(create_resolver())
warninglist = {
'name': "List of known Gmail sending IP ranges",
'version': get_version(),
'description': "List of known Gmail sending IP ranges (https://support.google.com/a/answer/27642?hl=en)",
'matching_attributes': ["ip-src", "ip-dst", "domain|ip"],
'type': 'cidr',
'list': [str(range) for range in spf.get_list("_spf.google.com")],
'list': consolidate_networks(spf.get_ip_ranges("gmail.com")),
}
write_to_file(warninglist, "google-gmail-sending-ips")

View File

@ -0,0 +1,87 @@
#!/usr/bin/env python3
import multiprocessing.dummy
from generator import get_version, write_to_file, Spf, consolidate_networks, create_resolver
# Source: https://github.com/mailcheck/mailcheck/wiki/List-of-Popular-Domains
domains = [
# Default domains included
"aol.com", "att.net", "comcast.net", "facebook.com", "gmail.com", "gmx.com", "googlemail.com",
"google.com", "hotmail.com", "hotmail.co.uk", "mac.com", "me.com", "mail.com", "msn.com",
"live.com", "sbcglobal.net", "verizon.net", "yahoo.com", "yahoo.co.uk",
# Other global domains
"email.com", "fastmail.fm", "games.com", "gmx.net", "hush.com", "hushmail.com", "icloud.com",
"iname.com", "inbox.com", "lavabit.com",
"love.com", "outlook.com", "pobox.com", "protonmail.ch", "protonmail.com", "tutanota.de", "tutanota.com",
"tutamail.com", "tuta.io",
"keemail.me", "rocketmail.com", "safe-mail.net", "wow.com", "ygm.com",
"ymail.com", "zoho.com", "yandex.com",
# United States ISP domains
"bellsouth.net", "charter.net", "cox.net", "earthlink.net", "juno.com",
# British ISP domains
"btinternet.com", "virginmedia.com", "blueyonder.co.uk", "live.co.uk",
"ntlworld.com", "orange.net", "sky.com", "talktalk.co.uk", "tiscali.co.uk",
"virgin.net", "bt.com",
# Domains used in Asia
"sina.com", "sina.cn", "qq.com", "naver.com", "hanmail.net", "daum.net", "nate.com", "yahoo.co.jp", "yahoo.co.kr",
"yahoo.co.id", "yahoo.co.in", "yahoo.com.sg", "yahoo.com.ph", "163.com", "yeah.net", "126.com", "21cn.com",
"aliyun.com", "foxmail.com",
# French ISP domains
"hotmail.fr", "live.fr", "laposte.net", "yahoo.fr", "wanadoo.fr", "orange.fr", "gmx.fr", "sfr.fr", "neuf.fr",
"free.fr",
# German ISP domains
"gmx.de", "hotmail.de", "live.de", "online.de", "t-online.de", "web.de", "yahoo.de",
# Italian ISP domains
"libero.it", "virgilio.it", "hotmail.it", "aol.it", "tiscali.it",
"alice.it", "live.it", "yahoo.it", "email.it", "tin.it", "poste.it", "teletu.it",
# Russian ISP domains
"bk.ru", "inbox.ru", "list.ru", "mail.ru", "rambler.ru", "yandex.by", "yandex.com", "yandex.kz", "yandex.ru",
"yandex.ua", "ya.ru",
# Belgian ISP domains
"hotmail.be", "live.be", "skynet.be", "voo.be", "tvcablenet.be", "telenet.be",
# Argentinian ISP domains
"hotmail.com.ar", "live.com.ar", "yahoo.com.ar", "fibertel.com.ar", "speedy.com.ar", "arnet.com.ar",
# Domains used in Mexico
"yahoo.com.mx", "live.com.mx", "hotmail.es", "hotmail.com.mx", "prodigy.net.mx",
# Domains used in Canada
"yahoo.ca", "hotmail.ca", "bell.net", "shaw.ca", "sympatico.ca", "rogers.com",
# Domains used in Brazil
"yahoo.com.br", "hotmail.com.br", "outlook.com.br", "uol.com.br", "bol.com.br", "terra.com.br", "ig.com.br",
"r7.com", "zipmail.com.br", "globo.com", "globomail.com", "oi.com.br",
# Custom extension
# Domains used in Czechia
"seznam.cz", "atlas.cz", "centrum.cz",
]
if __name__ == '__main__':
spf = Spf(create_resolver())
ranges = []
p = multiprocessing.dummy.Pool(20)
for domain_ranges in p.map(lambda d: spf.get_ip_ranges(d), domains):
ranges.extend(domain_ranges)
warninglist = {
'name': "List of known SMTP sending IP ranges",
'version': get_version(),
'description': "List of IP ranges for known SMTP servers-",
'matching_attributes': ["ip-src", "ip-dst", "domain|ip"],
'type': 'cidr',
'list': consolidate_networks(ranges),
}
write_to_file(warninglist, "smtp-sending-ips")

View File

@ -1,14 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import ipaddress
import json
import logging
from inspect import currentframe, getframeinfo, getmodulename, stack
from os import mkdir, path
from typing import List, Union
import requests
import dns.exception
import dns.resolver
from dateutil.parser import parse as parsedate
@ -137,23 +137,111 @@ def write_to_file(warninglist, dst):
def consolidate_networks(networks):
# Convert to IpNetwork
# Split to IPv4 and IPv6 ranges
ipv4_networks = []
ipv6_networks = []
for network in networks:
network = ipaddress.ip_network(network)
if isinstance(network, str):
# Convert string to IpNetwork
network = ipaddress.ip_network(network)
if network.version == 4:
ipv4_networks.append(network)
else:
ipv6_networks.append(network)
# Collapse
# Collapse ranges
networks_to_keep = list(map(str, ipaddress.collapse_addresses(ipv4_networks)))
networks_to_keep.extend(map(str, ipaddress.collapse_addresses(ipv6_networks)))
return networks_to_keep
def create_resolver() -> dns.resolver.Resolver:
resolver = dns.resolver.Resolver(configure=False)
resolver.timeout = 30
resolver.lifetime = 30
resolver.cache = dns.resolver.LRUCache()
resolver.nameservers = ["193.17.47.1", "185.43.135.1"] # CZ.NIC nameservers
return resolver
class Spf:
def __init__(self, resolver: dns.resolver.Resolver):
self.__resolver = resolver
def _parse_spf(self, domain: str, spf: str) -> dict:
output = {"include": [], "ranges": [], "a": [], "mx": []}
for part in spf.split(" "):
if part.startswith("include:"):
output["include"].append(part.split(":", 1)[1])
elif part.startswith("redirect="):
output["include"].append(part.split("=", 1)[1])
elif part == "a":
output["a"].append(domain)
elif part.startswith("a:"):
output["a"].append(part.split(":", 1)[1])
elif part == "mx":
output["mx"].append(domain)
elif part.startswith("mx:"):
output["mx"].append(part.split(":", 1)[1])
elif part.startswith("ip4:") or part.startswith("ip6:"):
output["ranges"].append(ipaddress.ip_network(part.split(":", 1)[1], strict=False))
return output
def _get_ip_for_domain(self, domain: str) -> List[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]:
ranges = []
try:
for ip in self.__resolver.query(domain, "a"):
ranges.append(ipaddress.ip_network(str(ip)))
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout):
pass
try:
for ip in self.__resolver.query(domain, "aaaa"):
ranges.append(ipaddress.ip_network(str(ip)))
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout):
pass
return ranges
def _get_mx_ips_for_domain(self, domain: str) -> List[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]:
ranges = []
try:
for rdata in self.__resolver.query(domain, "mx"):
ranges += self._get_ip_for_domain(rdata.exchange)
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout):
pass
return ranges
def get_ip_ranges(self, domain: str) -> List[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]:
try:
txt_records = self.__resolver.query(domain, "TXT")
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout) as e:
logging.info("Could not fetch TXT record for domain {}: {}".format(domain, str(e)))
return []
ranges = []
for rdata in txt_records:
record = "".join([s.decode("utf-8") for s in rdata.strings])
if not record.startswith("v=spf1"):
continue
parsed = self._parse_spf(domain, record)
ranges += parsed["ranges"]
for include in parsed["include"]:
ranges += self.get_ip_ranges(include)
for domain in parsed["a"]:
ranges += self._get_ip_for_domain(domain)
for mx in parsed["mx"]:
ranges += self._get_mx_ips_for_domain(mx)
return ranges
def main():
init_logging()