new: IPs lookup against phishtank

pull/265/head
Raphaël Vinot 2021-09-23 13:58:40 +02:00
parent e6e61089b6
commit 87c6925c7b
4 changed files with 89 additions and 8 deletions

View File

@ -259,12 +259,15 @@ class Lookyloo():
else: else:
to_return['pi'][ct.root_hartree.har.root_url] = self.pi.get_url_lookup(ct.root_hartree.har.root_url) to_return['pi'][ct.root_hartree.har.root_url] = self.pi.get_url_lookup(ct.root_hartree.har.root_url)
if self.phishtank.available: if self.phishtank.available:
to_return['phishtank'] = {} to_return['phishtank'] = {'urls': {}, 'ips_hits': {}}
if ct.redirects: if ct.redirects:
for redirect in ct.redirects: for redirect in ct.redirects:
to_return['phishtank'][redirect] = self.phishtank.get_url_lookup(redirect) to_return['phishtank']['urls'][redirect] = self.phishtank.get_url_lookup(redirect)
else: else:
to_return['phishtank'][ct.root_hartree.har.root_url] = self.phishtank.get_url_lookup(ct.root_hartree.har.root_url) to_return['phishtank']['urls'][ct.root_hartree.har.root_url] = self.phishtank.get_url_lookup(ct.root_hartree.har.root_url)
ips_hits = self.phishtank.lookup_ips_capture(ct)
if ips_hits:
to_return['phishtank']['ips_hits'] = ips_hits
if self.urlscan.available: if self.urlscan.available:
info = self.get_info(capture_uuid) info = self.get_info(capture_uuid)
to_return['urlscan'] = {'submission': {}, 'result': {}} to_return['urlscan'] = {'submission': {}, 'result': {}}

View File

@ -5,7 +5,7 @@ import hashlib
import json import json
from datetime import date, datetime, timedelta, timezone from datetime import date, datetime, timedelta, timezone
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Optional from typing import Any, Dict, Optional, List
from har2tree import CrawledTree from har2tree import CrawledTree
from pyphishtanklookup import PhishtankLookup from pyphishtanklookup import PhishtankLookup
@ -52,6 +52,32 @@ class Phishtank():
with cached_entries[0].open() as f: with cached_entries[0].open() as f:
return json.load(f) return json.load(f)
def lookup_ips_capture(self, crawled_tree: CrawledTree) -> Dict[str, List[Dict[str, Any]]]:
    '''Collect the cached Phishtank reports for every IP seen in a capture.

    Reads the 'ips.json' file stored next to the HAR file of the capture,
    looks up each distinct IP with get_ip_lookup and, for every URL listed
    in a cached IP entry, fetches the cached URL report with get_url_lookup.
    Only the local caches are read, no request is triggered from here.

    :param crawled_tree: Tree of the capture; only root_hartree.har.path is
                         used, to locate the capture directory.
    :return: Mapping IP -> list of cached URL reports. An IP with a cached
             entry but no cached URL report maps to an empty list, an IP
             without cached entry is omitted. Empty dict if the capture has
             no 'ips.json' file.
    '''
    ips_file = crawled_tree.root_hartree.har.path.parent / 'ips.json'
    try:
        with ips_file.open() as f:
            # The values of the dump are iterated as lists of IPs
            ips_dump = json.load(f)
    except FileNotFoundError:
        # No IPs file for this capture: nothing to look up, and the callers
        # already handle an empty response.
        return {}

    to_return: Dict[str, List[Dict[str, Any]]] = {}
    # The same IP can show up in multiple lists, look it up only once
    for ip in {ip for ips_list in ips_dump.values() for ip in ips_list}:
        ip_entry = self.get_ip_lookup(ip)
        if not ip_entry:
            continue
        url_reports = (self.get_url_lookup(url) for url in ip_entry['urls'])
        # Skip the URLs without cached report
        to_return[ip] = [report for report in url_reports if report]
    return to_return
def get_ip_lookup(self, ip: str) -> Optional[Dict[str, Any]]:
    '''Load the most recent cached entry for an IP.

    The files in the cache directory of the IP are compared by path, and the
    greatest one is the one loaded.

    :param ip: IP address to look for in the cache.
    :return: Content of the selected cache file, or None if the cache
             directory does not exist or contains nothing.
    '''
    cache_dir = self.__get_cache_directory(ip)
    if cache_dir.exists():
        # Greatest path == first element of the reverse-sorted listing
        latest = max(cache_dir.glob('*'), default=None)
        if latest is not None:
            with latest.open() as fh:
                return json.load(fh)
    return None
def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, auto_trigger: bool=False) -> Dict: def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, auto_trigger: bool=False) -> Dict:
'''Run the module on all the nodes up to the final redirect''' '''Run the module on all the nodes up to the final redirect'''
if not self.available: if not self.available:
@ -63,13 +89,43 @@ class Phishtank():
if crawled_tree.start_time <= datetime.now(timezone.utc) - timedelta(hours=70): if crawled_tree.start_time <= datetime.now(timezone.utc) - timedelta(hours=70):
return {'error': 'Capture to old, the response will be irrelevant.'} return {'error': 'Capture to old, the response will be irrelevant.'}
# Check URLs up to the redirect
if crawled_tree.redirects: if crawled_tree.redirects:
for redirect in crawled_tree.redirects: for redirect in crawled_tree.redirects:
self.url_lookup(redirect) self.url_lookup(redirect)
else: else:
self.url_lookup(crawled_tree.root_hartree.har.root_url) self.url_lookup(crawled_tree.root_hartree.har.root_url)
# Check all the IPs in the ips file of the capture
with (crawled_tree.root_hartree.har.path.parent / 'ips.json').open() as f:
ips_dump = json.load(f)
for ip in set(ip for ips_list in ips_dump.values() for ip in ips_list):
self.ip_lookup(ip)
return {'success': 'Module triggered'} return {'success': 'Module triggered'}
def ip_lookup(self, ip: str) -> None:
    '''Query Phishtank lookup for the URLs known on an IP and cache the hit.

    Nothing is stored when there is no hit, so a request is sent on every
    call *until* there is one. Once a hit is stored for the current day, the
    next calls on that day return immediately.
    Every URL of a fresh hit is then passed to url_lookup.

    :param ip: IP address to look up.
    :raises ConfigError: if the module is not available.
    '''
    if not self.available:
        raise ConfigError('Phishtank not available, probably not enabled.')

    cache_dir = self.__get_cache_directory(ip)
    cache_dir.mkdir(parents=True, exist_ok=True)
    todays_file = cache_dir / date.today().isoformat()
    if todays_file.exists():
        # Already got a hit today
        return

    related_urls = self.client.get_urls_by_ip(ip)
    if related_urls:
        with todays_file.open('w') as fh:
            json.dump({'ip': ip, 'urls': related_urls}, fh)
        for related_url in related_urls:
            self.url_lookup(related_url)
def url_lookup(self, url: str) -> None: def url_lookup(self, url: str) -> None:
'''Lookup an URL on Phishtank lookup '''Lookup an URL on Phishtank lookup
Note: It will trigger a request to phishtank every time *until* there is a hit (it's cheap), then once a day. Note: It will trigger a request to phishtank every time *until* there is a hit (it's cheap), then once a day.

View File

@ -395,13 +395,22 @@ def modules(tree_uuid: str):
continue continue
pi_short_result[url] = full_report['results'][0]['tag_label'] pi_short_result[url] = full_report['results'][0]['tag_label']
phishtank_short_result: Dict[str, str] = {} phishtank_short_result: Dict[str, Dict] = {'urls': {}, 'ips_hits': {}}
if 'phishtank' in modules_responses: if 'phishtank' in modules_responses:
pt = modules_responses.pop('phishtank') pt = modules_responses.pop('phishtank')
for url, full_report in pt.items(): for url, full_report in pt['urls'].items():
if not full_report: if not full_report:
continue continue
phishtank_short_result[url] = full_report['phish_detail_url'] phishtank_short_result['urls'][url] = full_report['phish_detail_url']
for ip, entries in pt['ips_hits'].items():
if not entries:
continue
phishtank_short_result['ips_hits'] = {ip: []}
for full_report in entries:
phishtank_short_result['ips_hits'][ip].append((
full_report['url'],
full_report['phish_detail_url']))
urlscan_to_display: Dict = {} urlscan_to_display: Dict = {}
if 'urlscan' in modules_responses and modules_responses.get('urlscan'): if 'urlscan' in modules_responses and modules_responses.get('urlscan'):

View File

@ -30,9 +30,22 @@
<div> <div>
<p>Phishtank flagged the URLs below as phishing:</p> <p>Phishtank flagged the URLs below as phishing:</p>
<ul> <ul>
{% for url, permaurl in phishtank.items() %} {% for url, permaurl in phishtank['urls'].items() %}
<li>{{ shorten_string(url, 150) }}: <a href="{{ permaurl }}">click to view it</a> on phishtank.</p></li> <li>{{ shorten_string(url, 150) }}: <a href="{{ permaurl }}">click to view it</a> on phishtank.</p></li>
{% endfor %} {% endfor %}
{% if phishtank.get('ips_hits') %}
<p>The IPs below are in the tree and are flagged as phishing on Phishtank, though they might be on other URLs:</p>
<ul>
{% for ip, entries in phishtank['ips_hits'].items() %}
<li>{{ ip }}:
<ul>
{% for related_url, permaurl in entries %}
<li>{{ shorten_string(related_url, 150) }}: <a href="{{ permaurl }}">click to view it</a> on phishtank.</li>
{% endfor %}
</ul>
</li>
{% endfor %}
{% endif%}
</div> </div>
</center> </center>
{% endif%} {% endif%}