From 87c6925c7bcea8dabc4903d5403c1d5a9eafe611 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Thu, 23 Sep 2021 13:58:40 +0200 Subject: [PATCH] new: IPs lookup against phishtank --- lookyloo/lookyloo.py | 9 +++-- lookyloo/modules/phishtank.py | 58 +++++++++++++++++++++++++++++- website/web/__init__.py | 15 ++++++-- website/web/templates/modules.html | 15 +++++++- 4 files changed, 89 insertions(+), 8 deletions(-) diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index c598e2f7..b2847e5b 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -259,12 +259,15 @@ class Lookyloo(): else: to_return['pi'][ct.root_hartree.har.root_url] = self.pi.get_url_lookup(ct.root_hartree.har.root_url) if self.phishtank.available: - to_return['phishtank'] = {} + to_return['phishtank'] = {'urls': {}, 'ips_hits': {}} if ct.redirects: for redirect in ct.redirects: - to_return['phishtank'][redirect] = self.phishtank.get_url_lookup(redirect) + to_return['phishtank']['urls'][redirect] = self.phishtank.get_url_lookup(redirect) else: - to_return['phishtank'][ct.root_hartree.har.root_url] = self.phishtank.get_url_lookup(ct.root_hartree.har.root_url) + to_return['phishtank']['urls'][ct.root_hartree.har.root_url] = self.phishtank.get_url_lookup(ct.root_hartree.har.root_url) + ips_hits = self.phishtank.lookup_ips_capture(ct) + if ips_hits: + to_return['phishtank']['ips_hits'] = ips_hits if self.urlscan.available: info = self.get_info(capture_uuid) to_return['urlscan'] = {'submission': {}, 'result': {}} diff --git a/lookyloo/modules/phishtank.py b/lookyloo/modules/phishtank.py index 2f6ced99..13a4c4f2 100644 --- a/lookyloo/modules/phishtank.py +++ b/lookyloo/modules/phishtank.py @@ -5,7 +5,7 @@ import hashlib import json from datetime import date, datetime, timedelta, timezone from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, List from har2tree import CrawledTree from pyphishtanklookup import PhishtankLookup @@ -52,6 +52,32 @@ class Phishtank(): with cached_entries[0].open() as f: return json.load(f) + def lookup_ips_capture(self, crawled_tree: CrawledTree) -> Dict[str, List[Dict[str, Any]]]: + with (crawled_tree.root_hartree.har.path.parent / 'ips.json').open() as f: + ips_dump = json.load(f) + to_return: Dict[str, List[Dict[str, Any]]] = {} + for ip in set(ip for ips_list in ips_dump.values() for ip in ips_list): + entry = self.get_ip_lookup(ip) + if not entry: + continue + to_return[ip] = [] + for url in entry['urls']: + entry = self.get_url_lookup(url) + if entry: + to_return[ip].append(entry) + return to_return + + def get_ip_lookup(self, ip: str) -> Optional[Dict[str, Any]]: + ip_storage_dir = self.__get_cache_directory(ip) + if not ip_storage_dir.exists(): + return None + cached_entries = sorted(ip_storage_dir.glob('*'), reverse=True) + if not cached_entries: + return None + + with cached_entries[0].open() as f: + return json.load(f) + def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, auto_trigger: bool=False) -> Dict: '''Run the module on all the nodes up to the final redirect''' if not self.available: @@ -63,13 +89,43 @@ class Phishtank(): if crawled_tree.start_time <= datetime.now(timezone.utc) - timedelta(hours=70): return {'error': 'Capture to old, the response will be irrelevant.'} + # Check URLs up to the redirect if crawled_tree.redirects: for redirect in crawled_tree.redirects: self.url_lookup(redirect) else: self.url_lookup(crawled_tree.root_hartree.har.root_url) + + # Check all the IPs in the ips file of the capture + with (crawled_tree.root_hartree.har.path.parent / 'ips.json').open() as f: + ips_dump = json.load(f) + for ip in set(ip for ips_list in ips_dump.values() for ip in ips_list): + self.ip_lookup(ip) return {'success': 'Module triggered'} + def ip_lookup(self, ip: str) -> None: + '''Lookup for the URLs related to an IP on Phishtank lookup + Note: It will trigger a request to phishtank every time *until* there is a hit (it's cheap), then once a day. + ''' + if not self.available: + raise ConfigError('Phishtank not available, probably not enabled.') + + ip_storage_dir = self.__get_cache_directory(ip) + ip_storage_dir.mkdir(parents=True, exist_ok=True) + pt_file = ip_storage_dir / date.today().isoformat() + + if pt_file.exists(): + return + + urls = self.client.get_urls_by_ip(ip) + if not urls: + return + to_dump = {'ip': ip, 'urls': urls} + with pt_file.open('w') as _f: + json.dump(to_dump, _f) + for url in urls: + self.url_lookup(url) + def url_lookup(self, url: str) -> None: '''Lookup an URL on Phishtank lookup Note: It will trigger a request to phishtank every time *until* there is a hit (it's cheap), then once a day. diff --git a/website/web/__init__.py b/website/web/__init__.py index b0c883cd..cd2bc7d0 100644 --- a/website/web/__init__.py +++ b/website/web/__init__.py @@ -395,13 +395,22 @@ def modules(tree_uuid: str): continue pi_short_result[url] = full_report['results'][0]['tag_label'] - phishtank_short_result: Dict[str, str] = {} + phishtank_short_result: Dict[str, Dict] = {'urls': {}, 'ips_hits': {}} if 'phishtank' in modules_responses: pt = modules_responses.pop('phishtank') - for url, full_report in pt.items(): + for url, full_report in pt['urls'].items(): if not full_report: continue - phishtank_short_result[url] = full_report['phish_detail_url'] + phishtank_short_result['urls'][url] = full_report['phish_detail_url'] + + for ip, entries in pt['ips_hits'].items(): + if not entries: + continue + phishtank_short_result['ips_hits'] = {ip: []} + for full_report in entries: + phishtank_short_result['ips_hits'][ip].append(( + full_report['url'], + full_report['phish_detail_url'])) urlscan_to_display: Dict = {} if 'urlscan' in modules_responses and modules_responses.get('urlscan'): diff --git a/website/web/templates/modules.html b/website/web/templates/modules.html index 09f350f5..1a8b7116 100644 --- a/website/web/templates/modules.html +++ b/website/web/templates/modules.html @@ -30,9 +30,22 @@

Phishtank flagged the URLs below as phishing:

{% endif%}