chg: Improve urlscan support, get results.

pull/247/head
Raphaël Vinot 2021-08-11 15:26:12 +02:00
parent 7933670941
commit 3436f5bd4e
6 changed files with 128 additions and 54 deletions

View File

@ -33,7 +33,8 @@
"UrlScan": {
"apikey": null,
"autosubmit": false,
"allow_auto_trigger": false
"allow_auto_trigger": false,
"force_visibility": false
},
"_notes": {
"apikey": "null disables the module. Pass a string otherwise.",

View File

@ -398,10 +398,15 @@ class Lookyloo():
self.logger.warning(f'Unable to trigger the modules unless the tree ({capture_uuid}) is cached.')
return
capture_cache = self.capture_cache(capture_uuid)
self.pi.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
self.vt.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
self.uwhois.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
self.urlscan.capture_default_trigger(self.get_info(capture_uuid), force=force, auto_trigger=auto_trigger)
self.urlscan.capture_default_trigger(
self.get_info(capture_uuid),
visibility='unlisted' if (capture_cache and capture_cache.no_index) else 'public',
force=force, auto_trigger=auto_trigger)
def get_modules_responses(self, capture_uuid: str, /) -> Optional[Dict[str, Any]]:
'''Get the responses of the modules from the cached responses on the disk'''
@ -426,8 +431,14 @@ class Lookyloo():
else:
to_return['pi'][ct.root_hartree.har.root_url] = self.pi.get_url_lookup(ct.root_hartree.har.root_url)
if self.urlscan.available:
info = self.get_info(capture_uuid)
to_return['urlscan'] = {'submission': {}, 'result': {}}
to_return['urlscan']['submission'] = self.urlscan.url_submit(self.get_info(capture_uuid))
to_return['urlscan']['submission'] = self.urlscan.get_url_submission(info)
if to_return['urlscan']['submission'] and 'uuid' in to_return['urlscan']['submission']:
# The submission was done, try to get the results
result = self.urlscan.url_result(info)
if 'error' not in result:
to_return['urlscan']['result'] = result
return to_return
def get_misp_occurrences(self, capture_uuid: str, /) -> Optional[Dict[str, Set[str]]]:
@ -593,7 +604,9 @@ class Lookyloo():
self.logger.warning(f'No cache available for {capture_dir}.')
return None
try:
return CaptureCache(cached)
cc = CaptureCache(cached)
self._captures_index[cc.uuid] = cc
return cc
except LookylooException as e:
self.logger.warning(f'Cache ({capture_dir}) is invalid ({e}): {json.dumps(cached, indent=2)}')
return None

View File

@ -445,7 +445,6 @@ class VirusTotal():
if not self.available:
raise ConfigError('VirusTotal not available, probably no API key')
url_id = vt.url_id(url)
url_storage_dir = self.__get_cache_directory(url)
url_storage_dir.mkdir(parents=True, exist_ok=True)
vt_file = url_storage_dir / date.today().isoformat()
@ -458,6 +457,7 @@ class VirusTotal():
if not force and vt_file.exists():
return
url_id = vt.url_id(url)
for _ in range(3):
try:
url_information = self.client.get_object(f"/urls/{url_id}")
@ -476,6 +476,8 @@ class VirusTotal():
class UrlScan():
def __init__(self, config: Dict[str, Any]):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
if not config.get('apikey'):
self.available = False
return
@ -494,6 +496,19 @@ class UrlScan():
if config.get('autosubmit'):
self.autosubmit = True
if config.get('force_visibility'):
# Cases:
# 1. False: unlisted for hidden captures / public for others
# 2. "key": default visibility defined on urlscan.io
# 3. "public", "unlisted", "private": is set for all submissions
self.force_visibility = config['force_visibility']
else:
self.force_visibility = False
if self.force_visibility not in [False, 'key', 'public', 'unlisted', 'private']:
self.logger.warning("Invalid value for force_visibility, default to False (unlisted for hidden captures / public for others).")
self.force_visibility = False
self.storage_dir_urlscan = get_homedir() / 'urlscan'
self.storage_dir_urlscan.mkdir(parents=True, exist_ok=True)
@ -503,8 +518,10 @@ class UrlScan():
m.update(to_hash.encode())
return self.storage_dir_urlscan / m.hexdigest()
def get_url_submission(self, url: str, useragent: str, referer: str) -> Optional[Dict[str, Any]]:
url_storage_dir = self.__get_cache_directory(url, useragent, referer)
def get_url_submission(self, capture_info: Dict[str, Any]) -> Optional[Dict[str, Any]]:
url_storage_dir = self.__get_cache_directory(capture_info['url'],
capture_info['user_agent'],
capture_info['referer']) / 'submit'
if not url_storage_dir.exists():
return None
cached_entries = sorted(url_storage_dir.glob('*'), reverse=True)
@ -514,18 +531,28 @@ class UrlScan():
with cached_entries[0].open() as f:
return json.load(f)
def capture_default_trigger(self, capture_info: Dict[str, Any], /, visibility: str, *, force: bool=False, auto_trigger: bool=False) -> None:
    '''Run the module on the initial URL.

    :param capture_info: information about the capture, passed as-is to `url_submit`
    :param visibility: visibility requested for the submission, passed as-is to `url_submit`
    :param force: passed as-is to `url_submit`
    :param auto_trigger: True when the request comes from the auto trigger feature

    Nothing is submitted when the module is not available, or when the
    request is an auto trigger and the module does not allow it.
    '''
    if not self.available:
        return None
    if auto_trigger and not self.allow_auto_trigger:
        # NOTE: if auto_trigger is true, it means the request comes from the
        # auto trigger feature (disabled by default)
        # Each module can disable auto-trigger to avoid depleting the
        # API limits.
        return None
    # The response of the submission is intentionally not returned here.
    self.url_submit(capture_info, visibility, force)
def __submit_url(self, url: str, useragent: str, referer: str, visibility: str) -> Dict:
    '''Submit a URL to the urlscan.io scan endpoint and return the decoded JSON response.

    The visibility sent with the request depends on `self.force_visibility`:
      * False: the `visibility` given by the caller is used
      * "public", "unlisted", "private": that value is used for every submission
      * anything else: no visibility is sent, so the default configured
        for the key on the urlscan.io website applies

    An HTTP error status is raised by `response.raise_for_status()`.
    '''
    data = {"url": url, 'customagent': useragent, 'referer': referer}
    if self.force_visibility is False:
        data["visibility"] = visibility
    elif self.force_visibility in ["public", "unlisted", "private"]:
        data["visibility"] = self.force_visibility
    # Otherwise the "visibility" key is deliberately left out of the payload:
    # default to key config on urlscan.io website
    response = self.client.post('https://urlscan.io/api/v1/scan/', json=data)
    response.raise_for_status()
    return response.json()
@ -535,7 +562,7 @@ class UrlScan():
response.raise_for_status()
return response.json()
def url_submit(self, capture_info: Dict[str, Any], force: bool=False) -> Dict:
def url_submit(self, capture_info: Dict[str, Any], visibility: str, force: bool=False) -> Dict:
'''Look up a URL on urlscan.io
Note: force means 2 things:
* (re)scan of the URL
@ -561,7 +588,8 @@ class UrlScan():
try:
response = self.__submit_url(capture_info['url'],
capture_info['user_agent'],
capture_info['referer'])
capture_info['referer'],
visibility)
except requests.exceptions.HTTPError as e:
return {'error': e}
with urlscan_file_submit.open('w') as _f:
@ -569,9 +597,9 @@ class UrlScan():
return response
return {'error': 'Submitting is not allowed by the configuration'}
def url_result(self, url: str, useragent: str, referer: str):
def url_result(self, capture_info: Dict[str, Any]):
'''Get the result from a submission.'''
submission = self.get_url_submission(url, useragent, referer)
submission = self.get_url_submission(capture_info)
if submission and 'uuid' in submission:
uuid = submission['uuid']
if (self.storage_dir_urlscan / f'{uuid}.json').exists():

View File

@ -233,8 +233,8 @@ def hostnode_popup(tree_uuid: str, node_uuid: str):
@app.route('/tree/<string:tree_uuid>/trigger_modules', methods=['GET'])
def trigger_modules(tree_uuid: str):
    '''Trigger the modules on a capture, then redirect to the modules view of that capture.

    The `force` and `auto_trigger` query parameters are enabled only when
    their value is exactly the string 'True'. A missing parameter or any
    other value (including 'False') leaves the flag disabled.
    '''
    # `.get()` returns None for a missing parameter, which never equals 'True'
    force = request.args.get('force') == 'True'
    auto_trigger = request.args.get('auto_trigger') == 'True'
    lookyloo.trigger_modules(tree_uuid, force=force, auto_trigger=auto_trigger)
    return redirect(url_for('modules', tree_uuid=tree_uuid))
@ -392,11 +392,24 @@ def modules(tree_uuid: str):
continue
pi_short_result[url] = full_report['results'][0]['tag_label']
urlscan_permaurl: str = ''
urlscan_to_display: Dict = {}
if 'urlscan' in modules_responses:
urlscan = modules_responses.pop('urlscan')
urlscan_permaurl = urlscan['submission']['result']
return render_template('modules.html', uuid=tree_uuid, vt=vt_short_result, pi=pi_short_result, urlscan=urlscan_permaurl)
urlscan_to_display = {'permaurl': '', 'malicious': False, 'tags': []}
if urlscan['submission'].get('result'):
urlscan_to_display['permaurl'] = urlscan['submission']['result']
if urlscan['result']:
# We have a result available, get the verdicts
if (urlscan['result'].get('verdicts')
and urlscan['result']['verdicts'].get('overall')):
if urlscan['result']['verdicts']['overall'].get('malicious') is not None:
urlscan_to_display['malicious'] = urlscan['result']['verdicts']['overall']['malicious']
if urlscan['result']['verdicts']['overall'].get('tags'):
urlscan_to_display['tags'] = urlscan['result']['verdicts']['overall']['tags']
else:
# unable to run the query, probably an invalid key
pass
return render_template('modules.html', uuid=tree_uuid, vt=vt_short_result, pi=pi_short_result, urlscan=urlscan_to_display)
@app.route('/tree/<string:tree_uuid>/redirects', methods=['GET'])
@ -648,7 +661,7 @@ def get_index_params(request):
show_error: bool = True
category: str = ''
if hide_captures_with_error:
show_error = True if request.args.get('show_error') else False
show_error = True if (request.args.get('show_error') and request.args.get('show_error') == 'True') else False
if enable_categorization:
category = request.args['category'] if request.args.get('category') else ''
@ -805,7 +818,7 @@ def cookies_name_detail(cookie_name: str):
@app.route('/body_hashes/<string:body_hash>', methods=['GET'])
def body_hash_details(body_hash: str):
    '''Render the details page of a body hash.

    The `from_popup` query parameter is enabled only when its value is
    exactly the string 'True'; it is passed to the template as a boolean.
    '''
    # `.get()` returns None for a missing parameter, which never equals 'True'
    from_popup = request.args.get('from_popup') == 'True'
    # The hash is stripped of surrounding whitespace for the lookup only;
    # the template receives the value as it was given in the URL.
    captures, domains = lookyloo.get_body_hash_investigator(body_hash.strip())
    return render_template('body_hash.html', body_hash=body_hash, domains=domains, captures=captures, from_popup=from_popup)

View File

@ -1,30 +1,48 @@
{% from "macros.html" import shorten_string %}
<div>
{% if urlscan %}
<hr>
<center>
<h1 class="display-4">urlscan.io</h1>
<div>
<p>A scan was triggered for this capture,
<a href="{{urlscan['permaurl']}}">click to view it</a> on urlscan.io.</p>
{% if urlscan['malicious']%}
<p>It is considered malicious.</p>
{% endif%}
{% if urlscan['tags'] %}
<p>It is tagged as {{ ','.join(urlscan['tags']) }}.</p>
{% endif%}
</div>
</center>
{% endif%}
{% if vt %}
<center><h1 class="display-4">Virus Total</h1></center>
{% for url, entries in vt.items() %}
<div class="border-top my-3"></div>
<center>
<h3><small class="text-muted">URL</small>
{{ shorten_string(url, 50, with_title=True) }}
</h3>
</center>
{% if entries['malicious'] %}
<center>
<p class="lead">Detected as malicious by the following vendors</p>
<dl class="row">
{% for e in entries['malicious'] %}
<dt class="col-sm-3">{{ e[0] }}</dt>
<dd class="col-sm-3">{{ e[1] }}</dd>
{% endfor %}
</center>
</dl>
{% else %}
<p class="lead">No vendors consider this URL as malicious.</p>
{% endif%}
<h5 class="text-right"><a href="{{ entries['permaurl'] }}">Full report on VirusTotal</a></h5>
{% endfor %}
<hr>
<center><h1 class="display-4">Virus Total</h1></center>
{% for url, entries in vt.items() %}
<div class="border-top my-3"></div>
<center>
<h3><small class="text-muted">URL</small>
{{ shorten_string(url, 50, with_title=True) }}
</h3>
</center>
{% if entries['malicious'] %}
<center>
<p class="lead">Detected as malicious by the following vendors</p>
<dl class="row">
{% for e in entries['malicious'] %}
<dt class="col-sm-3">{{ e[0] }}</dt>
<dd class="col-sm-3">{{ e[1] }}</dd>
{% endfor %}
</dl>
</center>
{% else %}
<p class="lead">No vendors consider this URL as malicious.</p>
{% endif%}
<h5 class="text-right"><a href="{{ entries['permaurl'] }}">Full report on VirusTotal</a></h5>
{% endfor %}
{% endif%}
{% if pi%}
<center><h1 class="display-4">Phishing Initiative</h1></center>
@ -37,10 +55,4 @@
</center>
{% endfor %}
{% endif%}
{% if urlscan %}
<center><h1 class="display-4">urlscan.io</h1></center>
<div>
<p>A scan was triggered for this capture, <a href="{{urlscan}}">click see it</a> on urlscan.io.</p>
<p>Note that if you get a 404, it probably means the capture is still ongoing.</p>
</div>
{% endif%}
</div>

View File

@ -471,11 +471,18 @@
<div class="modal-dialog modal-xl" role="document">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title" id="modulesModalLabel">Reports from 3rd party services</h5>
<h4 class="modal-title" id="modulesModalLabel">
Reports from 3rd party services
</h4>
<br>
<button type="button" class="close" data-dismiss="modal" aria-label="Close">
<span aria-hidden="true">×</span>
</button>
</div>
<br>
<center><h5>Note that if you get an error when you click on a
link below, it probably means the capture is still ongoing.
Try reloading the page after a few seconds.</h5></center>
<div class="modal-body">
... loading results from 3rd party modules ...
</div>