diff --git a/.gitignore b/.gitignore index a06fd55..e652d89 100644 --- a/.gitignore +++ b/.gitignore @@ -118,6 +118,7 @@ dump.rdb # Local config files config/*.json config/*.json.bkp +config/takedown_filters.ini # user defined known content known_content_user/ diff --git a/config/takedown_filters.ini.sample b/config/takedown_filters.ini.sample new file mode 100644 index 0000000..53cb4ff --- /dev/null +++ b/config/takedown_filters.ini.sample @@ -0,0 +1,28 @@ +[abuse] +ignore= + ripe.net$ + arin.net$ + apnic.net$ + idnic.net$ + peering@ + domreg@ + registrar-email + akamai.com$ + google.com$ + arin-noc@tucows.com + dnstech@tucows.com + avermeer@tucows.com + arin-maint@tucows.com + amzn-noc-contact@amazon.com + aws-routing-poc@amazon.com + aws-rpki-routing-poc@amazon.com + +[replacelist] +noc@as5577.net=abuse@as5577.net +abuse@godaddy.com=abuse@godaddy.com,phishing@godaddy.com,malware@godaddy.com + +[domain] +ignore= + apple.com + paypal.com + google.com diff --git a/lookyloo/helpers.py b/lookyloo/helpers.py index c859f57..30da76b 100644 --- a/lookyloo/helpers.py +++ b/lookyloo/helpers.py @@ -2,10 +2,12 @@ from __future__ import annotations +import configparser import hashlib import json import logging import os +import re import time from datetime import datetime, timedelta, date @@ -53,31 +55,55 @@ def get_resources_hashes(har2tree_container: CrawledTree | HostNode | URLNode) - return all_ressources_hashes -@lru_cache(64) +@lru_cache def get_taxonomies() -> Taxonomies: return Taxonomies() -@lru_cache(64) +@lru_cache def get_public_suffix_list() -> PublicSuffixList: """Initialize Public Suffix List""" # TODO (?): fetch the list return PublicSuffixList() -@lru_cache(64) +@lru_cache def get_captures_dir() -> Path: capture_dir = get_homedir() / 'scraped' safe_create_dir(capture_dir) return capture_dir -@lru_cache(64) +@lru_cache def get_email_template() -> str: with (get_homedir() / 'config' / 'email.tmpl').open() as f: return f.read() +@lru_cache +def 
load_takedown_filters() -> tuple[re.Pattern[str], re.Pattern[str], dict[str, list[str]]]: + filter_ini_file = get_homedir() / 'config' / 'takedown_filters.ini' + if not filter_ini_file.exists(): + raise LookylooException(f'Unable to find the takedown filters file: {filter_ini_file}') + config = configparser.ConfigParser() + config.optionxform = str # type: ignore[method-assign,assignment] + config.read(filter_ini_file) + # compile the domains and subdomains to ignore + ignore_domains_list = [] + for d in [d.strip() for d in config['domain']['ignore'].split('\n') if d.strip()]: + ignore_domain = f'{d}$' + ignore_subdomain = rf'.*\.{ignore_domain}' + ignore_domains_list.append(ignore_domain) + ignore_domains_list.append(ignore_subdomain) + ignore_domains = re.compile('|'.join(ignore_domains_list)) + # Compile the emails addresses to ignore + ignore_emails = re.compile('|'.join([i.strip() for i in config['abuse']['ignore'].split('\n') if i.strip()])) + # Make the replace list a dictionary + replace_list = {to_replace: config['replacelist'][to_replace].split(',') for to_replace in config['replacelist']} + + return ignore_domains, ignore_emails, replace_list + + def make_dirs_list(root_dir: Path) -> list[Path]: directories = [] year_now = date.today().year diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index 622794d..90d2336 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -3,7 +3,6 @@ from __future__ import annotations import base64 -import configparser import copy import gzip import json @@ -58,7 +57,8 @@ from .exceptions import (MissingCaptureDirectory, from .helpers import (get_captures_dir, get_email_template, get_resources_hashes, get_taxonomies, uniq_domains, ParsedUserAgent, load_cookies, UserAgents, - get_useragent_for_requests, make_ts_from_dirname) + get_useragent_for_requests, make_ts_from_dirname, load_takedown_filters + ) from .modules import (MISPs, PhishingInitiative, UniversalWhois, UrlScan, VirusTotal, Phishtank, Hashlookup, 
RiskIQ, RiskIQError, Pandora, URLhaus, CIRCLPDNS) @@ -722,6 +722,9 @@ class Lookyloo(): 'asns': {}, # ASN: [list of contacts from whois] 'all_emails': set() } + + if to_return['contacts']: + to_return['all_emails'] |= set(to_return['contacts']) to_return['ips'] = {ip: self.uwhois.whois(ip, contact_email_only=True) for ip in set(hostnode.resolved_ips['v4']) | set(hostnode.resolved_ips['v6'])} to_return['asns'] = {asn['asn']: self.uwhois.whois(f'AS{asn["asn"]}', contact_email_only=True) for asn in hostnode.ipasn.values()} @@ -763,57 +766,44 @@ class Lookyloo(): to_return['all_emails'] = list(to_return['all_emails']) return to_return - def takedown_filtered(self, hostnode: HostNode) -> dict[str, Any] | None: - config = configparser.ConfigParser() - config.optionxform = str # type: ignore[method-assign,assignment] - ignorelist_path = get_homedir() / 'config' / 'ignore_list.ini' - config.read(ignorelist_path) + def takedown_filtered(self, hostnode: HostNode) -> set[str] | None: + ignore_domains, ignore_emails, replace_list = load_takedown_filters() # checking if domain should be ignored - domains = config['domain']['ignore'] pattern = r"(https?://)?(www\d?\.)?(?P<domain>[\w\.-]+\.\w+)(/\S*)?" - match = re.match(pattern, hostnode.name) - if match: - for regex in domains: - ignore_domain = regex + "$" - ignore_subdomain = r".*\." 
+ regex + "$" - if (re.match(ignore_domain, match.group("domain")) or re.match(ignore_subdomain, match.group("domain"))) and regex.strip(): - return None - result = self.takedown_details(hostnode) - # ignoring mails - final_mails = [] - replacelist = config['replacelist'] - ignorelist = config['abuse']['ignore'].split('\n') - for mail in result['all_emails']: - # ignoring mails - is_valid = True - for regex in ignorelist: - if not regex.strip(): - continue - match = re.search(regex.strip(), mail) - if match: - is_valid = False - break - if is_valid: - # replacing emails - for replaceable in replacelist: - if mail == replaceable: - final_mails += replacelist[replaceable].split(',') - is_valid = False - break - if is_valid: - # mail is valid and can be added to the result - final_mails += [mail] - result['all_emails'] = final_mails - return result + if match := re.match(pattern, hostnode.name): + # NOTE: the name may not be a hostname if the capture is not a URL. + if re.search(ignore_domains, match.group("domain")): + self.logger.debug(f'{hostnode.name} is ignored') + return None + else: + # The name is not a domain, we won't have any contacts. 
+ self.logger.debug(f'{hostnode.name} is not a domain, no contacts.') + return None - def get_filtered_emails(self, capture_uuid: str, detailed: bool=False) -> set[str] | dict[str, str]: - info = self.contacts(capture_uuid) - final_mails = set() - for i in info: - for mail in i['all_emails']: + result = self.takedown_details(hostnode) + # process mails + final_mails: set[str] = set() + for mail in result['all_emails']: + if re.search(ignore_emails, mail): + self.logger.debug(f'{mail} is ignored') + continue + if mail in replace_list: + final_mails |= set(replace_list[mail]) + else: final_mails.add(mail) return final_mails + def contacts_filtered(self, capture_uuid: str, /) -> set[str]: + capture = self.get_crawled_tree(capture_uuid) + rendered_hostnode = self.get_hostnode_from_tree(capture_uuid, capture.root_hartree.rendered_node.hostnode_uuid) + result: set[str] = set() + for node in reversed(rendered_hostnode.get_ancestors()): + if mails := self.takedown_filtered(node): + result |= mails + if mails := self.takedown_filtered(rendered_hostnode): + result |= mails + return result + def contacts(self, capture_uuid: str, /) -> list[dict[str, Any]]: capture = self.get_crawled_tree(capture_uuid) rendered_hostnode = self.get_hostnode_from_tree(capture_uuid, capture.root_hartree.rendered_node.hostnode_uuid) diff --git a/lookyloo/modules/uwhois.py b/lookyloo/modules/uwhois.py index 538d3ea..2c36505 100644 --- a/lookyloo/modules/uwhois.py +++ b/lookyloo/modules/uwhois.py @@ -76,11 +76,9 @@ class UniversalWhois(AbstractModule): ... 
def whois(self, query: str, contact_email_only: bool=False) -> str | list[str]: - - EMAIL_REGEX = rb'(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)' - if not self.available: return '' + bytes_whois = b'' with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.connect((self.server, self.port)) @@ -108,5 +106,5 @@ class UniversalWhois(AbstractModule): # We either dont have an abuse-c object or it does not exist if not contact_email_only: return bytes_whois.decode() - emails = list(set(re.findall(EMAIL_REGEX, bytes_whois))) + emails = list(set(re.findall(rb'[\w\.-]+@[\w\.-]+', bytes_whois))) return [e.decode() for e in sorted(emails)] diff --git a/website/web/genericapi.py b/website/web/genericapi.py index 75cead6..9e51bf7 100644 --- a/website/web/genericapi.py +++ b/website/web/genericapi.py @@ -582,6 +582,7 @@ class Comparables(Resource): # type: ignore[misc] takedown_fields = api.model('TakedownFields', { 'capture_uuid': fields.String(description="The UUID of the capture.", required=True), + 'filter': fields.Boolean(description="If true, the response is a list of emails.", default=False), }) @@ -589,12 +590,17 @@ takedown_fields = api.model('TakedownFields', { @api.doc(description='Get information for triggering a takedown request') class Takedown(Resource): # type: ignore[misc] @api.doc(body=takedown_fields) # type: ignore[misc] - def post(self) -> list[dict[str, Any]] | dict[str, str]: + def post(self) -> list[dict[str, Any]] | dict[str, str] | list[str]: + if not lookyloo.uwhois.available: + return {'error': 'UWhois not available, cannot get contacts.'} parameters: dict[str, Any] = request.get_json(force=True) capture_uuid = parameters.get('capture_uuid') if not capture_uuid: return {'error': f'Invalid request: {parameters}'} - return lookyloo.contacts(capture_uuid) + if parameters.get('filter'): + return list(lookyloo.contacts_filtered(capture_uuid)) + else: + return lookyloo.contacts(capture_uuid) # Admin stuff