lookyloo/website/web/genericapi.py

553 lines
23 KiB
Python
Raw Normal View History

2021-06-07 22:12:23 +02:00
#!/usr/bin/env python3
import base64
import hashlib
2021-09-07 12:59:31 +02:00
import json
2022-11-02 12:23:41 +01:00
from typing import Any, Dict, Optional
2021-06-07 22:12:23 +02:00
import flask_login # type: ignore
2021-09-07 12:59:31 +02:00
from flask import request, send_file
from flask_restx import Namespace, Resource, abort, fields # type: ignore
2021-06-07 22:12:23 +02:00
from werkzeug.security import check_password_hash
from lacuscore import CaptureStatus as CaptureStatusCore
from pylacus import CaptureStatus as CaptureStatusPy
2021-06-07 22:12:23 +02:00
from lookyloo.lookyloo import Lookyloo
from lookyloo.comparator import Comparator
2021-06-07 22:12:23 +02:00
2021-09-07 12:59:31 +02:00
from .helpers import build_users_table, load_user_from_request, src_request_ip
2021-06-07 22:12:23 +02:00
api = Namespace('GenericAPI', description='Generic Lookyloo API', path='/')
lookyloo: Lookyloo = Lookyloo()
2023-01-31 11:22:43 +01:00
comparator: Comparator = Comparator()
2021-06-07 22:12:23 +02:00
def api_auth_check(method):
if flask_login.current_user.is_authenticated or load_user_from_request(request):
return method
abort(403, 'Authentication required.')
token_request_fields = api.model('AuthTokenFields', {
'username': fields.String(description="Your username", required=True),
'password': fields.String(description="Your password", required=True),
})
@api.route('/json/get_token')
@api.doc(description='Get the API token required for authenticated calls')
class AuthToken(Resource):
users_table = build_users_table()
@api.param('username', 'Your username')
@api.param('password', 'Your password')
def get(self):
2022-11-02 12:23:41 +01:00
username: Optional[str] = request.args['username'] if request.args.get('username') else None
password: Optional[str] = request.args['password'] if request.args.get('password') else None
if username and password and username in self.users_table and check_password_hash(self.users_table[username]['password'], password):
2021-06-07 22:12:23 +02:00
return {'authkey': self.users_table[username]['authkey']}
return {'error': 'User/Password invalid.'}, 401
2021-06-07 22:12:23 +02:00
@api.doc(body=token_request_fields)
def post(self):
2022-11-02 12:23:41 +01:00
auth: Dict = request.get_json(force=True) # type: ignore
2021-06-07 22:12:23 +02:00
if 'username' in auth and 'password' in auth: # Expected keys in json
if (auth['username'] in self.users_table
and check_password_hash(self.users_table[auth['username']]['password'], auth['password'])):
return {'authkey': self.users_table[auth['username']]['authkey']}
return {'error': 'User/Password invalid.'}, 401
2021-06-07 22:12:23 +02:00
@api.route('/json/<string:capture_uuid>/status')
2021-06-07 22:12:23 +02:00
@api.doc(description='Get the status of a capture',
params={'capture_uuid': 'The UUID of the capture'})
2021-06-07 22:12:23 +02:00
class CaptureStatusQuery(Resource):
@api.param('with_error', 'Add the error message of the capture (if there is one)')
def get(self, capture_uuid: str):
with_error: bool = True if request.args.get('with_error') else False
status_code = lookyloo.get_capture_status(capture_uuid)
to_return: Dict[str, Any] = {'status_code': status_code}
if status_code in [CaptureStatusCore.DONE, CaptureStatusPy.DONE] and with_error:
cache = lookyloo.capture_cache(capture_uuid)
if cache and cache.error:
to_return['error'] = cache.error
return to_return
2021-06-07 22:12:23 +02:00
@api.route('/json/<string:capture_uuid>/hostnames')
@api.doc(description='Get all the hostnames of all the resources of a capture',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureHostnames(Resource):
def get(self, capture_uuid: str):
cache = lookyloo.capture_cache(capture_uuid)
if not cache:
2022-09-28 12:40:26 +02:00
return {'error': 'UUID missing in cache, try again later and check the status first.'}, 400
to_return: Dict[str, Any] = {'response': {'hostnames': list(lookyloo.get_hostnames(capture_uuid))}}
return to_return
@api.route('/json/<string:capture_uuid>/urls')
@api.doc(description='Get all the URLs of all the resources of a capture',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureURLs(Resource):
def get(self, capture_uuid: str):
cache = lookyloo.capture_cache(capture_uuid)
if not cache:
2022-09-28 12:40:26 +02:00
return {'error': 'UUID missing in cache, try again later and check the status first.'}, 400
to_return: Dict[str, Any] = {'response': {'urls': list(lookyloo.get_urls(capture_uuid))}}
return to_return
@api.route('/json/<string:capture_uuid>/hashes')
@api.doc(description='Get all the hashes of all the resources of a capture',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureHashes(Resource):
# Note: shake algos require a length for the digest, discarding them.
supported_hash_algos = [algo for algo in hashlib.algorithms_available if not algo.startswith('shake')]
# NOTE: the SHA512 hashes are pre-computed in the tree, anything else must be computed on the spot
# so we return the SHA512 hashes by default
@api.param('algorithm', default='sha512', description=f'Algorithm of the hashes (default: sha512). Supported options: {", ".join(supported_hash_algos)}')
@api.param('hashes_only', default=1, description='If 1 (default), only returns a list hashes instead of a dictionary of hashes with their respective URLs..')
def get(self, capture_uuid: str):
cache = lookyloo.capture_cache(capture_uuid)
if not cache:
2022-09-28 12:40:26 +02:00
return {'error': 'UUID missing in cache, try again later and check the status first.'}, 400
algorithm = request.args['algorithm'].lower() if request.args.get('algorithm') else 'sha512'
hashes_only = False if 'hashes_only' in request.args and request.args['hashes_only'] in [0, '0'] else True
if algorithm == 'sha512' and hashes_only:
to_return: Dict[str, Any] = {'response': {'hashes': list(lookyloo.get_hashes(capture_uuid))}}
else:
hashes = lookyloo.get_hashes_with_context(capture_uuid, algorithm=algorithm, urls_only=True)
to_return = {'response': {'hashes': list(hashes.keys())}}
if not hashes_only:
to_return['response']['hashes_with_urls'] = {h: list(urls) for h, urls in hashes.items()}
return to_return
@api.route('/json/<string:capture_uuid>/redirects')
2021-06-07 22:12:23 +02:00
@api.doc(description='Get all the redirects of a capture',
params={'capture_uuid': 'The UUID of the capture'})
2021-06-07 22:12:23 +02:00
class CaptureRedirects(Resource):
def get(self, capture_uuid: str):
cache = lookyloo.capture_cache(capture_uuid)
2021-06-07 22:12:23 +02:00
if not cache:
2022-09-28 12:40:26 +02:00
return {'error': 'UUID missing in cache, try again later and check the status first.'}, 400
2021-06-07 22:12:23 +02:00
to_return: Dict[str, Any] = {}
try:
to_return = {'response': {'url': cache.url, 'redirects': []}}
if not cache.redirects:
to_return['response']['info'] = 'No redirects'
return to_return
if cache.incomplete_redirects:
# Trigger tree build, get all redirects
lookyloo.get_crawled_tree(capture_uuid)
cache = lookyloo.capture_cache(capture_uuid)
if cache:
to_return['response']['redirects'] = cache.redirects
else:
2021-06-07 22:12:23 +02:00
to_return['response']['redirects'] = cache.redirects
except Exception as e:
if cache and hasattr(cache, 'error'):
to_return['error'] = cache.error
else:
to_return['error'] = str(e)
2021-06-07 22:12:23 +02:00
return to_return
@api.route('/json/<string:capture_uuid>/misp_export')
2021-06-07 22:12:23 +02:00
@api.doc(description='Get an export of the capture in MISP format',
params={'capture_uuid': 'The UUID of the capture'})
2021-06-07 22:12:23 +02:00
class MISPExport(Resource):
def get(self, capture_uuid: str):
2021-06-07 22:12:23 +02:00
with_parents = request.args.get('with_parents')
event = lookyloo.misp_export(capture_uuid, True if with_parents else False)
2021-06-07 22:12:23 +02:00
if isinstance(event, dict):
return event
to_return = []
for e in event:
to_return.append(e.to_json(indent=2))
return to_return
misp_push_fields = api.model('MISPPushFields', {
'allow_duplicates': fields.Integer(description="Push the event even if it is already present on the MISP instance",
example=0, min=0, max=1),
'with_parents': fields.Integer(description="Also push the parents of the capture (if any)",
example=0, min=0, max=1),
})
@api.route('/json/<string:capture_uuid>/misp_push')
2021-06-07 22:12:23 +02:00
@api.doc(description='Push an event to a pre-configured MISP instance',
params={'capture_uuid': 'The UUID of the capture'},
2021-06-07 22:12:23 +02:00
security='apikey')
class MISPPush(Resource):
method_decorators = [api_auth_check]
@api.param('with_parents', 'Also push the parents of the capture (if any)')
@api.param('allow_duplicates', 'Push the event even if it is already present on the MISP instance')
def get(self, capture_uuid: str):
2021-06-07 22:12:23 +02:00
with_parents = True if request.args.get('with_parents') else False
allow_duplicates = True if request.args.get('allow_duplicates') else False
to_return: Dict = {}
if not lookyloo.misp.available:
to_return['error'] = 'MISP module not available.'
elif not lookyloo.misp.enable_push:
to_return['error'] = 'Push not enabled in MISP module.'
else:
event = lookyloo.misp_export(capture_uuid, with_parents)
2021-06-07 22:12:23 +02:00
if isinstance(event, dict):
to_return['error'] = event
else:
new_events = lookyloo.misp.push(event, allow_duplicates)
if isinstance(new_events, dict):
to_return['error'] = new_events
else:
events_to_return = []
for e in new_events:
events_to_return.append(e.to_json(indent=2))
return events_to_return
return to_return
@api.doc(body=misp_push_fields)
def post(self, capture_uuid: str):
2022-11-02 12:23:41 +01:00
parameters: Dict = request.get_json(force=True) # type: ignore
2021-06-07 22:12:23 +02:00
with_parents = True if parameters.get('with_parents') else False
allow_duplicates = True if parameters.get('allow_duplicates') else False
to_return: Dict = {}
if not lookyloo.misp.available:
to_return['error'] = 'MISP module not available.'
elif not lookyloo.misp.enable_push:
to_return['error'] = 'Push not enabled in MISP module.'
else:
event = lookyloo.misp_export(capture_uuid, with_parents)
2021-06-07 22:12:23 +02:00
if isinstance(event, dict):
to_return['error'] = event
else:
new_events = lookyloo.misp.push(event, allow_duplicates)
if isinstance(new_events, dict):
to_return['error'] = new_events
else:
events_to_return = []
for e in new_events:
events_to_return.append(e.to_json(indent=2))
return events_to_return
return to_return
trigger_modules_fields = api.model('TriggerModulesFields', {
'force': fields.Boolean(description="Force trigger the modules, even if the results are already cached.",
default=False, required=False),
})
@api.route('/json/<string:capture_uuid>/trigger_modules')
@api.doc(description='Trigger all the available 3rd party modules on the given capture',
params={'capture_uuid': 'The UUID of the capture'})
class TriggerModules(Resource):
@api.doc(body=trigger_modules_fields)
def post(self, capture_uuid: str):
2022-11-02 12:23:41 +01:00
parameters: Dict = request.get_json(force=True) # type: ignore
force = True if parameters.get('force') else False
return lookyloo.trigger_modules(capture_uuid, force=force)
2021-06-07 22:12:23 +02:00
@api.route('/json/hash_info/<h>')
@api.doc(description='Search for a ressource with a specific hash (sha512)',
params={'h': 'The hash (sha512)'})
class HashInfo(Resource):
def get(self, h: str):
details, body = lookyloo.get_body_hash_full(h)
if not details:
return {'error': 'Unknown Hash.'}, 400
2021-06-07 22:12:23 +02:00
to_return: Dict[str, Any] = {'response': {'hash': h, 'details': details,
'body': base64.b64encode(body.getvalue()).decode()}}
return to_return
url_info_fields = api.model('URLInfoFields', {
'url': fields.String(description="The URL to search", required=True),
'limit': fields.Integer(description="The maximal amount of captures to return", example=20),
'cached_captures_only': fields.Boolean(description="If false, re-cache the missing captures (can take a while)", default=True),
2021-06-07 22:12:23 +02:00
})
@api.route('/json/url_info')
@api.doc(description='Search for a URL')
class URLInfo(Resource):
@api.doc(body=url_info_fields)
def post(self):
2022-11-02 12:23:41 +01:00
to_query: Dict = request.get_json(force=True) # type: ignore
2021-06-07 22:12:23 +02:00
occurrences = lookyloo.get_url_occurrences(to_query.pop('url'), **to_query)
return occurrences
hostname_info_fields = api.model('HostnameInfoFields', {
'hostname': fields.String(description="The hostname to search", required=True),
'limit': fields.Integer(description="The maximal amount of captures to return", example=20),
'cached_captures_only': fields.Boolean(description="If false, re-cache the missing captures (can take a while)", default=True),
2021-06-07 22:12:23 +02:00
})
@api.route('/json/hostname_info')
@api.doc(description='Search for a hostname')
class HostnameInfo(Resource):
@api.doc(body=hostname_info_fields)
def post(self):
2022-11-02 12:23:41 +01:00
to_query: Dict = request.get_json(force=True) # type: ignore
2021-06-07 22:12:23 +02:00
occurrences = lookyloo.get_hostname_occurrences(to_query.pop('hostname'), **to_query)
return occurrences
@api.route('/json/stats')
@api.doc(description='Get the statistics of the lookyloo instance.')
class InstanceStats(Resource):
def get(self):
return lookyloo.get_stats()
@api.route('/json/devices')
@api.doc(description='Get the list of devices pre-configured on the platform')
class Devices(Resource):
def get(self):
return lookyloo.get_playwright_devices()
@api.route('/json/<string:capture_uuid>/stats')
@api.doc(description='Get the statistics of the capture.',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureStats(Resource):
def get(self, capture_uuid: str):
return lookyloo.get_statistics(capture_uuid)
2021-06-10 02:59:24 +02:00
@api.route('/json/<string:capture_uuid>/info')
@api.doc(description='Get basic information about the capture.',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureInfo(Resource):
def get(self, capture_uuid: str):
return lookyloo.get_info(capture_uuid)
@api.route('/json/<string:capture_uuid>/cookies')
@api.doc(description='Get the complete cookie jar created during the capture.',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureCookies(Resource):
def get(self, capture_uuid: str):
return json.loads(lookyloo.get_cookies(capture_uuid).read())
# Just text
submit_fields_post = api.model('SubmitFieldsPost', {
'url': fields.Url(description="The URL to capture"),
'document': fields.String(description="A base64 encoded document, it can be anything a browser can display."),
'document_name': fields.String(description="The name of the document."),
'listing': fields.Integer(description="Display the capture on the index", min=0, max=1, example=1),
'user_agent': fields.String(description="User agent to use for the capture", example=''),
'browser_name': fields.String(description="Use this browser. Must be chromium, firefox or webkit.", example=''),
'device_name': fields.String(description="Use the pre-configured settings for this device. Get a list from /json/devices.", example=''),
'referer': fields.String(description="Referer to pass to the capture", example=''),
'headers': fields.String(description="Headers to pass to the capture", example='Accept-Language: en-US;q=0.5, fr-FR;q=0.4'),
'proxy': fields.Url(description="Proxy to use for the capture. Format: [scheme]://[username]:[password]@[hostname]:[port]", example=''),
'cookies': fields.String(description="JSON export of a list of cookies as exported from an other capture", example='')
})
2021-06-07 22:12:23 +02:00
@api.route('/submit')
class SubmitCapture(Resource):
@api.param('url', 'The URL to capture', required=True)
@api.param('listing', 'Display the capture on the index', default=1)
@api.param('user_agent', 'User agent to use for the capture')
@api.param('browser_name', 'Use this browser. Must be chromium, firefox or webkit.')
@api.param('device_name', 'Use the pre-configured settings for this device')
@api.param('referer', 'Referer to pass to the capture')
@api.param('proxy', 'Proxy to use for the the capture')
@api.produces(['text/text'])
def get(self):
if flask_login.current_user.is_authenticated:
user = flask_login.current_user.get_id()
else:
user = src_request_ip(request)
if 'url' not in request.args or not request.args.get('url'):
return 'No "url" in the URL params, nothting to capture.', 400
to_query = {'url': request.args['url'],
'listing': False if 'listing' in request.args and request.args['listing'] in [0, '0'] else True}
if request.args.get('user_agent'):
to_query['user_agent'] = request.args['user_agent']
if request.args.get('browser_name'):
to_query['browser_name'] = request.args['browser_name']
if request.args.get('device_name'):
to_query['device_name'] = request.args['device_name']
if request.args.get('referer'):
to_query['referer'] = request.args['referer']
if request.args.get('headers'):
to_query['headers'] = request.args['headers']
if request.args.get('proxy'):
to_query['proxy'] = request.args['proxy']
perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated)
return perma_uuid
@api.doc(body=submit_fields_post)
@api.produces(['text/text'])
2021-06-07 22:12:23 +02:00
def post(self):
if flask_login.current_user.is_authenticated:
user = flask_login.current_user.get_id()
else:
user = src_request_ip(request)
2022-11-02 12:23:41 +01:00
to_query: Dict = request.get_json(force=True) # type: ignore
2021-06-07 22:12:23 +02:00
perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated)
return perma_uuid
2021-06-07 22:12:23 +02:00
# Binary stuff
@api.route('/bin/<string:capture_uuid>/screenshot')
@api.doc(description='Get the screenshot associated to the capture.',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureScreenshot(Resource):
@api.produces(['image/png'])
def get(self, capture_uuid: str):
return send_file(lookyloo.get_screenshot(capture_uuid), mimetype='image/png')
@api.route('/bin/<string:capture_uuid>/export')
@api.doc(description='Get all the files generated by the capture, except the pickle.',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureExport(Resource):
@api.produces(['application/zip'])
def get(self, capture_uuid: str):
return send_file(lookyloo.get_capture(capture_uuid), mimetype='application/zip')
# Compare captures (WiP)
compare_captures_fields = api.model('CompareCapturesFields', {
2023-01-31 11:22:43 +01:00
'capture_left': fields.String(description="Left capture to compare.", required=True),
'capture_right': fields.String(description="Right capture to compare.", required=True),
})
@api.route('/json/compare_captures')
2023-01-31 11:22:43 +01:00
@api.doc(description='Compare two captures')
class CompareCaptures(Resource):
@api.doc(body=compare_captures_fields)
def post(self):
parameters: Dict = request.get_json(force=True) # type: ignore
2023-01-31 11:22:43 +01:00
result = comparator.compare_captures(parameters.get('capture_left'), parameters.get('capture_right'))
return result
# Get information for takedown
takedown_fields = api.model('TakedownFields', {
'capture_uuid': fields.String(description="The UUID of the capture.", required=True),
})
@api.route('/json/takedown')
@api.doc(description='Get information for triggering a takedown request')
class Takedown(Resource):
@api.doc(body=takedown_fields)
def post(self):
parameters: Dict = request.get_json(force=True) # type: ignore
capture_uuid = parameters.get('capture_uuid')
if not capture_uuid:
return {'error': f'Invalid UUID: {capture_uuid}'}
capture = lookyloo.get_crawled_tree(capture_uuid)
if not capture:
return {'error': f'Unknown capture {capture_uuid}'}
rendered_hostnode = lookyloo.get_hostnode_from_tree(capture_uuid, capture.root_hartree.rendered_node.hostnode_uuid)
result = []
for node in reversed(rendered_hostnode.get_ancestors()):
result.append(lookyloo.takedown_details(node))
result.append(lookyloo.takedown_details(rendered_hostnode))
return result
# Admin stuff
@api.route('/admin/rebuild_all')
@api.doc(description='Rebuild all the trees. WARNING: IT IS GOING TO TAKE A VERY LONG TIME.',
security='apikey')
class RebuildAll(Resource):
method_decorators = [api_auth_check]
def post(self):
try:
lookyloo.rebuild_all()
except Exception as e:
return {'error': f'Unable to rebuild all captures: {e}.'}, 400
else:
return {'info': 'Captures successfully rebuilt.'}
@api.route('/admin/rebuild_all_cache')
@api.doc(description='Rebuild all the caches. It will take a while, but less that rebuild all.',
security='apikey')
class RebuildAllCache(Resource):
method_decorators = [api_auth_check]
def post(self):
try:
lookyloo.rebuild_cache()
except Exception as e:
return {'error': f'Unable to rebuild all the caches: {e}.'}, 400
else:
return {'info': 'All caches successfully rebuilt.'}
@api.route('/admin/<string:capture_uuid>/rebuild')
@api.doc(description='Rebuild the tree.',
params={'capture_uuid': 'The UUID of the capture'},
security='apikey')
class CaptureRebuildTree(Resource):
method_decorators = [api_auth_check]
def post(self, capture_uuid):
try:
lookyloo.remove_pickle(capture_uuid)
lookyloo.get_crawled_tree(capture_uuid)
except Exception as e:
return {'error': f'Unable to rebuild tree: {e}.'}, 400
else:
return {'info': f'Tree {capture_uuid} successfully rebuilt.'}
@api.route('/admin/<string:capture_uuid>/hide')
@api.doc(description='Hide the capture from the index.',
params={'capture_uuid': 'The UUID of the capture'},
security='apikey')
class CaptureHide(Resource):
method_decorators = [api_auth_check]
def post(self, capture_uuid):
try:
lookyloo.hide_capture(capture_uuid)
except Exception as e:
return {'error': f'Unable to hide the tree: {e}.'}, 400
else:
return {'info': f'Capture {capture_uuid} successfully hidden.'}