new: Integration with urlscan.io

pull/247/head
Raphaël Vinot 2021-08-10 17:38:47 +02:00
parent 463b5c6b7e
commit 7933670941
7 changed files with 157 additions and 4 deletions

View File

@ -30,6 +30,11 @@
"port": 4243, "port": 4243,
"allow_auto_trigger": true "allow_auto_trigger": true
}, },
"UrlScan": {
"apikey": null,
"autosubmit": false,
"allow_auto_trigger": false
},
"_notes": { "_notes": {
"apikey": "null disables the module. Pass a string otherwise.", "apikey": "null disables the module. Pass a string otherwise.",
"autosubmit": "Automatically submits the URL to the 3rd party service.", "autosubmit": "Automatically submits the URL to the 3rd party service.",

View File

@ -6,6 +6,7 @@ import time
import json import json
import traceback import traceback
import pickle import pickle
import pkg_resources
from typing import List, Optional, Dict, Union, Any, Set from typing import List, Optional, Dict, Union, Any, Set
from io import BufferedIOBase from io import BufferedIOBase
from pathlib import Path from pathlib import Path
@ -348,3 +349,9 @@ def try_make_file(filename: Path):
return True return True
except FileExistsError: except FileExistsError:
return False return False
@lru_cache(64)
def get_useragent_for_requests():
    '''Build the User-Agent value ("Lookyloo / <version>") sent with outgoing
    HTTP requests. Cached so the distribution metadata lookup runs only once.'''
    lookyloo_version = pkg_resources.get_distribution('lookyloo').version
    return f'Lookyloo / {lookyloo_version}'

View File

@ -40,7 +40,7 @@ from .helpers import (get_homedir, get_socket_path, load_cookies, get_config,
safe_create_dir, get_email_template, load_pickle_tree, safe_create_dir, get_email_template, load_pickle_tree,
remove_pickle_tree, get_resources_hashes, get_taxonomies, uniq_domains, remove_pickle_tree, get_resources_hashes, get_taxonomies, uniq_domains,
CaptureStatus, try_make_file) CaptureStatus, try_make_file)
from .modules import VirusTotal, SaneJavaScript, PhishingInitiative, MISP, UniversalWhois from .modules import VirusTotal, SaneJavaScript, PhishingInitiative, MISP, UniversalWhois, UrlScan
from .capturecache import CaptureCache from .capturecache import CaptureCache
from .context import Context from .context import Context
from .indexing import Indexing from .indexing import Indexing
@ -90,6 +90,10 @@ class Lookyloo():
if not self.uwhois.available: if not self.uwhois.available:
self.logger.warning('Unable to setup the UniversalWhois module') self.logger.warning('Unable to setup the UniversalWhois module')
self.urlscan = UrlScan(get_config('modules', 'UrlScan'))
if not self.urlscan.available:
self.logger.warning('Unable to setup the UrlScan module')
self.context = Context(self.sanejs) self.context = Context(self.sanejs)
self._captures_index: Dict[str, CaptureCache] = {} self._captures_index: Dict[str, CaptureCache] = {}
@ -397,6 +401,7 @@ class Lookyloo():
self.pi.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger) self.pi.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
self.vt.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger) self.vt.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
self.uwhois.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger) self.uwhois.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
self.urlscan.capture_default_trigger(self.get_info(capture_uuid), force=force, auto_trigger=auto_trigger)
def get_modules_responses(self, capture_uuid: str, /) -> Optional[Dict[str, Any]]: def get_modules_responses(self, capture_uuid: str, /) -> Optional[Dict[str, Any]]:
'''Get the responses of the modules from the cached responses on the disk''' '''Get the responses of the modules from the cached responses on the disk'''
@ -420,6 +425,9 @@ class Lookyloo():
to_return['pi'][redirect] = self.pi.get_url_lookup(redirect) to_return['pi'][redirect] = self.pi.get_url_lookup(redirect)
else: else:
to_return['pi'][ct.root_hartree.har.root_url] = self.pi.get_url_lookup(ct.root_hartree.har.root_url) to_return['pi'][ct.root_hartree.har.root_url] = self.pi.get_url_lookup(ct.root_hartree.har.root_url)
if self.urlscan.available:
to_return['urlscan'] = {'submission': {}, 'result': {}}
to_return['urlscan']['submission'] = self.urlscan.url_submit(self.get_info(capture_uuid))
return to_return return to_return
def get_misp_occurrences(self, capture_uuid: str, /) -> Optional[Dict[str, Set[str]]]: def get_misp_occurrences(self, capture_uuid: str, /) -> Optional[Dict[str, Set[str]]]:

View File

@ -12,7 +12,7 @@ import logging
import socket import socket
import re import re
from .helpers import get_homedir, get_config, get_public_suffix_list from .helpers import get_homedir, get_config, get_public_suffix_list, get_useragent_for_requests
from .exceptions import ConfigError from .exceptions import ConfigError
import vt # type: ignore import vt # type: ignore
@ -20,6 +20,7 @@ from vt.error import APIError # type: ignore
from pysanejs import SaneJS from pysanejs import SaneJS
from pyeupi import PyEUPI from pyeupi import PyEUPI
from pymisp import PyMISP, MISPEvent, MISPAttribute from pymisp import PyMISP, MISPEvent, MISPAttribute
import requests
from har2tree import CrawledTree, HostNode, URLNode, Har2TreeError from har2tree import CrawledTree, HostNode, URLNode, Har2TreeError
@ -470,3 +471,117 @@ class VirusTotal():
self.client.scan_url(url) self.client.scan_url(url)
scan_requested = True scan_requested = True
time.sleep(5) time.sleep(5)
class UrlScan():
    '''Third-party module for urlscan.io: submits capture URLs and fetches scan results.

    On-disk cache layout (under <homedir>/urlscan/):
      <md5(url+useragent+referer)>/submit/<YYYY-MM-DD>  -> submission response (one per day)
      <scan uuid>.json                                  -> fetched scan result
    '''

    def __init__(self, config: Dict[str, Any]):
        # A null/missing API key disables the module (see "_notes" in the config file).
        if not config.get('apikey'):
            self.available = False
            return

        self.available = True
        self.autosubmit = False
        self.allow_auto_trigger = False
        self.client = requests.session()
        self.client.headers['User-Agent'] = get_useragent_for_requests()
        self.client.headers['API-Key'] = config['apikey']
        self.client.headers['Content-Type'] = 'application/json'
        if config.get('allow_auto_trigger'):
            self.allow_auto_trigger = True
        if config.get('autosubmit'):
            self.autosubmit = True
        self.storage_dir_urlscan = get_homedir() / 'urlscan'
        self.storage_dir_urlscan.mkdir(parents=True, exist_ok=True)

    def __get_cache_directory(self, url: str, useragent: str, referer: str) -> Path:
        '''Cache directory for a capture, keyed on the (url, useragent, referer) triplet.'''
        m = hashlib.md5()
        to_hash = f'{url}{useragent}{referer}'
        m.update(to_hash.encode())
        return self.storage_dir_urlscan / m.hexdigest()

    def get_url_submission(self, url: str, useragent: str, referer: str) -> Optional[Dict[str, Any]]:
        '''Return the most recent cached submission response for this capture, or None.'''
        # FIX: url_submit writes into the 'submit' subdirectory (one file per ISO
        # date) — read from there, not from the parent directory, otherwise the
        # glob picks up the 'submit' directory itself and open() fails.
        url_storage_dir = self.__get_cache_directory(url, useragent, referer) / 'submit'
        if not url_storage_dir.exists():
            return None
        # File names are ISO dates, so a reverse lexicographic sort yields the newest.
        cached_entries = sorted(url_storage_dir.glob('*'), reverse=True)
        if not cached_entries:
            return None
        with cached_entries[0].open() as f:
            return json.load(f)

    def capture_default_trigger(self, capture_info: Dict[str, Any], /, *, force: bool=False, auto_trigger: bool=False) -> None:
        '''Run the module on the initial URL'''
        if not self.available:
            return None
        if auto_trigger and not self.allow_auto_trigger:
            # Module can be triggered manually, but not automatically on capture.
            return None
        self.url_submit(capture_info, force)

    def __submit_url(self, url: str, useragent: str, referer: str) -> Dict:
        '''POST the URL to urlscan.io for an unlisted scan; raises on HTTP error.'''
        data = {"url": url, "visibility": "unlisted",
                'customagent': useragent, 'referer': referer}
        response = self.client.post('https://urlscan.io/api/v1/scan/', json=data)
        response.raise_for_status()
        return response.json()

    def __url_result(self, uuid: str) -> Dict:
        '''Fetch the result of a scan by its urlscan.io UUID; raises on HTTP error.'''
        response = self.client.get(f'https://urlscan.io/api/v1/result/{uuid}')
        response.raise_for_status()
        return response.json()

    def url_submit(self, capture_info: Dict[str, Any], force: bool=False) -> Dict:
        '''Lookup an URL on urlscan.io
        Note: force means 2 things:
            * (re)scan of the URL
            * re-fetch the object from urlscan.io even if we already did it today

        Note: the URL will only be submitted if autosubmit is set to true in the config
        '''
        if not self.available:
            raise ConfigError('UrlScan not available, probably no API key')

        url_storage_dir = self.__get_cache_directory(capture_info['url'],
                                                     capture_info['user_agent'],
                                                     capture_info['referer']) / 'submit'
        url_storage_dir.mkdir(parents=True, exist_ok=True)
        urlscan_file_submit = url_storage_dir / date.today().isoformat()

        # Without force, reuse today's cached submission when we have one.
        if urlscan_file_submit.exists() and not force:
            with urlscan_file_submit.open('r') as _f:
                return json.load(_f)

        if self.autosubmit:
            # FIX: submission now also happens when a cached entry exists and
            # force=True (previously the force path fell through to the error
            # below and the URL was never re-scanned).
            try:
                response = self.__submit_url(capture_info['url'],
                                             capture_info['user_agent'],
                                             capture_info['referer'])
            except requests.exceptions.HTTPError as e:
                # NOTE(review): the exception object itself is returned; callers
                # must stringify it before JSON-serializing — confirm.
                return {'error': e}
            with urlscan_file_submit.open('w') as _f:
                json.dump(response, _f)
            return response
        return {'error': 'Submitting is not allowed by the configuration'}

    def url_result(self, url: str, useragent: str, referer: str) -> Dict[str, Any]:
        '''Get the result from a submission.'''
        submission = self.get_url_submission(url, useragent, referer)
        if submission and 'uuid' in submission:
            uuid = submission['uuid']
            result_file = self.storage_dir_urlscan / f'{uuid}.json'
            if result_file.exists():
                with result_file.open() as _f:
                    return json.load(_f)
            try:
                result = self.__url_result(uuid)
            except requests.exceptions.HTTPError as e:
                return {'error': e}
            # NOTE(review): presumably urlscan.io returns 404 while the scan is
            # still running, so only completed results land here — confirm.
            with result_file.open('w') as _f:
                json.dump(result, _f)
            return result
        return {'error': 'Submission incomplete or unavailable.'}

View File

@ -392,7 +392,11 @@ def modules(tree_uuid: str):
continue continue
pi_short_result[url] = full_report['results'][0]['tag_label'] pi_short_result[url] = full_report['results'][0]['tag_label']
return render_template('modules.html', uuid=tree_uuid, vt=vt_short_result, pi=pi_short_result) urlscan_permaurl: str = ''
if 'urlscan' in modules_responses:
urlscan = modules_responses.pop('urlscan')
urlscan_permaurl = urlscan['submission']['result']
return render_template('modules.html', uuid=tree_uuid, vt=vt_short_result, pi=pi_short_result, urlscan=urlscan_permaurl)
@app.route('/tree/<string:tree_uuid>/redirects', methods=['GET']) @app.route('/tree/<string:tree_uuid>/redirects', methods=['GET'])

View File

@ -57,6 +57,14 @@ class AuthToken(Resource):
return {'error': 'User/Password invalid.'} return {'error': 'User/Password invalid.'}
@api.route('/json/splash_status')
@api.doc(description='Get status of splash.')
class SplashStatus(Resource):
    '''JSON endpoint exposing the status of the Splash rendering service.'''

    def get(self):
        # splash_status() returns a (status, info) pair; 'status' is presumably
        # a boolean ("is the service reachable") — confirm against Lookyloo.splash_status.
        status, info = lookyloo.splash_status()
        return {'is_up': status, 'info': info}
@api.route('/json/<string:capture_uuid>/status') @api.route('/json/<string:capture_uuid>/status')
@api.doc(description='Get the status of a capture', @api.doc(description='Get the status of a capture',
params={'capture_uuid': 'The UUID of the capture'}) params={'capture_uuid': 'The UUID of the capture'})

View File

@ -37,4 +37,10 @@
</center> </center>
{% endfor %} {% endfor %}
{% endif%} {% endif%}
</div> {% if urlscan %}
<center><h1 class="display-4">urlscan.io</h1></center>
<div>
<p>A scan was triggered for this capture, <a href="{{urlscan}}">click to see it</a> on urlscan.io.</p>
<p>Note that if you get a 404, it probably means the capture is still ongoing.</p>
</div>
{% endif%}