diff --git a/generate_all.sh b/generate_all.sh index 20c4364..ec20dcc 100755 --- a/generate_all.sh +++ b/generate_all.sh @@ -37,6 +37,7 @@ python3 generate-microsoft-azure-appid.py python3 generate-chrome-crux-1m.py python3 generate-digitalside.py python3 generate-gptbot.py +python3 generate-cisco-umbrella-blockpage.py popd ./jq_all_the_things.sh diff --git a/lists/umbrella-blockpage-hostname/list.json b/lists/umbrella-blockpage-hostname/list.json new file mode 100644 index 0000000..9bfef0c --- /dev/null +++ b/lists/umbrella-blockpage-hostname/list.json @@ -0,0 +1,19 @@ +{ + "description": "Umbrella blockpage hostnames", + "list": [ + "hit-adult.opendns.com", + "hit-block.opendns.com", + "hit-botnet.opendns.com", + "hit-malware.opendns.com", + "hit-phish.opendns.com" + ], + "matching_attributes": [ + "hostname", + "domain", + "url", + "domain|ip" + ], + "name": "cisco-umbrella-blockpage-hostname", + "type": "hostname", + "version": 20230825 +} diff --git a/lists/umbrella-blockpage-v4/list.json b/lists/umbrella-blockpage-v4/list.json new file mode 100644 index 0000000..d36a0c5 --- /dev/null +++ b/lists/umbrella-blockpage-v4/list.json @@ -0,0 +1,19 @@ +{ + "description": "Cisco Umbrella blockpage in IPv4", + "list": [ + "146.112.61.104", + "146.112.61.105", + "146.112.61.106", + "146.112.61.107", + "146.112.61.108", + "146.112.61.110" + ], + "matching_attributes": [ + "ip-src", + "ip-dst", + "domain|ip" + ], + "name": "cisco-umbrella-blockpage-ipv4", + "type": "cidr", + "version": 20230825 +} diff --git a/lists/umbrella-blockpage-v6/list.json b/lists/umbrella-blockpage-v6/list.json new file mode 100644 index 0000000..911f3bf --- /dev/null +++ b/lists/umbrella-blockpage-v6/list.json @@ -0,0 +1,19 @@ +{ + "description": "Cisco Umbrella blockpage in IPv6", + "list": [ + "::ffff:9270:3d68", + "::ffff:9270:3d69", + "::ffff:9270:3d6a", + "::ffff:9270:3d6b", + "::ffff:9270:3d6c", + "::ffff:9270:3d6e" + ], + "matching_attributes": [ + "ip-src", + "ip-dst", + "domain|ip" + ], + "name": "cisco-umbrella-blockpage-ipv6", + "type": "cidr", + "version": 20230825 +} diff --git a/tools/generate-cisco-umbrella-blockpage.py b/tools/generate-cisco-umbrella-blockpage.py index 3ac0852..ffe4fb2 100755 --- a/tools/generate-cisco-umbrella-blockpage.py +++ b/tools/generate-cisco-umbrella-blockpage.py @@ -2,42 +2,55 @@ import ipaddress import logging -from typing import List, Tuple +from typing import List from generator import get_version, write_to_file, Dns, create_resolver # Static Umbrella blockpage addresses: https://docs.umbrella.com/deployment-umbrella/docs/block-page-ip-addresses -blockpage_ip_list = ['146.112.61.104', '::ffff:146.112.61.104', '146.112.61.105', '::ffff:146.112.61.105', '146.112.61.106', '::ffff:146.112.61.106', '146.112.61.107', '::ffff:146.112.61.107', '146.112.61.108', '::ffff:146.112.61.108', '146.112.61.110', '::ffff:146.112.61.110'] +blockpage_ip_list = [ + '146.112.61.104', + '::ffff:146.112.61.104', + '146.112.61.105', + '::ffff:146.112.61.105', + '146.112.61.106', + '::ffff:146.112.61.106', + '146.112.61.107', + '::ffff:146.112.61.107', + '146.112.61.108', + '::ffff:146.112.61.108', + '146.112.61.110', + '::ffff:146.112.61.110', +] def process(ipv4: List, ipv6: List, hostname: List): # Cisco Umbrella blockpage Domains umbrella_blockpage_hostname_dst = 'umbrella-blockpage-hostname' umbrella_blockpage_warninglist = { - 'description': 'Event contains one or more Cisco Umbrella blockpage hostnames as attribute with an IDS flag set', - 'name': 'List of known Cisco Umbrella blockpage hostnames', + 'description': 'Umbrella blockpage hostnames', + 'name': 'cisco-umbrella-blockpage-hostname', 'type': 'hostname', - 'matching_attributes': ['hostname', 'domain', 'url', 'domain|ip'] + 'matching_attributes': ['hostname', 'domain', 'url', 'domain|ip'], } generate(hostname, umbrella_blockpage_warninglist, umbrella_blockpage_hostname_dst) # Cisco Umbrella blockpage IPv4 umbrella_blockpage_ipv4_dst = 'umbrella-blockpage-v4' umbrella_blockpage_ipv4_warninglist = { - 'description': 'Event contains one or more public IPv4 DNS resolvers as attribute with an IDS flag set', - 'name': 'List of known IPv4 public DNS resolvers', + 'description': 'Cisco Umbrella blockpage in IPv4', + 'name': 'cisco-umbrella-blockpage-ipv4', 'type': 'cidr', - 'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip'] + 'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip'], } generate(ipv4, umbrella_blockpage_ipv4_warninglist, umbrella_blockpage_ipv4_dst) # Cisco Umbrella blockpage IPv6 umbrella_blockpage_ipv6_dst = 'umbrella-blockpage-v6' umbrella_blockpage_ipv6_warninglist = { - 'description': 'Event contains one or more public IPv6 DNS resolvers as attribute with an IDS flag set', - 'name': 'List of known IPv6 public DNS resolvers', + 'description': 'Cisco Umbrella blockpage in IPv6', + 'name': 'cisco-umbrella-blockpage-ipv6', 'type': 'cidr', - 'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip'] + 'matching_attributes': ['ip-src', 'ip-dst', 'domain|ip'], } generate(ipv6, umbrella_blockpage_ipv6_warninglist, umbrella_blockpage_ipv6_dst) @@ -45,7 +58,6 @@ def process(ipv4: List, ipv6: List, hostname: List): def generate(data_list, warninglist, dst): warninglist['version'] = get_version() warninglist['list'] = data_list - write_to_file(warninglist, dst) @@ -55,7 +67,7 @@ def main(): ipv4_addresses = [] ipv6_addresses = [] host_names = [] - + for ip in blockpage_ip_list: host_names.append(dns.get_domain_from_ip(ip)) @@ -69,8 +81,9 @@ def main(): except ValueError as exc: logging.warning(str(exc)) - + process(ipv4_addresses, ipv6_addresses, host_names) + if __name__ == '__main__': main() diff --git a/tools/generator.py b/tools/generator.py index 045801d..225c34a 100644 --- a/tools/generator.py +++ b/tools/generator.py @@ -20,7 +20,8 @@ def init_logging(): LOG_DIR = path.join(current_folder, '../generators.log') logFormatter = logging.Formatter( - "[%(asctime)s] %(levelname)s::%(funcName)s()::%(message)s") + "[%(asctime)s] %(levelname)s::%(funcName)s()::%(message)s" + ) rootLogger = logging.getLogger() rootLogger.setLevel(logging.INFO) # Log to file @@ -42,32 +43,39 @@ def download_to_file(url, file, gzip_enable=False): caller = getmodulename(frame_records[1]).upper() user_agent = { - "User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0"} + "User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0" + } try: logging.info(f'download_to_file - fetching url: {url}') r = requests.head(url, headers=user_agent) url_datetime = parsedate(r.headers['Last-Modified']).astimezone() file_datetime = datetime.datetime.fromtimestamp( - path.getmtime(get_abspath_source_file(file))).astimezone() + path.getmtime(get_abspath_source_file(file)) + ).astimezone() - if(url_datetime > file_datetime): - logging.info('{} File on server is newer, so downloading update to {}'.format( - caller, get_abspath_source_file(file))) + if url_datetime > file_datetime: + logging.info( + '{} File on server is newer, so downloading update to {}'.format( + caller, get_abspath_source_file(file) + ) + ) actual_download_to_file(url, file, user_agent, gzip_enable=gzip_enable) else: - logging.info( - '{} File on server is older, nothing to do'.format(caller)) + logging.info('{} File on server is older, nothing to do'.format(caller)) except KeyError as exc: - logging.warning('{} KeyError in the headers. the {} header was not sent by server {}. Downloading file'.format( - caller, str(exc), url)) + logging.warning( + '{} KeyError in the headers. the {} header was not sent by server {}. Downloading file'.format( + caller, str(exc), url + ) + ) actual_download_to_file(url, file, user_agent, gzip_enable=gzip_enable) except FileNotFoundError as exc: logging.info( - "{} File didn't exist, so downloading {} from {}".format(caller, file, url)) + "{} File didn't exist, so downloading {} from {}".format(caller, file, url) + ) actual_download_to_file(url, file, user_agent, gzip_enable=gzip_enable) except Exception as exc: - logging.warning( - '{} General exception occured: {}.'.format(caller, str(exc))) + logging.warning('{} General exception occured: {}.'.format(caller, str(exc))) actual_download_to_file(url, file, user_agent, gzip_enable=gzip_enable) @@ -82,6 +90,7 @@ def actual_download_to_file(url, file, user_agent, gzip_enable=False): with open(get_abspath_source_file(file), 'wb') as fd: fd.write(file_content) + def process_stream(url): r = requests.get(url, stream=True) @@ -97,15 +106,15 @@ def process_stream(url): def download(url): user_agent = { - "User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0"} + "User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0" + } return requests.get(url, headers=user_agent) def get_abspath_list_file(dst): rel_path = getframeinfo(currentframe()).filename current_folder = path.dirname(path.abspath(rel_path)) - real_path = path.join( - current_folder, '../lists/{dst}/list.json'.format(dst=dst)) + real_path = path.join(current_folder, '../lists/{dst}/list.json'.format(dst=dst)) return path.abspath(path.realpath(real_path)) @@ -130,17 +139,16 @@ def unique_sorted_warninglist(warninglist): def write_to_file(warninglist, dst): frame_records = stack()[1] caller = getmodulename(frame_records[1]).upper() - try: warninglist = unique_sorted_warninglist(warninglist) with open(get_abspath_list_file(dst), 'w') as data_file: json.dump(warninglist, data_file, indent=2, sort_keys=True) data_file.write("\n") - logging.info('New warninglist written to {}.'.format( - get_abspath_list_file(dst))) + logging.info( + 'New warninglist written to {}.'.format(get_abspath_list_file(dst)) + ) except Exception as exc: - logging.error( - '{} General exception occurred: {}.'.format(caller, str(exc))) + logging.error('{} General exception occurred: {}.'.format(caller, str(exc))) def consolidate_networks(networks): @@ -193,39 +201,69 @@ class Dns: elif part.startswith("mx:"): output["mx"].append(part.split(":", 1)[1]) elif part.startswith("ip4:") or part.startswith("ip6:"): - output["ranges"].append(ipaddress.ip_network(part.split(":", 1)[1], strict=False)) + output["ranges"].append( + ipaddress.ip_network(part.split(":", 1)[1], strict=False) + ) return output - def get_ip_for_domain(self, domain: str) -> List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]: + def get_ip_for_domain( + self, domain: str + ) -> List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]: ranges = [] try: for ip in self.__resolver.query(domain, "a"): ranges.append(ipaddress.IPv4Address(str(ip))) - except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers): + except ( + dns.resolver.NoAnswer, + dns.resolver.NXDOMAIN, + dns.exception.Timeout, + dns.resolver.NoNameservers, + ): pass try: for ip in self.__resolver.query(domain, "aaaa"): ranges.append(ipaddress.IPv6Address(str(ip))) - except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers): + except ( + dns.resolver.NoAnswer, + dns.resolver.NXDOMAIN, + dns.exception.Timeout, + dns.resolver.NoNameservers, + ): pass return ranges - def get_mx_ips_for_domain(self, domain: str) -> List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]: + def get_mx_ips_for_domain( + self, domain: str + ) -> List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]: ranges = [] try: for rdata in self.__resolver.query(domain, "mx"): ranges += self.get_ip_for_domain(rdata.exchange) - except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers): + except ( + dns.resolver.NoAnswer, + dns.resolver.NXDOMAIN, + dns.exception.Timeout, + dns.resolver.NoNameservers, + ): pass return ranges - def get_ip_ranges_from_spf(self, domain: str) -> List[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]: + def get_ip_ranges_from_spf( + self, domain: str + ) -> List[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]: try: txt_records = self.__resolver.query(domain, "TXT") - except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers) as e: - logging.info("Could not fetch TXT record for domain {}: {}".format(domain, str(e))) + except ( + dns.resolver.NoAnswer, + dns.resolver.NXDOMAIN, + dns.exception.Timeout, + dns.resolver.NoNameservers, + ) as e: + logging.info( + "Could not fetch TXT record for domain {}: {}".format(domain, str(e)) + ) return [] ranges = [] @@ -251,11 +289,16 @@ class Dns: def get_domain_from_ip(self, ip: str) -> str: try: records = dns.reversename.from_address(ip) - except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoNameservers) as e: + except ( + dns.resolver.NoAnswer, + dns.resolver.NXDOMAIN, + dns.exception.Timeout, + dns.resolver.NoNameservers, + ) as e: logging.info("Could not fetch PTR record for IP {}: {}".format(ip, str(e))) - return [] + return [] - return str(dns.resolver.resolve(records,"PTR")[0]).rstrip('.') + return str(dns.resolver.resolve(records, "PTR")[0]).rstrip('.') def main():