diff --git a/bin/async_capture.py b/bin/async_capture.py index d471784..7e017c1 100755 --- a/bin/async_capture.py +++ b/bin/async_capture.py @@ -116,7 +116,8 @@ class AsyncCapture(AbstractManager): if send_report: self.lookyloo.send_mail(uuid, email=settings.get('email', ''), - comment=settings.get('comment')) + comment=settings.get('comment'), + recipient_mail=settings.get("recipient_mail")) lazy_cleanup = self.lookyloo.redis.pipeline() if queue and self.lookyloo.redis.zscore('queues', queue): diff --git a/config/users/.keepdir b/config/users/.keepdir new file mode 100644 index 0000000..e69de29 diff --git a/config/users/admin.json.sample b/config/users/admin.json.sample new file mode 100644 index 0000000..62d31e0 --- /dev/null +++ b/config/users/admin.json.sample @@ -0,0 +1,7 @@ +{ + "overwrite": true, + "listing": false, + "auto_report": { + "recipient_mail": "analyst@test.de" + } +} diff --git a/lookyloo/default/helpers.py b/lookyloo/default/helpers.py index 4631a8f..b28446d 100644 --- a/lookyloo/default/helpers.py +++ b/lookyloo/default/helpers.py @@ -57,6 +57,10 @@ def load_configs(path_to_config_files: str | Path | None=None) -> None: for path in config_path.glob('*.json'): with path.open() as _c: configs[path.stem] = json.load(_c) + user_path = config_path / 'users' + for path in user_path.glob('*.json'): + with path.open() as _c: + configs[path.stem] = json.load(_c) @lru_cache(64) diff --git a/lookyloo/helpers.py b/lookyloo/helpers.py index 30da76b..01ee29a 100644 --- a/lookyloo/helpers.py +++ b/lookyloo/helpers.py @@ -29,6 +29,7 @@ from werkzeug.utils import cached_property from .default import get_homedir, safe_create_dir, get_config, LookylooException + logger = logging.getLogger('Lookyloo - Helpers') diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index af66733..d82282b 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -16,6 +16,7 @@ import time from collections import defaultdict from datetime import date, datetime, timedelta, timezone +from functools import lru_cache from email.message import EmailMessage from functools import cached_property from io import BytesIO @@ -78,6 +79,22 @@ class CaptureSettings(CaptureSettingsCore, total=False): parent: str | None +# overwrite set to True means the settings in the config file overwrite the settings +# provided by the user. False will simply append the settings from the config file if they +# don't exist. +class UserCaptureSettings(CaptureSettings, total=False): + overwrite: bool + + +@lru_cache(64) +def load_user_config(username: str) -> UserCaptureSettings | None: + user_config_path = get_homedir() / 'config' / 'users' / f'{username}.json' + if not user_config_path.exists(): + return None + with user_config_path.open() as _c: + return json.load(_c) + + class Lookyloo(): def __init__(self, cache_max_size: int | None=None) -> None: @@ -629,6 +646,26 @@ class Lookyloo(): query['document'] = document return query + def _apply_user_config(self, query: CaptureSettings, user_config: UserCaptureSettings) -> CaptureSettings: + def recursive_merge(dict1: CaptureSettings | UserCaptureSettings, + dict2: CaptureSettings | UserCaptureSettings) -> CaptureSettings: + # dict2 overwrites dict1 + for key, value in dict2.items(): + if key in dict1 and isinstance(dict1[key], dict) and isinstance(value, dict): # type: ignore[literal-required] + # Recursively merge nested dictionaries + dict1[key] = recursive_merge(dict1[key], value) # type: ignore[literal-required,arg-type] + else: + # Merge non-dictionary values + dict1[key] = value # type: ignore[literal-required] + return dict1 + + # merge + if user_config.pop('overwrite', None): + # config from file takes priority + return recursive_merge(query, user_config) + else: + return recursive_merge(user_config, query) + def enqueue_capture(self, query: CaptureSettings, source: str, user: str, authenticated: bool) -> str: '''Enqueue a query in the capture queue (used by the UI and the API for asynchronous processing)''' @@ -652,6 +689,9 @@ class Lookyloo(): query[key] = json.dumps(value) if value else None # type: ignore[literal-required] query = self._prepare_lacus_query(query) + if authenticated: + if user_config := load_user_config(user): + query = self._apply_user_config(query, user_config) priority = get_priority(source, user, authenticated) if priority < -100: @@ -864,7 +904,8 @@ class Lookyloo(): return f"Malicious capture according to {len(modules)} module(s): {', '.join(modules)}" - def send_mail(self, capture_uuid: str, /, email: str='', comment: str | None=None) -> bool | dict[str, Any]: + def send_mail(self, capture_uuid: str, /, email: str='', comment: str | None=None, + recipient_mail: str | None = None) -> bool | dict[str, Any]: '''Send an email notification regarding a specific capture''' if not get_config('generic', 'enable_mail_notification'): return {"error": "Unable to send mail: mail notification disabled"} @@ -913,7 +954,7 @@ class Lookyloo(): msg['From'] = email_config['from'] if email: msg['Reply-To'] = email - msg['To'] = email_config['to'] + msg['To'] = email_config['to'] if not recipient_mail else recipient_mail msg['Subject'] = email_config['subject'] body = get_email_template() body = body.format( diff --git a/website/web/__init__.py b/website/web/__init__.py index 43809e6..8fc3ee1 100644 --- a/website/web/__init__.py +++ b/website/web/__init__.py @@ -1588,7 +1588,7 @@ def capture_web() -> str | Response | WerkzeugResponse: if request.form.get('proxy'): parsed_proxy = urlparse(request.form['proxy']) if parsed_proxy.scheme and parsed_proxy.hostname and parsed_proxy.port: - if parsed_proxy.scheme in ['http', 'https', 'socks5']: + if parsed_proxy.scheme in ['http', 'https', 'socks5', 'socks5h']: if (parsed_proxy.username and parsed_proxy.password) or (not parsed_proxy.username and not parsed_proxy.password): capture_query['proxy'] = request.form['proxy'] else: @@ -1637,6 +1637,38 @@ def capture_web() -> str | Response | WerkzeugResponse: return _prepare_capture_template(user_ua=request.headers.get('User-Agent')) +@app.route('/simple_capture', methods=['GET', 'POST']) +@flask_login.login_required # type: ignore[misc] +def simple_capture() -> str | Response | WerkzeugResponse: + user = flask_login.current_user.get_id() + if request.method == 'POST': + if not (request.form.get('url') or request.form.get('urls')): + flash('Invalid submission: please submit at least a URL.', 'error') + return render_template('simple_capture.html') + capture_query: CaptureSettings = {} + if request.form.get('url'): + capture_query['url'] = request.form['url'] + perma_uuid = lookyloo.enqueue_capture(capture_query, source='web', user=user, + authenticated=flask_login.current_user.is_authenticated) + time.sleep(2) + if perma_uuid: + flash('Recording is in progress and is reported automatically.', 'success') + return redirect(url_for('simple_capture')) + elif request.form.get('urls'): + for url in request.form['urls'].strip().split('\n'): + if not url: + continue + query = capture_query.copy() + query['url'] = url + new_capture_uuid = lookyloo.enqueue_capture(query, source='web', user=user, + authenticated=flask_login.current_user.is_authenticated) + if new_capture_uuid: + flash('Recording is in progress and is reported automatically.', 'success') + return redirect(url_for('simple_capture')) + # render template + return render_template('simple_capture.html') + + @app.route('/cookies/', methods=['GET']) def cookies_name_detail(cookie_name: str) -> str: captures, domains = get_cookie_name_investigator(cookie_name.strip()) diff --git a/website/web/helpers.py b/website/web/helpers.py index 4a107a8..6464895 100644 --- a/website/web/helpers.py +++ b/website/web/helpers.py @@ -5,6 +5,7 @@ from __future__ import annotations import hashlib import json import os +import re from functools import lru_cache from pathlib import Path @@ -50,6 +51,10 @@ def load_user_from_request(request: Request) -> User | None: return None +def is_valid_username(username: str) -> bool: + return bool(re.match("^[A-Za-z0-9]+$", username)) + + @lru_cache(64) def build_keys_table() -> dict[str, str]: keys_table = {} @@ -72,6 +77,9 @@ def get_users() -> dict[str, str | list[str]]: def build_users_table() -> dict[str, dict[str, str]]: users_table: dict[str, dict[str, str]] = {} for username, authstuff in get_users().items(): + if not is_valid_username(username): + raise Exception('Invalid username, can only contain characters and numbers.') + if isinstance(authstuff, str): # just a password, make a key users_table[username] = {} diff --git a/website/web/templates/index.html b/website/web/templates/index.html index 7b3f30f..3f80695 100644 --- a/website/web/templates/index.html +++ b/website/web/templates/index.html @@ -83,6 +83,11 @@ $(document).ready(function () { + {% if current_user.is_authenticated %} + + + + {% endif %}

{{ render_messages(container=True, dismissible=True) }} diff --git a/website/web/templates/simple_capture.html b/website/web/templates/simple_capture.html new file mode 100644 index 0000000..3a69e22 --- /dev/null +++ b/website/web/templates/simple_capture.html @@ -0,0 +1,103 @@ +{% extends "main.html" %} +{% from 'bootstrap5/utils.html' import render_messages %} +{% block title %}Capture{% endblock %} + +{% block card %} + + + + + + +{% endblock %} + +{% block content %} +
+
+ + Lookyloo + +
+ {{ render_messages(container=True, dismissible=True) }} +
+ + + +
+ +
+
+ +
+
+
+{% endblock %} + +{% block scripts %} + {{ super() }} + + + +{% endblock %}