From 8892893d9bf96a3d09b16736a173b0500a21d8ec Mon Sep 17 00:00:00 2001 From: LaZyDK Date: Fri, 25 Aug 2023 09:55:52 +0200 Subject: [PATCH 1/2] Initial commit --- tools/generate-cisco-umbrella-blockpage.py | 76 ++++++++++++++++++++++ tools/generator.py | 10 +++ 2 files changed, 86 insertions(+) create mode 100755 tools/generate-cisco-umbrella-blockpage.py diff --git a/tools/generate-cisco-umbrella-blockpage.py b/tools/generate-cisco-umbrella-blockpage.py new file mode 100755 index 0000000..3ac0852 --- /dev/null +++ b/tools/generate-cisco-umbrella-blockpage.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 + +import ipaddress +import logging +from typing import List, Tuple + +from generator import get_version, write_to_file, Dns, create_resolver + +# Static Umbrella blockpage addresses: https://docs.umbrella.com/deployment-umbrella/docs/block-page-ip-addresses +blockpage_ip_list = ['146.112.61.104', '::ffff:146.112.61.104', '146.112.61.105', '::ffff:146.112.61.105', '146.112.61.106', '::ffff:146.112.61.106', '146.112.61.107', '::ffff:146.112.61.107', '146.112.61.108', '::ffff:146.112.61.108', '146.112.61.110', '::ffff:146.112.61.110'] + + +def process(ipv4: List, ipv6: List, hostname: List): + # Cisco Umbrella blockpage Domains + umbrella_blockpage_hostname_dst = 'umbrella-blockpage-hostname' + umbrella_blockpage_warninglist = { + 'description': 'Event contains one or more Cisco Umbrella blockpage hostnames as attribute with an IDS flag set', + 'name': 'List of known Cisco Umbrella blockpage hostnames', + 'type': 'hostname', + 'matching_attributes': ['hostname', 'domain', 'url', 'domain|ip'] + } + generate(hostname, umbrella_blockpage_warninglist, umbrella_blockpage_hostname_dst) + + # Cisco Umbrella blockpage IPv4 + umbrella_blockpage_ipv4_dst = 'umbrella-blockpage-v4' + umbrella_blockpage_ipv4_warninglist = { + 'description': 'Event contains one or more public IPv4 DNS resolvers as attribute with an IDS flag set', + 'name': 'List of known IPv4 public DNS resolvers', + 'type': 'cidr', + 'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip'] + } + generate(ipv4, umbrella_blockpage_ipv4_warninglist, umbrella_blockpage_ipv4_dst) + + # Cisco Umbrella blockpage IPv6 + umbrella_blockpage_ipv6_dst = 'umbrella-blockpage-v6' + umbrella_blockpage_ipv6_warninglist = { + 'description': 'Event contains one or more public IPv6 DNS resolvers as attribute with an IDS flag set', + 'name': 'List of known IPv6 public DNS resolvers', + 'type': 'cidr', + 'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip'] + } + generate(ipv6, umbrella_blockpage_ipv6_warninglist, umbrella_blockpage_ipv6_dst) + + +def generate(data_list, warninglist, dst): + warninglist['version'] = get_version() + warninglist['list'] = data_list + + write_to_file(warninglist, dst) + + +def main(): + dns = Dns(create_resolver()) + + ipv4_addresses = [] + ipv6_addresses = [] + host_names = [] + + for ip in blockpage_ip_list: + host_names.append(dns.get_domain_from_ip(ip)) + + try: + ip = ipaddress.ip_address(ip) + + if ip.version == 4: + ipv4_addresses.append(ip.compressed) + elif ip.version == 6: + ipv6_addresses.append(ip.compressed) + + except ValueError as exc: + logging.warning(str(exc)) + + process(ipv4_addresses, ipv6_addresses, host_names) + +if __name__ == '__main__': + main() diff --git a/tools/generator.py b/tools/generator.py index 89c5e07..045801d 100644 --- a/tools/generator.py +++ b/tools/generator.py @@ -10,6 +10,7 @@ import gzip import requests import dns.exception import dns.resolver +import dns.reversename from dateutil.parser import parse as parsedate @@ -247,6 +248,15 @@ class Dns: return ranges + def get_domain_from_ip(self, ip: str) -> str: + try: + records = dns.reversename.from_address(ip) + except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers) as e: + logging.info("Could not fetch PTR record for IP {}: {}".format(ip, str(e))) + return [] + + return str(dns.resolver.resolve(records,"PTR")[0]).rstrip('.') + def main(): init_logging() From fc012367ecbea90b35d4be48fc921f6bc0243e11 Mon Sep 17 00:00:00 2001 From: Alexandre Dulaunoy Date: Fri, 25 Aug 2023 16:11:41 +0200 Subject: [PATCH 2/2] new: [cisco umbrella block pages] hostname, IPv4 and IPv6 addresses some minor clean-up and final update --- generate_all.sh | 1 + lists/umbrella-blockpage-hostname/list.json | 19 ++++ lists/umbrella-blockpage-v4/list.json | 19 ++++ lists/umbrella-blockpage-v6/list.json | 19 ++++ tools/generate-cisco-umbrella-blockpage.py | 41 +++++--- tools/generator.py | 109 ++++++++++++++------ 6 files changed, 161 insertions(+), 47 deletions(-) create mode 100644 lists/umbrella-blockpage-hostname/list.json create mode 100644 lists/umbrella-blockpage-v4/list.json create mode 100644 lists/umbrella-blockpage-v6/list.json diff --git a/generate_all.sh b/generate_all.sh index 20c4364..ec20dcc 100755 --- a/generate_all.sh +++ b/generate_all.sh @@ -37,6 +37,7 @@ python3 generate-microsoft-azure-appid.py python3 generate-chrome-crux-1m.py python3 generate-digitalside.py python3 generate-gptbot.py +python3 generate-cisco-umbrella-blockpage.py popd ./jq_all_the_things.sh diff --git a/lists/umbrella-blockpage-hostname/list.json b/lists/umbrella-blockpage-hostname/list.json new file mode 100644 index 0000000..9bfef0c --- /dev/null +++ b/lists/umbrella-blockpage-hostname/list.json @@ -0,0 +1,19 @@ +{ + "description": "Umbrella blockpage hostnames", + "list": [ + "hit-adult.opendns.com", + "hit-block.opendns.com", + "hit-botnet.opendns.com", + "hit-malware.opendns.com", + "hit-phish.opendns.com" + ], + "matching_attributes": [ + "hostname", + "domain", + "url", + "domain|ip" + ], + "name": "cisco-umbrella-blockpage-hostname", + "type": "hostname", + "version": 20230825 +} diff --git a/lists/umbrella-blockpage-v4/list.json b/lists/umbrella-blockpage-v4/list.json new file mode 100644 index 0000000..d36a0c5 --- /dev/null +++ b/lists/umbrella-blockpage-v4/list.json @@ -0,0 +1,19 @@ +{ + "description": "Cisco Umbrella blockpage in IPv4", + "list": [ + "146.112.61.104", + "146.112.61.105", + "146.112.61.106", + "146.112.61.107", + "146.112.61.108", + "146.112.61.110" + ], + "matching_attributes": [ + "ip-src", + "ip-dst", + "domain|ip" + ], + "name": "cisco-umbrella-blockpage-ipv4", + "type": "cidr", + "version": 20230825 +} diff --git a/lists/umbrella-blockpage-v6/list.json b/lists/umbrella-blockpage-v6/list.json new file mode 100644 index 0000000..911f3bf --- /dev/null +++ b/lists/umbrella-blockpage-v6/list.json @@ -0,0 +1,19 @@ +{ + "description": "Cisco Umbrella blockpage in IPv6", + "list": [ + "::ffff:9270:3d68", + "::ffff:9270:3d69", + "::ffff:9270:3d6a", + "::ffff:9270:3d6b", + "::ffff:9270:3d6c", + "::ffff:9270:3d6e" + ], + "matching_attributes": [ + "ip-src", + "ip-dst", + "domain|ip" + ], + "name": "cisco-umbrella-blockpage-ipv6", + "type": "cidr", + "version": 20230825 +} diff --git a/tools/generate-cisco-umbrella-blockpage.py b/tools/generate-cisco-umbrella-blockpage.py index 3ac0852..ffe4fb2 100755 --- a/tools/generate-cisco-umbrella-blockpage.py +++ b/tools/generate-cisco-umbrella-blockpage.py @@ -2,42 +2,55 @@ import ipaddress import logging -from typing import List, Tuple +from typing import List from generator import get_version, write_to_file, Dns, create_resolver # Static Umbrella blockpage addresses: https://docs.umbrella.com/deployment-umbrella/docs/block-page-ip-addresses -blockpage_ip_list = ['146.112.61.104', '::ffff:146.112.61.104', '146.112.61.105', '::ffff:146.112.61.105', '146.112.61.106', '::ffff:146.112.61.106', '146.112.61.107', '::ffff:146.112.61.107', '146.112.61.108', '::ffff:146.112.61.108', '146.112.61.110', '::ffff:146.112.61.110'] +blockpage_ip_list = [ + '146.112.61.104', + '::ffff:146.112.61.104', + '146.112.61.105', + '::ffff:146.112.61.105', + '146.112.61.106', + '::ffff:146.112.61.106', + '146.112.61.107', + '::ffff:146.112.61.107', + '146.112.61.108', + '::ffff:146.112.61.108', + '146.112.61.110', + '::ffff:146.112.61.110', +] def process(ipv4: List, ipv6: List, hostname: List): # Cisco Umbrella blockpage Domains umbrella_blockpage_hostname_dst = 'umbrella-blockpage-hostname' umbrella_blockpage_warninglist = { - 'description': 'Event contains one or more Cisco Umbrella blockpage hostnames as attribute with an IDS flag set', - 'name': 'List of known Cisco Umbrella blockpage hostnames', + 'description': 'Umbrella blockpage hostnames', + 'name': 'cisco-umbrella-blockpage-hostname', 'type': 'hostname', - 'matching_attributes': ['hostname', 'domain', 'url', 'domain|ip'] + 'matching_attributes': ['hostname', 'domain', 'url', 'domain|ip'], } generate(hostname, umbrella_blockpage_warninglist, umbrella_blockpage_hostname_dst) # Cisco Umbrella blockpage IPv4 umbrella_blockpage_ipv4_dst = 'umbrella-blockpage-v4' umbrella_blockpage_ipv4_warninglist = { - 'description': 'Event contains one or more public IPv4 DNS resolvers as attribute with an IDS flag set', - 'name': 'List of known IPv4 public DNS resolvers', + 'description': 'Cisco Umbrella blockpage in IPv4', + 'name': 'cisco-umbrella-blockpage-ipv4', 'type': 'cidr', - 'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip'] + 'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip'], } generate(ipv4, umbrella_blockpage_ipv4_warninglist, umbrella_blockpage_ipv4_dst) # Cisco Umbrella blockpage IPv6 umbrella_blockpage_ipv6_dst = 'umbrella-blockpage-v6' umbrella_blockpage_ipv6_warninglist = { - 'description': 'Event contains one or more public IPv6 DNS resolvers as attribute with an IDS flag set', - 'name': 'List of known IPv6 public DNS resolvers', + 'description': 'Cisco Umbrella blockpage in IPv6', + 'name': 'cisco-umbrella-blockpage-ipv6', 'type': 'cidr', - 'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip'] + 'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip'], } generate(ipv6, umbrella_blockpage_ipv6_warninglist, umbrella_blockpage_ipv6_dst) @@ -45,7 +58,6 @@ def process(ipv4: List, ipv6: List, hostname: List): def generate(data_list, warninglist, dst): warninglist['version'] = get_version() warninglist['list'] = data_list - write_to_file(warninglist, dst) @@ -55,7 +67,7 @@ def main(): ipv4_addresses = [] ipv6_addresses = [] host_names = [] - + for ip in blockpage_ip_list: host_names.append(dns.get_domain_from_ip(ip)) @@ -69,8 +81,9 @@ def main(): except ValueError as exc: logging.warning(str(exc)) - + process(ipv4_addresses, ipv6_addresses, host_names) + if __name__ == '__main__': main() diff --git a/tools/generator.py b/tools/generator.py index 045801d..225c34a 100644 --- a/tools/generator.py +++ b/tools/generator.py @@ -20,7 +20,8 @@ def init_logging(): LOG_DIR = path.join(current_folder, '../generators.log') logFormatter = logging.Formatter( - "[%(asctime)s] %(levelname)s::%(funcName)s()::%(message)s") + "[%(asctime)s] %(levelname)s::%(funcName)s()::%(message)s" + ) rootLogger = logging.getLogger() rootLogger.setLevel(logging.INFO) # Log to file @@ -42,32 +43,39 @@ def download_to_file(url, file, gzip_enable=False): caller = getmodulename(frame_records[1]).upper() user_agent = { - "User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0"} + "User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0" + } try: logging.info(f'download_to_file - fetching url: {url}') r = requests.head(url, headers=user_agent) url_datetime = parsedate(r.headers['Last-Modified']).astimezone() file_datetime = datetime.datetime.fromtimestamp( - path.getmtime(get_abspath_source_file(file))).astimezone() + path.getmtime(get_abspath_source_file(file)) + ).astimezone() - if(url_datetime > file_datetime): - logging.info('{} File on server is newer, so downloading update to {}'.format( - caller, get_abspath_source_file(file))) + if url_datetime > file_datetime: + logging.info( + '{} File on server is newer, so downloading update to {}'.format( + caller, get_abspath_source_file(file) + ) + ) actual_download_to_file(url, file, user_agent, gzip_enable=gzip_enable) else: - logging.info( - '{} File on server is older, nothing to do'.format(caller)) + logging.info('{} File on server is older, nothing to do'.format(caller)) except KeyError as exc: - logging.warning('{} KeyError in the headers. the {} header was not sent by server {}. Downloading file'.format( - caller, str(exc), url)) + logging.warning( + '{} KeyError in the headers. the {} header was not sent by server {}. Downloading file'.format( + caller, str(exc), url + ) + ) actual_download_to_file(url, file, user_agent, gzip_enable=gzip_enable) except FileNotFoundError as exc: logging.info( - "{} File didn't exist, so downloading {} from {}".format(caller, file, url)) + "{} File didn't exist, so downloading {} from {}".format(caller, file, url) + ) actual_download_to_file(url, file, user_agent, gzip_enable=gzip_enable) except Exception as exc: - logging.warning( - '{} General exception occured: {}.'.format(caller, str(exc))) + logging.warning('{} General exception occured: {}.'.format(caller, str(exc))) actual_download_to_file(url, file, user_agent, gzip_enable=gzip_enable) @@ -82,6 +90,7 @@ def actual_download_to_file(url, file, user_agent, gzip_enable=False): with open(get_abspath_source_file(file), 'wb') as fd: fd.write(file_content) + def process_stream(url): r = requests.get(url, stream=True) @@ -97,15 +106,15 @@ def process_stream(url): def download(url): user_agent = { - "User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0"} + "User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0" + } return requests.get(url, headers=user_agent) def get_abspath_list_file(dst): rel_path = getframeinfo(currentframe()).filename current_folder = path.dirname(path.abspath(rel_path)) - real_path = path.join( - current_folder, '../lists/{dst}/list.json'.format(dst=dst)) + real_path = path.join(current_folder, '../lists/{dst}/list.json'.format(dst=dst)) return path.abspath(path.realpath(real_path)) @@ -130,17 +139,16 @@ def unique_sorted_warninglist(warninglist): def write_to_file(warninglist, dst): frame_records = stack()[1] caller = getmodulename(frame_records[1]).upper() - try: warninglist = unique_sorted_warninglist(warninglist) with open(get_abspath_list_file(dst), 'w') as data_file: json.dump(warninglist, data_file, indent=2, sort_keys=True) data_file.write("\n") - logging.info('New warninglist written to {}.'.format( - get_abspath_list_file(dst))) + logging.info( + 'New warninglist written to {}.'.format(get_abspath_list_file(dst)) + ) except Exception as exc: - logging.error( - '{} General exception occurred: {}.'.format(caller, str(exc))) + logging.error('{} General exception occurred: {}.'.format(caller, str(exc))) def consolidate_networks(networks): @@ -193,39 +201,69 @@ class Dns: elif part.startswith("mx:"): output["mx"].append(part.split(":", 1)[1]) elif part.startswith("ip4:") or part.startswith("ip6:"): - output["ranges"].append(ipaddress.ip_network(part.split(":", 1)[1], strict=False)) + output["ranges"].append( + ipaddress.ip_network(part.split(":", 1)[1], strict=False) + ) return output - def get_ip_for_domain(self, domain: str) -> List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]: + def get_ip_for_domain( + self, domain: str + ) -> List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]: ranges = [] try: for ip in self.__resolver.query(domain, "a"): ranges.append(ipaddress.IPv4Address(str(ip))) - except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers): + except ( + dns.resolver.NoAnswer, + dns.resolver.NXDOMAIN, + dns.exception.Timeout, + dns.resolver.NoNameservers, + ): pass try: for ip in self.__resolver.query(domain, "aaaa"): ranges.append(ipaddress.IPv6Address(str(ip))) - except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers): + except ( + dns.resolver.NoAnswer, + dns.resolver.NXDOMAIN, + dns.exception.Timeout, + dns.resolver.NoNameservers, + ): pass return ranges - def get_mx_ips_for_domain(self, domain: str) -> List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]: + def get_mx_ips_for_domain( + self, domain: str + ) -> List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]: ranges = [] try: for rdata in self.__resolver.query(domain, "mx"): ranges += self.get_ip_for_domain(rdata.exchange) - except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers): + except ( + dns.resolver.NoAnswer, + dns.resolver.NXDOMAIN, + dns.exception.Timeout, + dns.resolver.NoNameservers, + ): pass return ranges - def get_ip_ranges_from_spf(self, domain: str) -> List[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]: + def get_ip_ranges_from_spf( + self, domain: str + ) -> List[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]: try: txt_records = self.__resolver.query(domain, "TXT") - except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers) as e: - logging.info("Could not fetch TXT record for domain {}: {}".format(domain, str(e))) + except ( + dns.resolver.NoAnswer, + dns.resolver.NXDOMAIN, + dns.exception.Timeout, + dns.resolver.NoNameservers, + ) as e: + logging.info( + "Could not fetch TXT record for domain {}: {}".format(domain, str(e)) + ) return [] ranges = [] @@ -251,11 +289,16 @@ class Dns: def get_domain_from_ip(self, ip: str) -> str: try: records = dns.reversename.from_address(ip) - except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers) as e: + except ( + dns.resolver.NoAnswer, + dns.resolver.NXDOMAIN, + dns.exception.Timeout, + dns.resolver.NoNameservers, + ) as e: logging.info("Could not fetch PTR record for IP {}: {}".format(ip, str(e))) - return [] + return [] - return str(dns.resolver.resolve(records,"PTR")[0]).rstrip('.') + return str(dns.resolver.resolve(records, "PTR")[0]).rstrip('.') def main():