new: [cisco umbrella block pages] hostname, IPv4 and IPv6 addresses

some minor clean-up and final update
pull/252/head
Alexandre Dulaunoy 2023-08-25 16:11:41 +02:00
parent 6dfce67778
commit fc012367ec
No known key found for this signature in database
GPG Key ID: 09E2CD4944E6CBCD
6 changed files with 161 additions and 47 deletions

View File

@ -37,6 +37,7 @@ python3 generate-microsoft-azure-appid.py
python3 generate-chrome-crux-1m.py
python3 generate-digitalside.py
python3 generate-gptbot.py
python3 generate-cisco-umbrella-blockpage.py
popd
./jq_all_the_things.sh

View File

@ -0,0 +1,19 @@
{
"description": "Umbrella blockpage hostnames",
"list": [
"hit-adult.opendns.com",
"hit-block.opendns.com",
"hit-botnet.opendns.com",
"hit-malware.opendns.com",
"hit-phish.opendns.com"
],
"matching_attributes": [
"hostname",
"domain",
"url",
"domain|ip"
],
"name": "cisco-umbrella-blockpage-hostname",
"type": "hostname",
"version": 20230825
}

View File

@ -0,0 +1,19 @@
{
"description": "Cisco Umbrella blockpage in IPv4",
"list": [
"146.112.61.104",
"146.112.61.105",
"146.112.61.106",
"146.112.61.107",
"146.112.61.108",
"146.112.61.110"
],
"matching_attributes": [
"ip-src",
"ip-dst",
"domain|ip"
],
"name": "cisco-umbrella-blockpage-ipv4",
"type": "cidr",
"version": 20230825
}

View File

@ -0,0 +1,19 @@
{
"description": "Cisco Umbrella blockpage in IPv6",
"list": [
"::ffff:9270:3d68",
"::ffff:9270:3d69",
"::ffff:9270:3d6a",
"::ffff:9270:3d6b",
"::ffff:9270:3d6c",
"::ffff:9270:3d6e"
],
"matching_attributes": [
"ip-src",
"ip-dst",
"domain|ip"
],
"name": "cisco-umbrella-blockpage-ipv6",
"type": "cidr",
"version": 20230825
}

View File

@ -2,42 +2,55 @@
import ipaddress
import logging
from typing import List, Tuple
from typing import List
from generator import get_version, write_to_file, Dns, create_resolver
# Static Umbrella blockpage addresses: https://docs.umbrella.com/deployment-umbrella/docs/block-page-ip-addresses
blockpage_ip_list = ['146.112.61.104', '::ffff:146.112.61.104', '146.112.61.105', '::ffff:146.112.61.105', '146.112.61.106', '::ffff:146.112.61.106', '146.112.61.107', '::ffff:146.112.61.107', '146.112.61.108', '::ffff:146.112.61.108', '146.112.61.110', '::ffff:146.112.61.110']
blockpage_ip_list = [
'146.112.61.104',
'::ffff:146.112.61.104',
'146.112.61.105',
'::ffff:146.112.61.105',
'146.112.61.106',
'::ffff:146.112.61.106',
'146.112.61.107',
'::ffff:146.112.61.107',
'146.112.61.108',
'::ffff:146.112.61.108',
'146.112.61.110',
'::ffff:146.112.61.110',
]
def process(ipv4: List, ipv6: List, hostname: List):
# Cisco Umbrella blockpage Domains
umbrella_blockpage_hostname_dst = 'umbrella-blockpage-hostname'
umbrella_blockpage_warninglist = {
'description': 'Event contains one or more Cisco Umbrella blockpage hostnames as attribute with an IDS flag set',
'name': 'List of known Cisco Umbrella blockpage hostnames',
'description': 'Umbrella blockpage hostnames',
'name': 'cisco-umbrella-blockpage-hostname',
'type': 'hostname',
'matching_attributes': ['hostname', 'domain', 'url', 'domain|ip']
'matching_attributes': ['hostname', 'domain', 'url', 'domain|ip'],
}
generate(hostname, umbrella_blockpage_warninglist, umbrella_blockpage_hostname_dst)
# Cisco Umbrella blockpage IPv4
umbrella_blockpage_ipv4_dst = 'umbrella-blockpage-v4'
umbrella_blockpage_ipv4_warninglist = {
'description': 'Event contains one or more public IPv4 DNS resolvers as attribute with an IDS flag set',
'name': 'List of known IPv4 public DNS resolvers',
'description': 'Cisco Umbrella blockpage in IPv4',
'name': 'cisco-umbrella-blockpage-ipv4',
'type': 'cidr',
'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip']
'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip'],
}
generate(ipv4, umbrella_blockpage_ipv4_warninglist, umbrella_blockpage_ipv4_dst)
# Cisco Umbrella blockpage IPv6
umbrella_blockpage_ipv6_dst = 'umbrella-blockpage-v6'
umbrella_blockpage_ipv6_warninglist = {
'description': 'Event contains one or more public IPv6 DNS resolvers as attribute with an IDS flag set',
'name': 'List of known IPv6 public DNS resolvers',
'description': 'Cisco Umbrella blockpage in IPv6',
'name': 'cisco-umbrella-blockpage-ipv6',
'type': 'cidr',
'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip']
'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip'],
}
generate(ipv6, umbrella_blockpage_ipv6_warninglist, umbrella_blockpage_ipv6_dst)
@ -45,7 +58,6 @@ def process(ipv4: List, ipv6: List, hostname: List):
def generate(data_list, warninglist, dst):
warninglist['version'] = get_version()
warninglist['list'] = data_list
write_to_file(warninglist, dst)
@ -72,5 +84,6 @@ def main():
process(ipv4_addresses, ipv6_addresses, host_names)
if __name__ == '__main__':
main()

View File

@ -20,7 +20,8 @@ def init_logging():
LOG_DIR = path.join(current_folder, '../generators.log')
logFormatter = logging.Formatter(
"[%(asctime)s] %(levelname)s::%(funcName)s()::%(message)s")
"[%(asctime)s] %(levelname)s::%(funcName)s()::%(message)s"
)
rootLogger = logging.getLogger()
rootLogger.setLevel(logging.INFO)
# Log to file
@ -42,32 +43,39 @@ def download_to_file(url, file, gzip_enable=False):
caller = getmodulename(frame_records[1]).upper()
user_agent = {
"User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0"}
"User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0"
}
try:
logging.info(f'download_to_file - fetching url: {url}')
r = requests.head(url, headers=user_agent)
url_datetime = parsedate(r.headers['Last-Modified']).astimezone()
file_datetime = datetime.datetime.fromtimestamp(
path.getmtime(get_abspath_source_file(file))).astimezone()
path.getmtime(get_abspath_source_file(file))
).astimezone()
if(url_datetime > file_datetime):
logging.info('{} File on server is newer, so downloading update to {}'.format(
caller, get_abspath_source_file(file)))
if url_datetime > file_datetime:
logging.info(
'{} File on server is newer, so downloading update to {}'.format(
caller, get_abspath_source_file(file)
)
)
actual_download_to_file(url, file, user_agent, gzip_enable=gzip_enable)
else:
logging.info(
'{} File on server is older, nothing to do'.format(caller))
logging.info('{} File on server is older, nothing to do'.format(caller))
except KeyError as exc:
logging.warning('{} KeyError in the headers. the {} header was not sent by server {}. Downloading file'.format(
caller, str(exc), url))
logging.warning(
'{} KeyError in the headers. the {} header was not sent by server {}. Downloading file'.format(
caller, str(exc), url
)
)
actual_download_to_file(url, file, user_agent, gzip_enable=gzip_enable)
except FileNotFoundError as exc:
logging.info(
"{} File didn't exist, so downloading {} from {}".format(caller, file, url))
"{} File didn't exist, so downloading {} from {}".format(caller, file, url)
)
actual_download_to_file(url, file, user_agent, gzip_enable=gzip_enable)
except Exception as exc:
logging.warning(
'{} General exception occured: {}.'.format(caller, str(exc)))
logging.warning('{} General exception occured: {}.'.format(caller, str(exc)))
actual_download_to_file(url, file, user_agent, gzip_enable=gzip_enable)
@ -82,6 +90,7 @@ def actual_download_to_file(url, file, user_agent, gzip_enable=False):
with open(get_abspath_source_file(file), 'wb') as fd:
fd.write(file_content)
def process_stream(url):
r = requests.get(url, stream=True)
@ -97,15 +106,15 @@ def process_stream(url):
def download(url):
user_agent = {
"User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0"}
"User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0"
}
return requests.get(url, headers=user_agent)
def get_abspath_list_file(dst):
rel_path = getframeinfo(currentframe()).filename
current_folder = path.dirname(path.abspath(rel_path))
real_path = path.join(
current_folder, '../lists/{dst}/list.json'.format(dst=dst))
real_path = path.join(current_folder, '../lists/{dst}/list.json'.format(dst=dst))
return path.abspath(path.realpath(real_path))
@ -130,17 +139,16 @@ def unique_sorted_warninglist(warninglist):
def write_to_file(warninglist, dst):
frame_records = stack()[1]
caller = getmodulename(frame_records[1]).upper()
try:
warninglist = unique_sorted_warninglist(warninglist)
with open(get_abspath_list_file(dst), 'w') as data_file:
json.dump(warninglist, data_file, indent=2, sort_keys=True)
data_file.write("\n")
logging.info('New warninglist written to {}.'.format(
get_abspath_list_file(dst)))
logging.info(
'New warninglist written to {}.'.format(get_abspath_list_file(dst))
)
except Exception as exc:
logging.error(
'{} General exception occurred: {}.'.format(caller, str(exc)))
logging.error('{} General exception occurred: {}.'.format(caller, str(exc)))
def consolidate_networks(networks):
@ -193,39 +201,69 @@ class Dns:
elif part.startswith("mx:"):
output["mx"].append(part.split(":", 1)[1])
elif part.startswith("ip4:") or part.startswith("ip6:"):
output["ranges"].append(ipaddress.ip_network(part.split(":", 1)[1], strict=False))
output["ranges"].append(
ipaddress.ip_network(part.split(":", 1)[1], strict=False)
)
return output
def get_ip_for_domain(self, domain: str) -> List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]:
def get_ip_for_domain(
self, domain: str
) -> List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]:
ranges = []
try:
for ip in self.__resolver.query(domain, "a"):
ranges.append(ipaddress.IPv4Address(str(ip)))
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers):
except (
dns.resolver.NoAnswer,
dns.resolver.NXDOMAIN,
dns.exception.Timeout,
dns.resolver.NoNameservers,
):
pass
try:
for ip in self.__resolver.query(domain, "aaaa"):
ranges.append(ipaddress.IPv6Address(str(ip)))
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers):
except (
dns.resolver.NoAnswer,
dns.resolver.NXDOMAIN,
dns.exception.Timeout,
dns.resolver.NoNameservers,
):
pass
return ranges
def get_mx_ips_for_domain(self, domain: str) -> List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]:
def get_mx_ips_for_domain(
self, domain: str
) -> List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]:
ranges = []
try:
for rdata in self.__resolver.query(domain, "mx"):
ranges += self.get_ip_for_domain(rdata.exchange)
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers):
except (
dns.resolver.NoAnswer,
dns.resolver.NXDOMAIN,
dns.exception.Timeout,
dns.resolver.NoNameservers,
):
pass
return ranges
def get_ip_ranges_from_spf(self, domain: str) -> List[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]:
def get_ip_ranges_from_spf(
self, domain: str
) -> List[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]:
try:
txt_records = self.__resolver.query(domain, "TXT")
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers) as e:
logging.info("Could not fetch TXT record for domain {}: {}".format(domain, str(e)))
except (
dns.resolver.NoAnswer,
dns.resolver.NXDOMAIN,
dns.exception.Timeout,
dns.resolver.NoNameservers,
) as e:
logging.info(
"Could not fetch TXT record for domain {}: {}".format(domain, str(e))
)
return []
ranges = []
@ -251,11 +289,16 @@ class Dns:
def get_domain_from_ip(self, ip: str) -> str:
try:
records = dns.reversename.from_address(ip)
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers) as e:
except (
dns.resolver.NoAnswer,
dns.resolver.NXDOMAIN,
dns.exception.Timeout,
dns.resolver.NoNameservers,
) as e:
logging.info("Could not fetch PTR record for IP {}: {}".format(ip, str(e)))
return []
return str(dns.resolver.resolve(records,"PTR")[0]).rstrip('.')
return str(dns.resolver.resolve(records, "PTR")[0]).rstrip('.')
def main():