mirror of https://github.com/CIRCL/lookyloo
chg: Move API into a new file, cleanup
@ -1,17 +1,13 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import base64
from io import BytesIO, StringIO
import os
from pathlib import Path
from datetime import datetime, timedelta, timezone
import json
import http
import calendar
from typing import Optional, Dict, Any, Union, List
import logging
import hashlib
from urllib.parse import quote_plus, unquote_plus
import time
import pkg_resources
@ -19,28 +15,24 @@ import pkg_resources
from flask import Flask, render_template, request, send_file, redirect, url_for, Response, flash, jsonify
from flask_bootstrap import Bootstrap # type: ignore
import flask_login # type: ignore
from flask_restx import Resource, Api, fields, abort # type: ignore
from werkzeug.security import generate_password_hash, check_password_hash
from flask_restx import Api # type: ignore
from .genericapi import api as generic_api
from werkzeug.security import check_password_hash
from pymisp import MISPEvent, MISPServerError
from lookyloo.helpers import (get_homedir, update_user_agents, get_user_agents, get_config,
from lookyloo.helpers import (update_user_agents, get_user_agents, get_config,
get_taxonomies, load_cookies, CaptureStatus)
from lookyloo.lookyloo import Lookyloo, Indexing
from lookyloo.exceptions import NoValidHarFile, MissingUUID
from .proxied import ReverseProxied
from .helpers import src_request_ip, User, load_user_from_request, build_users_table, get_secret_key
app: Flask = Flask(__name__)
app.wsgi_app = ReverseProxied(app.wsgi_app) # type: ignore
secret_file_path: Path = get_homedir() / 'secret_key'
if not secret_file_path.exists() or secret_file_path.stat().st_size < 64:
with secret_file_path.open('wb') as f:
with secret_file_path.open('rb') as f:
app.config['SECRET_KEY'] = f.read()
app.config['SECRET_KEY'] = get_secret_key()
app.config['BOOTSTRAP_SERVE_LOCAL'] = True
@ -51,45 +43,11 @@ app.debug = False
# Auth stuff
login_manager = flask_login.LoginManager()
# Use legacy user mgmt, no need to print a warning, and it will fail on new install.
users = get_config('generic', 'cache_clean_user', quiet=True)
except Exception:
users = get_config('generic', 'users')
users_table: Dict[str, Dict[str, str]] = {}
for username, authstuff in users.items():
if isinstance(authstuff, str):
# just a password, make a key
users_table[username] = {}
users_table[username]['password'] = generate_password_hash(authstuff)
users_table[username]['authkey'] = hashlib.pbkdf2_hmac('sha256',
elif isinstance(authstuff, list) and len(authstuff) == 2:
if isinstance(authstuff[0], str) and isinstance(authstuff[1], str) and len(authstuff[1]) == 64:
users_table[username] = {}
users_table[username]['password'] = generate_password_hash(authstuff[0])
users_table[username]['authkey'] = authstuff[1]
if username not in users_table:
raise Exception('User setup invalid. Must be "username": "password" or "username": ["password", "token 64 chars (sha256)"]')
keys_table = {}
for username, authstuff in users_table.items():
if 'authkey' in authstuff:
keys_table[authstuff['authkey']] = username
class User(flask_login.UserMixin):
def user_loader(username):
if username not in users_table:
if username not in build_users_table():
return None
user = User()
user.id = username
@ -97,16 +55,8 @@ def user_loader(username):
def load_user_from_request(request):
api_key = request.headers.get('Authorization')
if not api_key:
return None
user = User()
api_key = api_key.strip()
if api_key in keys_table:
user.id = keys_table[api_key]
return user
return None
def _load_user_from_request(request):
return load_user_from_request(request)
@app.route('/login', methods=['GET', 'POST'])
@ -121,6 +71,7 @@ def login():
username = request.form['username']
users_table = build_users_table()
if username in users_table and check_password_hash(users_table[username]['password'], request.form['password']):
user = User()
user.id = username
@ -196,14 +147,6 @@ app.jinja_env.globals.update(month_name=month_name)
# ##### Generic/configuration methods #####
def src_request_ip(request) -> str:
# NOTE: X-Real-IP is the IP passed by the reverse proxy in the headers.
real_ip = request.headers.get('X-Real-IP')
if not real_ip:
real_ip = request.remote_addr
return real_ip
def after_request(response):
# We keep a list user agents in order to build a list to use in the capture
@ -351,6 +294,76 @@ def web_misp_lookup_view(tree_uuid: str):
return render_template('misp_lookup.html', uuid=tree_uuid, hits=hits, misp_root_url=misp_root_url)
@app.route('/tree/<string:tree_uuid>/misp_push', methods=['GET', 'POST'])
def web_misp_push_view(tree_uuid: str):
error = False
if not lookyloo.misp.available:
flash('MISP module not available.', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
elif not lookyloo.misp.enable_push:
flash('Push not enabled in MISP module.', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
event = lookyloo.misp_export(tree_uuid)
if isinstance(event, dict):
flash(f'Unable to generate the MISP export: {event}', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
if request.method == 'POST':
# event is a MISPEvent at this point
# Submit the event
tags = request.form.getlist('tags')
error = False
events: List[MISPEvent] = []
with_parents = request.form.get('with_parents')
if with_parents:
exports = lookyloo.misp_export(tree_uuid, True)
if isinstance(exports, dict):
flash(f'Unable to create event: {exports}', 'error')
error = True
events = exports
events = event
if error:
return redirect(url_for('tree', tree_uuid=tree_uuid))
for e in events:
for tag in tags:
# Change the event info field of the last event in the chain
events[-1].info = request.form.get('event_info')
new_events = lookyloo.misp.push(events, True if request.form.get('force_push') else False,
True if request.form.get('auto_publish') else False)
except MISPServerError:
flash(f'MISP returned an error, the event(s) might still have been created on {lookyloo.misp.client.root_url}', 'error')
if isinstance(new_events, dict):
flash(f'Unable to create event(s): {new_events}', 'error')
for e in new_events:
flash(f'MISP event {e.id} created on {lookyloo.misp.client.root_url}', 'success')
return redirect(url_for('tree', tree_uuid=tree_uuid))
# the 1st attribute in the event is the link to lookyloo
existing_misp_url = lookyloo.misp.get_existing_event_url(event[-1].attributes[0].value)
fav_tags = lookyloo.misp.get_fav_tags()
cache = lookyloo.capture_cache(tree_uuid)
return render_template('misp_push_view.html', tree_uuid=tree_uuid,
event=event[0], fav_tags=fav_tags,
has_parent=True if cache and cache.parent else False,
@app.route('/tree/<string:tree_uuid>/modules', methods=['GET'])
def modules(tree_uuid: str):
modules_responses = lookyloo.get_modules_responses(tree_uuid)
@ -791,6 +804,13 @@ def statsfull():
return render_template('stats.html', stats=stats)
@app.route('/whois/<string:query>', methods=['GET'])
def whois(query: str):
to_return = lookyloo.uwhois.whois(query)
return send_file(BytesIO(to_return.encode()),
mimetype='test/plain', as_attachment=True, attachment_filename=f'whois.{query}.txt')
# ##### Methods related to a specific URLNode #####
@app.route('/tree/<string:tree_uuid>/url/<string:node_uuid>/request_cookies', methods=['GET'])
@ -942,83 +962,6 @@ def add_context(tree_uuid: str, node_uuid: str):
return redirect(url_for('ressources'))
@app.route('/tree/<string:tree_uuid>/misp_push', methods=['GET', 'POST'])
def web_misp_push_view(tree_uuid: str):
error = False
if not lookyloo.misp.available:
flash('MISP module not available.', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
elif not lookyloo.misp.enable_push:
flash('Push not enabled in MISP module.', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
event = lookyloo.misp_export(tree_uuid)
if isinstance(event, dict):
flash(f'Unable to generate the MISP export: {event}', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
if request.method == 'POST':
# event is a MISPEvent at this point
# Submit the event
tags = request.form.getlist('tags')
error = False
events: List[MISPEvent] = []
with_parents = request.form.get('with_parents')
if with_parents:
exports = lookyloo.misp_export(tree_uuid, True)
if isinstance(exports, dict):
flash(f'Unable to create event: {exports}', 'error')
error = True
events = exports
events = event
if error:
return redirect(url_for('tree', tree_uuid=tree_uuid))
for e in events:
for tag in tags:
# Change the event info field of the last event in the chain
events[-1].info = request.form.get('event_info')
new_events = lookyloo.misp.push(events, True if request.form.get('force_push') else False,
True if request.form.get('auto_publish') else False)
except MISPServerError:
flash(f'MISP returned an error, the event(s) might still have been created on {lookyloo.misp.client.root_url}', 'error')
if isinstance(new_events, dict):
flash(f'Unable to create event(s): {new_events}', 'error')
for e in new_events:
flash(f'MISP event {e.id} created on {lookyloo.misp.client.root_url}', 'success')
return redirect(url_for('tree', tree_uuid=tree_uuid))
# the 1st attribute in the event is the link to lookyloo
existing_misp_url = lookyloo.misp.get_existing_event_url(event[-1].attributes[0].value)
fav_tags = lookyloo.misp.get_fav_tags()
cache = lookyloo.capture_cache(tree_uuid)
return render_template('misp_push_view.html', tree_uuid=tree_uuid,
event=event[0], fav_tags=fav_tags,
has_parent=True if cache and cache.parent else False,
@app.route('/whois/<string:query>', methods=['GET'])
def whois(query: str):
to_return = lookyloo.uwhois.whois(query)
return send_file(BytesIO(to_return.encode()),
mimetype='test/plain', as_attachment=True, attachment_filename=f'whois.{query}.txt')
# Query API
authorizations = {
@ -1035,232 +978,4 @@ api = Api(app, title='Lookyloo API',
def api_auth_check(method):
if flask_login.current_user.is_authenticated or load_user_from_request(request):
return method
abort(403, 'Authentication required.')
token_request_fields = api.model('AuthTokenFields', {
'username': fields.String(description="Your username", required=True),
'password': fields.String(description="Your password", required=True),
@api.doc(description='Get the API token required for authenticated calls')
class AuthToken(Resource):
@api.param('username', 'Your username')
@api.param('password', 'Your password')
def get(self):
username = request.args['username'] if request.args.get('username') else False
password = request.args['password'] if request.args.get('password') else False
if username in users_table and check_password_hash(users_table[username]['password'], password):
return {'authkey': users_table[username]['authkey']}
return {'error': 'User/Password invalid.'}
def post(self):
auth: Dict = request.get_json(force=True)
if 'username' in auth and 'password' in auth: # Expected keys in json
if (auth['username'] in users_table
and check_password_hash(users_table[auth['username']]['password'], auth['password'])):
return {'authkey': users_table[auth['username']]['authkey']}
return {'error': 'User/Password invalid.'}
@api.doc(description='Get the status of a capture',
params={'tree_uuid': 'The UUID of the capture'})
class CaptureStatusQuery(Resource):
def get(self, tree_uuid: str):
return {'status_code': lookyloo.get_capture_status(tree_uuid)}
@api.doc(description='Get all the redirects of a capture',
params={'tree_uuid': 'The UUID of the capture'})
class CaptureRedirects(Resource):
def get(self, tree_uuid: str):
cache = lookyloo.capture_cache(tree_uuid)
if not cache:
return {'error': 'UUID missing in cache, try again later.'}
to_return: Dict[str, Any] = {'response': {'url': cache.url, 'redirects': []}}
if not cache.redirects:
to_return['response']['info'] = 'No redirects'
return to_return
if cache.incomplete_redirects:
# Trigger tree build, get all redirects
cache = lookyloo.capture_cache(tree_uuid)
if cache:
to_return['response']['redirects'] = cache.redirects
to_return['response']['redirects'] = cache.redirects
return to_return
@api.doc(description='Get an export of the capture in MISP format',
params={'tree_uuid': 'The UUID of the capture'})
class MISPExport(Resource):
def get(self, tree_uuid: str):
with_parents = request.args.get('with_parents')
event = lookyloo.misp_export(tree_uuid, True if with_parents else False)
if isinstance(event, dict):
return event
to_return = []
for e in event:
return to_return
misp_push_fields = api.model('MISPPushFields', {
'allow_duplicates': fields.Integer(description="Push the event even if it is already present on the MISP instance",
example=0, min=0, max=1),
'with_parents': fields.Integer(description="Also push the parents of the capture (if any)",
example=0, min=0, max=1),
@api.doc(description='Push an event to a pre-configured MISP instance',
params={'tree_uuid': 'The UUID of the capture'},
class MISPPush(Resource):
method_decorators = [api_auth_check]
@api.param('with_parents', 'Also push the parents of the capture (if any)')
@api.param('allow_duplicates', 'Push the event even if it is already present on the MISP instance')
def get(self, tree_uuid: str):
with_parents = True if request.args.get('with_parents') else False
allow_duplicates = True if request.args.get('allow_duplicates') else False
to_return: Dict = {}
if not lookyloo.misp.available:
to_return['error'] = 'MISP module not available.'
elif not lookyloo.misp.enable_push:
to_return['error'] = 'Push not enabled in MISP module.'
event = lookyloo.misp_export(tree_uuid, with_parents)
if isinstance(event, dict):
to_return['error'] = event
new_events = lookyloo.misp.push(event, allow_duplicates)
if isinstance(new_events, dict):
to_return['error'] = new_events
events_to_return = []
for e in new_events:
return events_to_return
return to_return
def post(self, tree_uuid: str):
parameters: Dict = request.get_json(force=True)
with_parents = True if parameters.get('with_parents') else False
allow_duplicates = True if parameters.get('allow_duplicates') else False
to_return: Dict = {}
if not lookyloo.misp.available:
to_return['error'] = 'MISP module not available.'
elif not lookyloo.misp.enable_push:
to_return['error'] = 'Push not enabled in MISP module.'
event = lookyloo.misp_export(tree_uuid, with_parents)
if isinstance(event, dict):
to_return['error'] = event
new_events = lookyloo.misp.push(event, allow_duplicates)
if isinstance(new_events, dict):
to_return['error'] = new_events
events_to_return = []
for e in new_events:
return events_to_return
return to_return
@api.doc(description='Search for a ressource with a specific hash (sha512)',
params={'h': 'The hash (sha512)'})
class HashInfo(Resource):
def get(self, h: str):
details, body = lookyloo.get_body_hash_full(h)
if not details:
return {'error': 'Unknown Hash.'}
to_return: Dict[str, Any] = {'response': {'hash': h, 'details': details,
'body': base64.b64encode(body.getvalue()).decode()}}
return to_return
url_info_fields = api.model('URLInfoFields', {
'url': fields.String(description="The URL to search", required=True),
'limit': fields.Integer(description="The maximal amount of captures to return", example=20),
@api.doc(description='Search for a URL')
class URLInfo(Resource):
def post(self):
to_query: Dict = request.get_json(force=True)
occurrences = lookyloo.get_url_occurrences(to_query.pop('url'), **to_query)
return occurrences
hostname_info_fields = api.model('HostnameInfoFields', {
'hostname': fields.String(description="The hostname to search", required=True),
'limit': fields.Integer(description="The maximal amount of captures to return", example=20),
@api.doc(description='Search for a hostname')
class HostnameInfo(Resource):
def post(self):
to_query: Dict = request.get_json(force=True)
occurrences = lookyloo.get_hostname_occurrences(to_query.pop('hostname'), **to_query)
return occurrences
@api.doc(description='Get the statistics of the lookyloo instance.')
class InstanceStats(Resource):
def get(self):
return lookyloo.get_stats()
submit_fields = api.model('SubmitFields', {
'url': fields.String(description="The URL to capture", required=True),
'listing': fields.Integer(description="Display the capture on the index", min=0, max=1, example=1),
'user_agent': fields.String(description="User agent to use for the capture", example=''),
'referer': fields.String(description="Referer to pass to the capture", example=''),
'cookies': fields.String(description="JSON export of a list of cookies as exported from an other capture", example='')
class SubmitCapture(Resource):
def post(self):
if flask_login.current_user.is_authenticated:
user = flask_login.current_user.get_id()
user = src_request_ip(request)
to_query: Dict = request.get_json(force=True)
perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated)
return Response(perma_uuid, mimetype='text/text')
@ -0,0 +1,258 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import base64
from typing import Dict, Any
from flask import request, Response
import flask_login # type: ignore
from flask_restx import Namespace, Resource, fields, abort # type: ignore
from werkzeug.security import check_password_hash
from lookyloo.lookyloo import Lookyloo
from .helpers import src_request_ip, load_user_from_request, build_users_table
api = Namespace('GenericAPI', description='Generic Lookyloo API', path='/')
lookyloo: Lookyloo = Lookyloo()
def api_auth_check(method):
if flask_login.current_user.is_authenticated or load_user_from_request(request):
return method
abort(403, 'Authentication required.')
token_request_fields = api.model('AuthTokenFields', {
'username': fields.String(description="Your username", required=True),
'password': fields.String(description="Your password", required=True),
@api.doc(description='Get the API token required for authenticated calls')
class AuthToken(Resource):
users_table = build_users_table()
@api.param('username', 'Your username')
@api.param('password', 'Your password')
def get(self):
username = request.args['username'] if request.args.get('username') else False
password = request.args['password'] if request.args.get('password') else False
if username in self.users_table and check_password_hash(self.users_table[username]['password'], password):
return {'authkey': self.users_table[username]['authkey']}
return {'error': 'User/Password invalid.'}
def post(self):
auth: Dict = request.get_json(force=True)
if 'username' in auth and 'password' in auth: # Expected keys in json
if (auth['username'] in self.users_table
and check_password_hash(self.users_table[auth['username']]['password'], auth['password'])):
return {'authkey': self.users_table[auth['username']]['authkey']}
return {'error': 'User/Password invalid.'}
@api.doc(description='Get the status of a capture',
params={'tree_uuid': 'The UUID of the capture'})
class CaptureStatusQuery(Resource):
def get(self, tree_uuid: str):
return {'status_code': lookyloo.get_capture_status(tree_uuid)}
@api.doc(description='Get all the redirects of a capture',
params={'tree_uuid': 'The UUID of the capture'})
class CaptureRedirects(Resource):
def get(self, tree_uuid: str):
cache = lookyloo.capture_cache(tree_uuid)
if not cache:
return {'error': 'UUID missing in cache, try again later.'}
to_return: Dict[str, Any] = {'response': {'url': cache.url, 'redirects': []}}
if not cache.redirects:
to_return['response']['info'] = 'No redirects'
return to_return
if cache.incomplete_redirects:
# Trigger tree build, get all redirects
cache = lookyloo.capture_cache(tree_uuid)
if cache:
to_return['response']['redirects'] = cache.redirects
to_return['response']['redirects'] = cache.redirects
return to_return
@api.doc(description='Get an export of the capture in MISP format',
params={'tree_uuid': 'The UUID of the capture'})
class MISPExport(Resource):
def get(self, tree_uuid: str):
with_parents = request.args.get('with_parents')
event = lookyloo.misp_export(tree_uuid, True if with_parents else False)
if isinstance(event, dict):
return event
to_return = []
for e in event:
return to_return
misp_push_fields = api.model('MISPPushFields', {
'allow_duplicates': fields.Integer(description="Push the event even if it is already present on the MISP instance",
example=0, min=0, max=1),
'with_parents': fields.Integer(description="Also push the parents of the capture (if any)",
example=0, min=0, max=1),
@api.doc(description='Push an event to a pre-configured MISP instance',
params={'tree_uuid': 'The UUID of the capture'},
class MISPPush(Resource):
method_decorators = [api_auth_check]
@api.param('with_parents', 'Also push the parents of the capture (if any)')
@api.param('allow_duplicates', 'Push the event even if it is already present on the MISP instance')
def get(self, tree_uuid: str):
with_parents = True if request.args.get('with_parents') else False
allow_duplicates = True if request.args.get('allow_duplicates') else False
to_return: Dict = {}
if not lookyloo.misp.available:
to_return['error'] = 'MISP module not available.'
elif not lookyloo.misp.enable_push:
to_return['error'] = 'Push not enabled in MISP module.'
event = lookyloo.misp_export(tree_uuid, with_parents)
if isinstance(event, dict):
to_return['error'] = event
new_events = lookyloo.misp.push(event, allow_duplicates)
if isinstance(new_events, dict):
to_return['error'] = new_events
events_to_return = []
for e in new_events:
return events_to_return
return to_return
def post(self, tree_uuid: str):
parameters: Dict = request.get_json(force=True)
with_parents = True if parameters.get('with_parents') else False
allow_duplicates = True if parameters.get('allow_duplicates') else False
to_return: Dict = {}
if not lookyloo.misp.available:
to_return['error'] = 'MISP module not available.'
elif not lookyloo.misp.enable_push:
to_return['error'] = 'Push not enabled in MISP module.'
event = lookyloo.misp_export(tree_uuid, with_parents)
if isinstance(event, dict):
to_return['error'] = event
new_events = lookyloo.misp.push(event, allow_duplicates)
if isinstance(new_events, dict):
to_return['error'] = new_events
events_to_return = []
for e in new_events:
return events_to_return
return to_return
@api.doc(description='Search for a ressource with a specific hash (sha512)',
params={'h': 'The hash (sha512)'})
class HashInfo(Resource):
def get(self, h: str):
details, body = lookyloo.get_body_hash_full(h)
if not details:
return {'error': 'Unknown Hash.'}
to_return: Dict[str, Any] = {'response': {'hash': h, 'details': details,
'body': base64.b64encode(body.getvalue()).decode()}}
return to_return
url_info_fields = api.model('URLInfoFields', {
'url': fields.String(description="The URL to search", required=True),
'limit': fields.Integer(description="The maximal amount of captures to return", example=20),
@api.doc(description='Search for a URL')
class URLInfo(Resource):
def post(self):
to_query: Dict = request.get_json(force=True)
occurrences = lookyloo.get_url_occurrences(to_query.pop('url'), **to_query)
return occurrences
hostname_info_fields = api.model('HostnameInfoFields', {
'hostname': fields.String(description="The hostname to search", required=True),
'limit': fields.Integer(description="The maximal amount of captures to return", example=20),
@api.doc(description='Search for a hostname')
class HostnameInfo(Resource):
def post(self):
to_query: Dict = request.get_json(force=True)
occurrences = lookyloo.get_hostname_occurrences(to_query.pop('hostname'), **to_query)
return occurrences
@api.doc(description='Get the statistics of the lookyloo instance.')
class InstanceStats(Resource):
def get(self):
return lookyloo.get_stats()
submit_fields = api.model('SubmitFields', {
'url': fields.String(description="The URL to capture", required=True),
'listing': fields.Integer(description="Display the capture on the index", min=0, max=1, example=1),
'user_agent': fields.String(description="User agent to use for the capture", example=''),
'referer': fields.String(description="Referer to pass to the capture", example=''),
'cookies': fields.String(description="JSON export of a list of cookies as exported from an other capture", example='')
class SubmitCapture(Resource):
def post(self):
if flask_login.current_user.is_authenticated:
user = flask_login.current_user.get_id()
user = src_request_ip(request)
to_query: Dict = request.get_json(force=True)
perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated)
return Response(perma_uuid, mimetype='text/text')
@api.doc(description='Get the statistics of the capture.')
class CaptureStats(Resource):
def get(self, tree_uuid: str):
return lookyloo.get_statistics(tree_uuid)
@ -0,0 +1,91 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import hashlib
import os
from functools import lru_cache
from pathlib import Path
from typing import Dict, List, Union
import flask_login # type: ignore
from werkzeug.security import generate_password_hash
from lookyloo.helpers import get_homedir, get_config
def src_request_ip(request) -> str:
# NOTE: X-Real-IP is the IP passed by the reverse proxy in the headers.
real_ip = request.headers.get('X-Real-IP')
if not real_ip:
real_ip = request.remote_addr
return real_ip
class User(flask_login.UserMixin):
def load_user_from_request(request):
api_key = request.headers.get('Authorization')
if not api_key:
return None
user = User()
api_key = api_key.strip()
keys_table = build_keys_table()
if api_key in keys_table:
user.id = keys_table[api_key]
return user
return None
def build_keys_table() -> Dict[str, str]:
keys_table = {}
for username, authstuff in build_users_table().items():
if 'authkey' in authstuff:
keys_table[authstuff['authkey']] = username
return keys_table
def get_users() -> Dict[str, Union[str, List[str]]]:
# Use legacy user mgmt, no need to print a warning, and it will fail on new install.
return get_config('generic', 'cache_clean_user', quiet=True)
except Exception:
return get_config('generic', 'users')
def build_users_table() -> Dict[str, Dict[str, str]]:
users_table: Dict[str, Dict[str, str]] = {}
for username, authstuff in get_users().items():
if isinstance(authstuff, str):
# just a password, make a key
users_table[username] = {}
users_table[username]['password'] = generate_password_hash(authstuff)
users_table[username]['authkey'] = hashlib.pbkdf2_hmac('sha256', get_secret_key(),
elif isinstance(authstuff, list) and len(authstuff) == 2:
if isinstance(authstuff[0], str) and isinstance(authstuff[1], str) and len(authstuff[1]) == 64:
users_table[username] = {}
users_table[username]['password'] = generate_password_hash(authstuff[0])
users_table[username]['authkey'] = authstuff[1]
raise Exception('User setup invalid. Must be "username": "password" or "username": ["password", "token 64 chars (sha256)"]')
return users_table
def get_secret_key() -> bytes:
secret_file_path: Path = get_homedir() / 'secret_key'
if not secret_file_path.exists() or secret_file_path.stat().st_size < 64:
if not secret_file_path.exists() or secret_file_path.stat().st_size < 64:
with secret_file_path.open('wb') as f:
with secret_file_path.open('rb') as f:
return f.read()
Reference in New Issue