diff --git a/website/web/__init__.py b/website/web/__init__.py index 0a7b81c1..f9237b51 100644 --- a/website/web/__init__.py +++ b/website/web/__init__.py @@ -1,17 +1,13 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import base64 from io import BytesIO, StringIO -import os -from pathlib import Path from datetime import datetime, timedelta, timezone import json import http import calendar from typing import Optional, Dict, Any, Union, List import logging -import hashlib from urllib.parse import quote_plus, unquote_plus import time import pkg_resources @@ -19,28 +15,24 @@ import pkg_resources from flask import Flask, render_template, request, send_file, redirect, url_for, Response, flash, jsonify from flask_bootstrap import Bootstrap # type: ignore import flask_login # type: ignore -from flask_restx import Resource, Api, fields, abort # type: ignore -from werkzeug.security import generate_password_hash, check_password_hash +from flask_restx import Api # type: ignore +from .genericapi import api as generic_api +from werkzeug.security import check_password_hash from pymisp import MISPEvent, MISPServerError -from lookyloo.helpers import (get_homedir, update_user_agents, get_user_agents, get_config, +from lookyloo.helpers import (update_user_agents, get_user_agents, get_config, get_taxonomies, load_cookies, CaptureStatus) from lookyloo.lookyloo import Lookyloo, Indexing from lookyloo.exceptions import NoValidHarFile, MissingUUID + from .proxied import ReverseProxied +from .helpers import src_request_ip, User, load_user_from_request, build_users_table, get_secret_key app: Flask = Flask(__name__) app.wsgi_app = ReverseProxied(app.wsgi_app) # type: ignore -secret_file_path: Path = get_homedir() / 'secret_key' - -if not secret_file_path.exists() or secret_file_path.stat().st_size < 64: - with secret_file_path.open('wb') as f: - f.write(os.urandom(64)) - -with secret_file_path.open('rb') as f: - app.config['SECRET_KEY'] = f.read() +app.config['SECRET_KEY'] = get_secret_key() Bootstrap(app) app.config['BOOTSTRAP_SERVE_LOCAL'] = True @@ -51,45 +43,11 @@ app.debug = False # Auth stuff login_manager = flask_login.LoginManager() login_manager.init_app(app) -try: - # Use legacy user mgmt, no need to print a warning, and it will fail on new install. - users = get_config('generic', 'cache_clean_user', quiet=True) -except Exception: - users = get_config('generic', 'users') - -users_table: Dict[str, Dict[str, str]] = {} -for username, authstuff in users.items(): - if isinstance(authstuff, str): - # just a password, make a key - users_table[username] = {} - users_table[username]['password'] = generate_password_hash(authstuff) - users_table[username]['authkey'] = hashlib.pbkdf2_hmac('sha256', - app.config['SECRET_KEY'], - authstuff.encode(), - 100000).hex() - - elif isinstance(authstuff, list) and len(authstuff) == 2: - if isinstance(authstuff[0], str) and isinstance(authstuff[1], str) and len(authstuff[1]) == 64: - users_table[username] = {} - users_table[username]['password'] = generate_password_hash(authstuff[0]) - users_table[username]['authkey'] = authstuff[1] - - if username not in users_table: - raise Exception('User setup invalid. Must be "username": "password" or "username": ["password", "token 64 chars (sha256)"]') - -keys_table = {} -for username, authstuff in users_table.items(): - if 'authkey' in authstuff: - keys_table[authstuff['authkey']] = username - - -class User(flask_login.UserMixin): - pass @login_manager.user_loader def user_loader(username): - if username not in users_table: + if username not in build_users_table(): return None user = User() user.id = username @@ -97,16 +55,8 @@ def user_loader(username): @login_manager.request_loader -def load_user_from_request(request): - api_key = request.headers.get('Authorization') - if not api_key: - return None - user = User() - api_key = api_key.strip() - if api_key in keys_table: - user.id = keys_table[api_key] - return user - return None +def _load_user_from_request(request): + return load_user_from_request(request) @app.route('/login', methods=['GET', 'POST']) @@ -121,6 +71,7 @@ def login(): ''' username = request.form['username'] + users_table = build_users_table() if username in users_table and check_password_hash(users_table[username]['password'], request.form['password']): user = User() user.id = username @@ -196,14 +147,6 @@ app.jinja_env.globals.update(month_name=month_name) # ##### Generic/configuration methods ##### -def src_request_ip(request) -> str: - # NOTE: X-Real-IP is the IP passed by the reverse proxy in the headers. - real_ip = request.headers.get('X-Real-IP') - if not real_ip: - real_ip = request.remote_addr - return real_ip - - @app.after_request def after_request(response): # We keep a list user agents in order to build a list to use in the capture @@ -351,6 +294,76 @@ def web_misp_lookup_view(tree_uuid: str): return render_template('misp_lookup.html', uuid=tree_uuid, hits=hits, misp_root_url=misp_root_url) +@app.route('/tree//misp_push', methods=['GET', 'POST']) +@flask_login.login_required +def web_misp_push_view(tree_uuid: str): + error = False + if not lookyloo.misp.available: + flash('MISP module not available.', 'error') + return redirect(url_for('tree', tree_uuid=tree_uuid)) + elif not lookyloo.misp.enable_push: + flash('Push not enabled in MISP module.', 'error') + return redirect(url_for('tree', tree_uuid=tree_uuid)) + else: + event = lookyloo.misp_export(tree_uuid) + if isinstance(event, dict): + flash(f'Unable to generate the MISP export: {event}', 'error') + return redirect(url_for('tree', tree_uuid=tree_uuid)) + + if request.method == 'POST': + # event is a MISPEvent at this point + # Submit the event + tags = request.form.getlist('tags') + error = False + events: List[MISPEvent] = [] + with_parents = request.form.get('with_parents') + if with_parents: + exports = lookyloo.misp_export(tree_uuid, True) + if isinstance(exports, dict): + flash(f'Unable to create event: {exports}', 'error') + error = True + else: + events = exports + else: + events = event + + if error: + return redirect(url_for('tree', tree_uuid=tree_uuid)) + + for e in events: + for tag in tags: + e.add_tag(tag) + + # Change the event info field of the last event in the chain + events[-1].info = request.form.get('event_info') + + try: + new_events = lookyloo.misp.push(events, True if request.form.get('force_push') else False, + True if request.form.get('auto_publish') else False) + except MISPServerError: + flash(f'MISP returned an error, the event(s) might still have been created on {lookyloo.misp.client.root_url}', 'error') + else: + if isinstance(new_events, dict): + flash(f'Unable to create event(s): {new_events}', 'error') + else: + for e in new_events: + flash(f'MISP event {e.id} created on {lookyloo.misp.client.root_url}', 'success') + return redirect(url_for('tree', tree_uuid=tree_uuid)) + else: + # the 1st attribute in the event is the link to lookyloo + existing_misp_url = lookyloo.misp.get_existing_event_url(event[-1].attributes[0].value) + + fav_tags = lookyloo.misp.get_fav_tags() + cache = lookyloo.capture_cache(tree_uuid) + + return render_template('misp_push_view.html', tree_uuid=tree_uuid, + event=event[0], fav_tags=fav_tags, + existing_event=existing_misp_url, + auto_publish=lookyloo.misp.auto_publish, + has_parent=True if cache and cache.parent else False, + default_tags=lookyloo.misp.default_tags) + + @app.route('/tree//modules', methods=['GET']) def modules(tree_uuid: str): modules_responses = lookyloo.get_modules_responses(tree_uuid) @@ -791,6 +804,13 @@ def statsfull(): return render_template('stats.html', stats=stats) +@app.route('/whois/', methods=['GET']) +def whois(query: str): + to_return = lookyloo.uwhois.whois(query) + return send_file(BytesIO(to_return.encode()), + mimetype='test/plain', as_attachment=True, attachment_filename=f'whois.{query}.txt') + + # ##### Methods related to a specific URLNode ##### @app.route('/tree//url//request_cookies', methods=['GET']) @@ -942,83 +962,6 @@ def add_context(tree_uuid: str, node_uuid: str): return redirect(url_for('ressources')) -@app.route('/tree//misp_push', methods=['GET', 'POST']) -@flask_login.login_required -def web_misp_push_view(tree_uuid: str): - error = False - if not lookyloo.misp.available: - flash('MISP module not available.', 'error') - return redirect(url_for('tree', tree_uuid=tree_uuid)) - elif not lookyloo.misp.enable_push: - flash('Push not enabled in MISP module.', 'error') - return redirect(url_for('tree', tree_uuid=tree_uuid)) - else: - event = lookyloo.misp_export(tree_uuid) - if isinstance(event, dict): - flash(f'Unable to generate the MISP export: {event}', 'error') - return redirect(url_for('tree', tree_uuid=tree_uuid)) - - if request.method == 'POST': - # event is a MISPEvent at this point - # Submit the event - tags = request.form.getlist('tags') - error = False - events: List[MISPEvent] = [] - with_parents = request.form.get('with_parents') - if with_parents: - exports = lookyloo.misp_export(tree_uuid, True) - if isinstance(exports, dict): - flash(f'Unable to create event: {exports}', 'error') - error = True - else: - events = exports - else: - events = event - - if error: - return redirect(url_for('tree', tree_uuid=tree_uuid)) - - for e in events: - for tag in tags: - e.add_tag(tag) - - # Change the event info field of the last event in the chain - events[-1].info = request.form.get('event_info') - - try: - new_events = lookyloo.misp.push(events, True if request.form.get('force_push') else False, - True if request.form.get('auto_publish') else False) - except MISPServerError: - flash(f'MISP returned an error, the event(s) might still have been created on {lookyloo.misp.client.root_url}', 'error') - else: - if isinstance(new_events, dict): - flash(f'Unable to create event(s): {new_events}', 'error') - else: - for e in new_events: - flash(f'MISP event {e.id} created on {lookyloo.misp.client.root_url}', 'success') - return redirect(url_for('tree', tree_uuid=tree_uuid)) - else: - # the 1st attribute in the event is the link to lookyloo - existing_misp_url = lookyloo.misp.get_existing_event_url(event[-1].attributes[0].value) - - fav_tags = lookyloo.misp.get_fav_tags() - cache = lookyloo.capture_cache(tree_uuid) - - return render_template('misp_push_view.html', tree_uuid=tree_uuid, - event=event[0], fav_tags=fav_tags, - existing_event=existing_misp_url, - auto_publish=lookyloo.misp.auto_publish, - has_parent=True if cache and cache.parent else False, - default_tags=lookyloo.misp.default_tags) - - -@app.route('/whois/', methods=['GET']) -def whois(query: str): - to_return = lookyloo.uwhois.whois(query) - return send_file(BytesIO(to_return.encode()), - mimetype='test/plain', as_attachment=True, attachment_filename=f'whois.{query}.txt') - - # Query API authorizations = { @@ -1035,232 +978,4 @@ api = Api(app, title='Lookyloo API', authorizations=authorizations, version=pkg_resources.get_distribution('lookyloo').version) - -def api_auth_check(method): - if flask_login.current_user.is_authenticated or load_user_from_request(request): - return method - abort(403, 'Authentication required.') - - -token_request_fields = api.model('AuthTokenFields', { - 'username': fields.String(description="Your username", required=True), - 'password': fields.String(description="Your password", required=True), -}) - - -@api.route('/json/get_token') -@api.doc(description='Get the API token required for authenticated calls') -class AuthToken(Resource): - - @api.param('username', 'Your username') - @api.param('password', 'Your password') - def get(self): - username = request.args['username'] if request.args.get('username') else False - password = request.args['password'] if request.args.get('password') else False - if username in users_table and check_password_hash(users_table[username]['password'], password): - return {'authkey': users_table[username]['authkey']} - return {'error': 'User/Password invalid.'} - - @api.doc(body=token_request_fields) - def post(self): - auth: Dict = request.get_json(force=True) - if 'username' in auth and 'password' in auth: # Expected keys in json - if (auth['username'] in users_table - and check_password_hash(users_table[auth['username']]['password'], auth['password'])): - return {'authkey': users_table[auth['username']]['authkey']} - return {'error': 'User/Password invalid.'} - - -@api.route('/json//status') -@api.doc(description='Get the status of a capture', - params={'tree_uuid': 'The UUID of the capture'}) -class CaptureStatusQuery(Resource): - def get(self, tree_uuid: str): - return {'status_code': lookyloo.get_capture_status(tree_uuid)} - - -@api.route('/json//redirects') -@api.doc(description='Get all the redirects of a capture', - params={'tree_uuid': 'The UUID of the capture'}) -class CaptureRedirects(Resource): - def get(self, tree_uuid: str): - cache = lookyloo.capture_cache(tree_uuid) - if not cache: - return {'error': 'UUID missing in cache, try again later.'} - - to_return: Dict[str, Any] = {'response': {'url': cache.url, 'redirects': []}} - if not cache.redirects: - to_return['response']['info'] = 'No redirects' - return to_return - if cache.incomplete_redirects: - # Trigger tree build, get all redirects - lookyloo.get_crawled_tree(tree_uuid) - cache = lookyloo.capture_cache(tree_uuid) - if cache: - to_return['response']['redirects'] = cache.redirects - else: - to_return['response']['redirects'] = cache.redirects - - return to_return - - -@api.route('/json//misp_export') -@api.doc(description='Get an export of the capture in MISP format', - params={'tree_uuid': 'The UUID of the capture'}) -class MISPExport(Resource): - def get(self, tree_uuid: str): - with_parents = request.args.get('with_parents') - event = lookyloo.misp_export(tree_uuid, True if with_parents else False) - if isinstance(event, dict): - return event - - to_return = [] - for e in event: - to_return.append(e.to_json(indent=2)) - return to_return - - -misp_push_fields = api.model('MISPPushFields', { - 'allow_duplicates': fields.Integer(description="Push the event even if it is already present on the MISP instance", - example=0, min=0, max=1), - 'with_parents': fields.Integer(description="Also push the parents of the capture (if any)", - example=0, min=0, max=1), -}) - - -@api.route('/json//misp_push') -@api.doc(description='Push an event to a pre-configured MISP instance', - params={'tree_uuid': 'The UUID of the capture'}, - security='apikey') -class MISPPush(Resource): - method_decorators = [api_auth_check] - - @api.param('with_parents', 'Also push the parents of the capture (if any)') - @api.param('allow_duplicates', 'Push the event even if it is already present on the MISP instance') - def get(self, tree_uuid: str): - with_parents = True if request.args.get('with_parents') else False - allow_duplicates = True if request.args.get('allow_duplicates') else False - to_return: Dict = {} - if not lookyloo.misp.available: - to_return['error'] = 'MISP module not available.' - elif not lookyloo.misp.enable_push: - to_return['error'] = 'Push not enabled in MISP module.' - else: - event = lookyloo.misp_export(tree_uuid, with_parents) - if isinstance(event, dict): - to_return['error'] = event - else: - new_events = lookyloo.misp.push(event, allow_duplicates) - if isinstance(new_events, dict): - to_return['error'] = new_events - else: - events_to_return = [] - for e in new_events: - events_to_return.append(e.to_json(indent=2)) - return events_to_return - - return to_return - - @api.doc(body=misp_push_fields) - def post(self, tree_uuid: str): - parameters: Dict = request.get_json(force=True) - with_parents = True if parameters.get('with_parents') else False - allow_duplicates = True if parameters.get('allow_duplicates') else False - - to_return: Dict = {} - if not lookyloo.misp.available: - to_return['error'] = 'MISP module not available.' - elif not lookyloo.misp.enable_push: - to_return['error'] = 'Push not enabled in MISP module.' - else: - event = lookyloo.misp_export(tree_uuid, with_parents) - if isinstance(event, dict): - to_return['error'] = event - else: - new_events = lookyloo.misp.push(event, allow_duplicates) - if isinstance(new_events, dict): - to_return['error'] = new_events - else: - events_to_return = [] - for e in new_events: - events_to_return.append(e.to_json(indent=2)) - return events_to_return - - return to_return - - -@api.route('/json/hash_info/') -@api.doc(description='Search for a ressource with a specific hash (sha512)', - params={'h': 'The hash (sha512)'}) -class HashInfo(Resource): - def get(self, h: str): - details, body = lookyloo.get_body_hash_full(h) - if not details: - return {'error': 'Unknown Hash.'} - to_return: Dict[str, Any] = {'response': {'hash': h, 'details': details, - 'body': base64.b64encode(body.getvalue()).decode()}} - return to_return - - -url_info_fields = api.model('URLInfoFields', { - 'url': fields.String(description="The URL to search", required=True), - 'limit': fields.Integer(description="The maximal amount of captures to return", example=20), -}) - - -@api.route('/json/url_info') -@api.doc(description='Search for a URL') -class URLInfo(Resource): - - @api.doc(body=url_info_fields) - def post(self): - to_query: Dict = request.get_json(force=True) - occurrences = lookyloo.get_url_occurrences(to_query.pop('url'), **to_query) - return occurrences - - -hostname_info_fields = api.model('HostnameInfoFields', { - 'hostname': fields.String(description="The hostname to search", required=True), - 'limit': fields.Integer(description="The maximal amount of captures to return", example=20), -}) - - -@api.route('/json/hostname_info') -@api.doc(description='Search for a hostname') -class HostnameInfo(Resource): - - @api.doc(body=hostname_info_fields) - def post(self): - to_query: Dict = request.get_json(force=True) - occurrences = lookyloo.get_hostname_occurrences(to_query.pop('hostname'), **to_query) - return occurrences - - -@api.route('/json/stats') -@api.doc(description='Get the statistics of the lookyloo instance.') -class InstanceStats(Resource): - def get(self): - return lookyloo.get_stats() - - -submit_fields = api.model('SubmitFields', { - 'url': fields.String(description="The URL to capture", required=True), - 'listing': fields.Integer(description="Display the capture on the index", min=0, max=1, example=1), - 'user_agent': fields.String(description="User agent to use for the capture", example=''), - 'referer': fields.String(description="Referer to pass to the capture", example=''), - 'cookies': fields.String(description="JSON export of a list of cookies as exported from an other capture", example='') -}) - - -@api.route('/submit') -class SubmitCapture(Resource): - - @api.doc(body=submit_fields) - def post(self): - if flask_login.current_user.is_authenticated: - user = flask_login.current_user.get_id() - else: - user = src_request_ip(request) - to_query: Dict = request.get_json(force=True) - perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated) - return Response(perma_uuid, mimetype='text/text') +api.add_namespace(generic_api) diff --git a/website/web/genericapi.py b/website/web/genericapi.py new file mode 100644 index 00000000..434069eb --- /dev/null +++ b/website/web/genericapi.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import base64 +from typing import Dict, Any + +from flask import request, Response +import flask_login # type: ignore +from flask_restx import Namespace, Resource, fields, abort # type: ignore +from werkzeug.security import check_password_hash + +from lookyloo.lookyloo import Lookyloo + +from .helpers import src_request_ip, load_user_from_request, build_users_table + +api = Namespace('GenericAPI', description='Generic Lookyloo API', path='/') + + +lookyloo: Lookyloo = Lookyloo() + + +def api_auth_check(method): + if flask_login.current_user.is_authenticated or load_user_from_request(request): + return method + abort(403, 'Authentication required.') + + +token_request_fields = api.model('AuthTokenFields', { + 'username': fields.String(description="Your username", required=True), + 'password': fields.String(description="Your password", required=True), +}) + + +@api.route('/json/get_token') +@api.doc(description='Get the API token required for authenticated calls') +class AuthToken(Resource): + + users_table = build_users_table() + + @api.param('username', 'Your username') + @api.param('password', 'Your password') + def get(self): + username = request.args['username'] if request.args.get('username') else False + password = request.args['password'] if request.args.get('password') else False + if username in self.users_table and check_password_hash(self.users_table[username]['password'], password): + return {'authkey': self.users_table[username]['authkey']} + return {'error': 'User/Password invalid.'} + + @api.doc(body=token_request_fields) + def post(self): + auth: Dict = request.get_json(force=True) + if 'username' in auth and 'password' in auth: # Expected keys in json + if (auth['username'] in self.users_table + and check_password_hash(self.users_table[auth['username']]['password'], auth['password'])): + return {'authkey': self.users_table[auth['username']]['authkey']} + return {'error': 'User/Password invalid.'} + + +@api.route('/json//status') +@api.doc(description='Get the status of a capture', + params={'tree_uuid': 'The UUID of the capture'}) +class CaptureStatusQuery(Resource): + def get(self, tree_uuid: str): + return {'status_code': lookyloo.get_capture_status(tree_uuid)} + + +@api.route('/json//redirects') +@api.doc(description='Get all the redirects of a capture', + params={'tree_uuid': 'The UUID of the capture'}) +class CaptureRedirects(Resource): + def get(self, tree_uuid: str): + cache = lookyloo.capture_cache(tree_uuid) + if not cache: + return {'error': 'UUID missing in cache, try again later.'} + + to_return: Dict[str, Any] = {'response': {'url': cache.url, 'redirects': []}} + if not cache.redirects: + to_return['response']['info'] = 'No redirects' + return to_return + if cache.incomplete_redirects: + # Trigger tree build, get all redirects + lookyloo.get_crawled_tree(tree_uuid) + cache = lookyloo.capture_cache(tree_uuid) + if cache: + to_return['response']['redirects'] = cache.redirects + else: + to_return['response']['redirects'] = cache.redirects + + return to_return + + +@api.route('/json//misp_export') +@api.doc(description='Get an export of the capture in MISP format', + params={'tree_uuid': 'The UUID of the capture'}) +class MISPExport(Resource): + def get(self, tree_uuid: str): + with_parents = request.args.get('with_parents') + event = lookyloo.misp_export(tree_uuid, True if with_parents else False) + if isinstance(event, dict): + return event + + to_return = [] + for e in event: + to_return.append(e.to_json(indent=2)) + return to_return + + +misp_push_fields = api.model('MISPPushFields', { + 'allow_duplicates': fields.Integer(description="Push the event even if it is already present on the MISP instance", + example=0, min=0, max=1), + 'with_parents': fields.Integer(description="Also push the parents of the capture (if any)", + example=0, min=0, max=1), +}) + + +@api.route('/json//misp_push') +@api.doc(description='Push an event to a pre-configured MISP instance', + params={'tree_uuid': 'The UUID of the capture'}, + security='apikey') +class MISPPush(Resource): + method_decorators = [api_auth_check] + + @api.param('with_parents', 'Also push the parents of the capture (if any)') + @api.param('allow_duplicates', 'Push the event even if it is already present on the MISP instance') + def get(self, tree_uuid: str): + with_parents = True if request.args.get('with_parents') else False + allow_duplicates = True if request.args.get('allow_duplicates') else False + to_return: Dict = {} + if not lookyloo.misp.available: + to_return['error'] = 'MISP module not available.' + elif not lookyloo.misp.enable_push: + to_return['error'] = 'Push not enabled in MISP module.' + else: + event = lookyloo.misp_export(tree_uuid, with_parents) + if isinstance(event, dict): + to_return['error'] = event + else: + new_events = lookyloo.misp.push(event, allow_duplicates) + if isinstance(new_events, dict): + to_return['error'] = new_events + else: + events_to_return = [] + for e in new_events: + events_to_return.append(e.to_json(indent=2)) + return events_to_return + + return to_return + + @api.doc(body=misp_push_fields) + def post(self, tree_uuid: str): + parameters: Dict = request.get_json(force=True) + with_parents = True if parameters.get('with_parents') else False + allow_duplicates = True if parameters.get('allow_duplicates') else False + + to_return: Dict = {} + if not lookyloo.misp.available: + to_return['error'] = 'MISP module not available.' + elif not lookyloo.misp.enable_push: + to_return['error'] = 'Push not enabled in MISP module.' + else: + event = lookyloo.misp_export(tree_uuid, with_parents) + if isinstance(event, dict): + to_return['error'] = event + else: + new_events = lookyloo.misp.push(event, allow_duplicates) + if isinstance(new_events, dict): + to_return['error'] = new_events + else: + events_to_return = [] + for e in new_events: + events_to_return.append(e.to_json(indent=2)) + return events_to_return + + return to_return + + +@api.route('/json/hash_info/') +@api.doc(description='Search for a ressource with a specific hash (sha512)', + params={'h': 'The hash (sha512)'}) +class HashInfo(Resource): + def get(self, h: str): + details, body = lookyloo.get_body_hash_full(h) + if not details: + return {'error': 'Unknown Hash.'} + to_return: Dict[str, Any] = {'response': {'hash': h, 'details': details, + 'body': base64.b64encode(body.getvalue()).decode()}} + return to_return + + +url_info_fields = api.model('URLInfoFields', { + 'url': fields.String(description="The URL to search", required=True), + 'limit': fields.Integer(description="The maximal amount of captures to return", example=20), +}) + + +@api.route('/json/url_info') +@api.doc(description='Search for a URL') +class URLInfo(Resource): + + @api.doc(body=url_info_fields) + def post(self): + to_query: Dict = request.get_json(force=True) + occurrences = lookyloo.get_url_occurrences(to_query.pop('url'), **to_query) + return occurrences + + +hostname_info_fields = api.model('HostnameInfoFields', { + 'hostname': fields.String(description="The hostname to search", required=True), + 'limit': fields.Integer(description="The maximal amount of captures to return", example=20), +}) + + +@api.route('/json/hostname_info') +@api.doc(description='Search for a hostname') +class HostnameInfo(Resource): + + @api.doc(body=hostname_info_fields) + def post(self): + to_query: Dict = request.get_json(force=True) + occurrences = lookyloo.get_hostname_occurrences(to_query.pop('hostname'), **to_query) + return occurrences + + +@api.route('/json/stats') +@api.doc(description='Get the statistics of the lookyloo instance.') +class InstanceStats(Resource): + def get(self): + return lookyloo.get_stats() + + +submit_fields = api.model('SubmitFields', { + 'url': fields.String(description="The URL to capture", required=True), + 'listing': fields.Integer(description="Display the capture on the index", min=0, max=1, example=1), + 'user_agent': fields.String(description="User agent to use for the capture", example=''), + 'referer': fields.String(description="Referer to pass to the capture", example=''), + 'cookies': fields.String(description="JSON export of a list of cookies as exported from an other capture", example='') +}) + + +@api.route('/submit') +class SubmitCapture(Resource): + + @api.doc(body=submit_fields) + def post(self): + if flask_login.current_user.is_authenticated: + user = flask_login.current_user.get_id() + else: + user = src_request_ip(request) + to_query: Dict = request.get_json(force=True) + perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated) + return Response(perma_uuid, mimetype='text/text') + + +@api.route('/json//stats') +@api.doc(description='Get the statistics of the capture.') +class CaptureStats(Resource): + def get(self, tree_uuid: str): + return lookyloo.get_statistics(tree_uuid) diff --git a/website/web/helpers.py b/website/web/helpers.py new file mode 100644 index 00000000..74a083e4 --- /dev/null +++ b/website/web/helpers.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import hashlib +import os + +from functools import lru_cache +from pathlib import Path +from typing import Dict, List, Union + +import flask_login # type: ignore + +from werkzeug.security import generate_password_hash + +from lookyloo.helpers import get_homedir, get_config + + +def src_request_ip(request) -> str: + # NOTE: X-Real-IP is the IP passed by the reverse proxy in the headers. + real_ip = request.headers.get('X-Real-IP') + if not real_ip: + real_ip = request.remote_addr + return real_ip + + +class User(flask_login.UserMixin): + pass + + +def load_user_from_request(request): + api_key = request.headers.get('Authorization') + if not api_key: + return None + user = User() + api_key = api_key.strip() + keys_table = build_keys_table() + if api_key in keys_table: + user.id = keys_table[api_key] + return user + return None + + +@lru_cache(64) +def build_keys_table() -> Dict[str, str]: + keys_table = {} + for username, authstuff in build_users_table().items(): + if 'authkey' in authstuff: + keys_table[authstuff['authkey']] = username + return keys_table + + +@lru_cache(64) +def get_users() -> Dict[str, Union[str, List[str]]]: + try: + # Use legacy user mgmt, no need to print a warning, and it will fail on new install. + return get_config('generic', 'cache_clean_user', quiet=True) + except Exception: + return get_config('generic', 'users') + + +@lru_cache(64) +def build_users_table() -> Dict[str, Dict[str, str]]: + users_table: Dict[str, Dict[str, str]] = {} + for username, authstuff in get_users().items(): + if isinstance(authstuff, str): + # just a password, make a key + users_table[username] = {} + users_table[username]['password'] = generate_password_hash(authstuff) + users_table[username]['authkey'] = hashlib.pbkdf2_hmac('sha256', get_secret_key(), + authstuff.encode(), + 100000).hex() + + elif isinstance(authstuff, list) and len(authstuff) == 2: + if isinstance(authstuff[0], str) and isinstance(authstuff[1], str) and len(authstuff[1]) == 64: + users_table[username] = {} + users_table[username]['password'] = generate_password_hash(authstuff[0]) + users_table[username]['authkey'] = authstuff[1] + else: + raise Exception('User setup invalid. Must be "username": "password" or "username": ["password", "token 64 chars (sha256)"]') + return users_table + + +@lru_cache(64) +def get_secret_key() -> bytes: + secret_file_path: Path = get_homedir() / 'secret_key' + if not secret_file_path.exists() or secret_file_path.stat().st_size < 64: + if not secret_file_path.exists() or secret_file_path.stat().st_size < 64: + with secret_file_path.open('wb') as f: + f.write(os.urandom(64)) + with secret_file_path.open('rb') as f: + return f.read()