new: URL Haus module

Related: #505
pull/557/head
Raphaël Vinot 2022-11-30 17:52:12 +01:00
parent f30b84b638
commit af9515c123
7 changed files with 136 additions and 6 deletions

View File

@ -51,6 +51,11 @@
"url": "https://phishtankapi.circl.lu/",
"allow_auto_trigger": true
},
"URLhaus": {
"enabled": false,
"url": "https://urlhaus-api.abuse.ch/v1/",
"allow_auto_trigger": true
},
"Hashlookup": {
"enabled": false,
"url": "https://hashlookup.circl.lu/",
@ -73,6 +78,7 @@
"UniversalWhois": "Module to query a local instance of uWhoisd: https://github.com/Lookyloo/uwhoisd",
"UrlScan": "Module to query urlscan.io",
"Phishtank": "Module to query Phishtank Lookup (https://github.com/Lookyloo/phishtank-lookup). URL set to none means querying the public instance.",
"URLhaus": "Module to query URL Haus.",
"Hashlookup": "Module to query Hashlookup (https://github.com/adulau/hashlookup-server). URL set to none means querying the public instance.",
"FOX": "Submission only interface by and for CCCS",
"Pandora": "Submission only interface for https://github.com/pandora-analysis/",

View File

@ -46,7 +46,7 @@ from .helpers import (get_captures_dir, get_email_template,
from .indexing import Indexing
from .modules import (MISP, PhishingInitiative, UniversalWhois,
UrlScan, VirusTotal, Phishtank, Hashlookup,
RiskIQ, RiskIQError, Pandora)
RiskIQ, RiskIQError, Pandora, URLhaus)
class Lookyloo():
@ -103,6 +103,10 @@ class Lookyloo():
if not self.pandora.available:
self.logger.warning('Unable to setup the Pandora module')
self.urlhaus = URLhaus(get_config('modules', 'URLhaus'))
if not self.urlhaus.available:
self.logger.warning('Unable to setup the URLhaus module')
self.logger.info('Initializing context...')
self.context = Context()
self.logger.info('Context initialized.')
@ -274,7 +278,8 @@ class Lookyloo():
self.uwhois.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
self.hashlookup.capture_default_trigger(ct, auto_trigger=auto_trigger)
to_return: Dict[str, Dict] = {'PhishingInitiative': {}, 'VirusTotal': {}, 'UrlScan': {}}
to_return: Dict[str, Dict] = {'PhishingInitiative': {}, 'VirusTotal': {}, 'UrlScan': {},
'URLhaus': {}}
capture_cache = self.capture_cache(capture_uuid)
to_return['PhishingInitiative'] = self.pi.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
@ -284,6 +289,7 @@ class Lookyloo():
visibility='unlisted' if (capture_cache and capture_cache.no_index) else 'public',
force=force, auto_trigger=auto_trigger)
to_return['Phishtank'] = self.phishtank.capture_default_trigger(ct, auto_trigger=auto_trigger)
to_return['URLhaus'] = self.urlhaus.capture_default_trigger(ct, auto_trigger=auto_trigger)
return to_return
def get_modules_responses(self, capture_uuid: str, /) -> Optional[Dict[str, Any]]:
@ -318,6 +324,14 @@ class Lookyloo():
ips_hits = self.phishtank.lookup_ips_capture(ct)
if ips_hits:
to_return['phishtank']['ips_hits'] = ips_hits
if self.urlhaus.available:
to_return['urlhaus'] = {'urls': {}}
if ct.redirects:
for redirect in ct.redirects:
to_return['urlhaus']['urls'][redirect] = self.urlhaus.get_url_lookup(redirect)
else:
to_return['urlhaus']['urls'][ct.root_hartree.har.root_url] = self.urlhaus.get_url_lookup(ct.root_hartree.har.root_url)
if self.urlscan.available:
info = self.get_info(capture_uuid)
to_return['urlscan'] = {'submission': {}, 'result': {}}

View File

@ -11,3 +11,4 @@ from .pandora import Pandora # noqa
from .phishtank import Phishtank # noqa
from .hashlookup import HashlookupModule as Hashlookup # noqa
from .riskiq import RiskIQ, RiskIQError # noqa
from .urlhaus import URLhaus # noqa

View File

@ -0,0 +1,84 @@
#!/usr/bin/env python3
import json
from datetime import date
from typing import Any, Dict, Optional
import requests
from har2tree import CrawledTree
from ..default import ConfigError, get_homedir
from ..helpers import get_cache_directory
class URLhaus():
    '''Query the URLhaus URL lookup API and cache the hits on disk.

    Hits are stored as JSON files named after the day of the lookup
    (ISO format), in a per-URL directory below `<homedir>/urlhaus`.
    '''

    # Seconds to wait for URLhaus before giving up, so a stalled connection
    # cannot block the caller forever (requests has no default timeout).
    REQUEST_TIMEOUT = 10

    # Values of `query_status` meaning URLhaus has nothing to say about the URL.
    NO_HIT_STATUSES = ('no_results', 'invalid_url')

    def __init__(self, config: Dict[str, Any]):
        '''Set the module up from its configuration.

        The module is flagged as unavailable (`self.available` is False) when
        it is not enabled, or when no API URL is configured: without an URL,
        every lookup would fail at request time.
        '''
        # Always defined, so the attributes can be read on a disabled module too.
        self.allow_auto_trigger = False
        self.url = config.get('url')

        if not config.get('enabled') or not self.url:
            self.available = False
            return

        self.available = True
        if config.get('allow_auto_trigger'):
            self.allow_auto_trigger = True

        self.storage_dir_uh = get_homedir() / 'urlhaus'
        self.storage_dir_uh.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _is_hit(url_information: Optional[Dict[str, Any]]) -> bool:
        '''Tell if a response from URLhaus describes a known URL.

        An empty response is a miss, and so is a response whose `query_status`
        is one of NO_HIT_STATUSES: such a response is a non-empty dict, so
        testing its truthiness alone is not enough.
        '''
        if not url_information:
            return False
        return url_information.get('query_status') not in URLhaus.NO_HIT_STATUSES

    def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]:
        '''Return the most recent cached response for `url`, or None if there is none.

        Never triggers a request to URLhaus; returns None if the module is not available.
        '''
        if not self.available:
            # The storage directory is only set when the module is available.
            return None
        url_storage_dir = get_cache_directory(self.storage_dir_uh, url, 'url')
        if not url_storage_dir.exists():
            return None
        # The files are named after ISO dates: the greatest path is the most recent entry.
        latest_entry = max(url_storage_dir.glob('*'), default=None)
        if latest_entry is None:
            return None
        with latest_entry.open() as f:
            return json.load(f)

    def __url_result(self, url: str) -> Dict:
        '''POST `url` to the URL lookup endpoint and return the decoded JSON response.

        Raises the exceptions of requests on timeout, connection error or HTTP error status.
        '''
        data = {'url': url}
        # The configured URL may or may not end with a slash, avoid ".../v1//url/".
        endpoint = f"{self.url.rstrip('/')}/url/"
        response = requests.post(endpoint, data, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.json()

    def capture_default_trigger(self, crawled_tree: 'CrawledTree', /, *, auto_trigger: bool=False) -> Dict:
        '''Run the module on all the nodes up to the final redirect'''
        if not self.available:
            return {'error': 'Module not available'}
        if auto_trigger and not self.allow_auto_trigger:
            return {'error': 'Auto trigger not allowed on module'}

        # Check URLs up to the redirect, or the root URL only if there was no redirect.
        urls = crawled_tree.redirects or [crawled_tree.root_hartree.har.root_url]
        for url in urls:
            self.url_lookup(url)

        return {'success': 'Module triggered'}

    def url_lookup(self, url: str) -> None:
        '''Lookup an URL on URL haus
        Note: It will trigger a request to URL haus every time *until* there is a hit (it's cheap), then once a day.

        Raises ConfigError if the module is not available.
        '''
        if not self.available:
            raise ConfigError('URL haus not available, probably not enabled.')

        url_storage_dir = get_cache_directory(self.storage_dir_uh, url, 'url')
        uh_file = url_storage_dir / date.today().isoformat()
        if uh_file.exists():
            # Already a hit cached today.
            return

        url_information = self.__url_result(url)
        if not self._is_hit(url_information):
            # Nothing to store. Hits cached on previous days (if any) are left untouched.
            return

        # The directory is only created when there is something to store, so a miss
        # or a failed request leaves no empty directory behind.
        url_storage_dir.mkdir(parents=True, exist_ok=True)
        with uh_file.open('w') as _f:
            json.dump(url_information, _f)

6
poetry.lock generated
View File

@ -554,7 +554,7 @@ i18n = ["Babel (>=2.7)"]
[[package]]
name = "jsonschema"
version = "4.17.1"
version = "4.17.3"
description = "An implementation of JSON Schema validation for Python"
category = "main"
optional = false
@ -1907,8 +1907,8 @@ jinja2 = [
{file = "Jinja2-3.1.2.tar.gz", hash = "sha256:31351a702a408a9e7595a8fc6150fc3f43bb6bf7e319770cbc0db9df9437e852"},
]
jsonschema = [
{file = "jsonschema-4.17.1-py3-none-any.whl", hash = "sha256:410ef23dcdbca4eaedc08b850079179883c2ed09378bd1f760d4af4aacfa28d7"},
{file = "jsonschema-4.17.1.tar.gz", hash = "sha256:05b2d22c83640cde0b7e0aa329ca7754fbd98ea66ad8ae24aa61328dfe057fa3"},
{file = "jsonschema-4.17.3-py3-none-any.whl", hash = "sha256:a870ad254da1a8ca84b6a2905cac29d265f805acc57af304784962a2aa6508f6"},
{file = "jsonschema-4.17.3.tar.gz", hash = "sha256:0f864437ab8b6076ba6707453ef8f98a6a0d512a80e93f8abdb676f737ecb60d"},
]
lacuscore = [
{file = "lacuscore-1.1.8-py3-none-any.whl", hash = "sha256:efc747a4b0a26681e46e208a2e1522baa144bccaa6cd08a3e438a5a69de8fbea"},

View File

@ -442,6 +442,13 @@ def modules(tree_uuid: str):
full_report['url'],
full_report['phish_detail_url']))
urlhaus_short_result: Dict[str, List] = {'urls': []}
if 'urlhaus' in modules_responses:
# TODO: make a short result
uh = modules_responses.pop('urlhaus')
for url, results in uh['urls'].items():
urlhaus_short_result['urls'].append(results)
urlscan_to_display: Dict = {}
if 'urlscan' in modules_responses and modules_responses.get('urlscan'):
urlscan = modules_responses.pop('urlscan')
@ -467,7 +474,8 @@ def modules(tree_uuid: str):
pass
return render_template('modules.html', uuid=tree_uuid, vt=vt_short_result,
pi=pi_short_result, urlscan=urlscan_to_display,
phishtank=phishtank_short_result)
phishtank=phishtank_short_result,
urlhaus=urlhaus_short_result)
@app.route('/tree/<string:tree_uuid>/redirects', methods=['GET'])

View File

@ -53,6 +53,23 @@
</div>
</center>
{% endif%}
{% if urlhaus and urlhaus.get('urls') %}
<hr>
<center>
<h1 class="display-4">URL Haus</h1>
<div>
{% if urlhaus.get('urls') %}
<p class="lead">URL Haus knows the URLs below</p>
<dl class="row">
{% for entry in urlhaus['urls'] %}
<dt class="col-sm-7">{{ shorten_string(entry['url'], 150) }}</dt>
<dd class="col-sm-3"><a href="{{ entry['urlhaus_reference'] }}">View on URL Haus</a></dd>
{% endfor %}
</dl>
{% endif%}
</div>
</center>
{% endif%}
{% if vt %}
<hr>
<center><h1 class="display-4">Virus Total</h1></center>