From 804bab6d2fe59dd7ba38627832d668fc71e1f610 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Fri, 19 Jul 2024 16:12:24 +0200 Subject: [PATCH] new: Use Pydantic for CaptureSettings --- bin/async_capture.py | 31 ++----- bin/background_processing.py | 72 ++++++++-------- lookyloo/capturecache.py | 9 +- lookyloo/helpers.py | 105 +++++++--------------- lookyloo/lookyloo.py | 163 +++++++++++++---------------------- website/web/__init__.py | 38 ++++---- website/web/genericapi.py | 12 +-- 7 files changed, 164 insertions(+), 266 deletions(-) diff --git a/bin/async_capture.py b/bin/async_capture.py index 2354860..6283524 100755 --- a/bin/async_capture.py +++ b/bin/async_capture.py @@ -3,7 +3,6 @@ from __future__ import annotations import asyncio -import json import logging import logging.config import signal @@ -78,19 +77,14 @@ class AsyncCapture(AbstractManager): self.lookyloo.redis.sadd('ongoing', uuid) queue: str | None = self.lookyloo.redis.getdel(f'{uuid}_mgmt') - to_capture: CaptureSettings = self.lookyloo.get_capture_settings(uuid) - - if get_config('generic', 'default_public'): - # By default, the captures are on the index, unless the user mark them as un-listed - listing = False if ('listing' in to_capture and to_capture['listing'] == 0) else True - else: - # By default, the captures are not on the index, unless the user mark them as listed - listing = True if ('listing' in to_capture and to_capture['listing'] == 1) else False + to_capture: CaptureSettings | None = self.lookyloo.get_capture_settings(uuid) + if not to_capture: + continue self.lookyloo.store_capture( - uuid, listing, - os=to_capture.get('os'), browser=to_capture.get('browser'), - parent=to_capture.get('parent'), + uuid, to_capture.listing, + os=to_capture.os, browser=to_capture.browser, + parent=to_capture.parent, downloaded_filename=entries.get('downloaded_filename'), downloaded_file=entries.get('downloaded_file'), error=entries.get('error'), har=entries.get('har'), @@ -101,18 +95,11 @@ class AsyncCapture(AbstractManager): potential_favicons=entries.get('potential_favicons') ) - if 'auto_report' in to_capture: + if to_capture.auto_report: send_report = True settings = {} - if isinstance(to_capture['auto_report'], str): - if to_capture['auto_report'].isdigit(): - # auto_report was a bool in the submission, it can be 1 or 0. 0 means no. - if to_capture['auto_report'] == '0': - send_report = False - else: - settings = json.loads(to_capture['auto_report']) - elif isinstance(to_capture['auto_report'], dict): - settings = to_capture['auto_report'] + if isinstance(to_capture.auto_report, dict): + settings = to_capture.auto_report if send_report: self.lookyloo.send_mail(uuid, email=settings.get('email', ''), diff --git a/bin/background_processing.py b/bin/background_processing.py index 0bee4bc..96fddbb 100755 --- a/bin/background_processing.py +++ b/bin/background_processing.py @@ -14,7 +14,7 @@ from lacuscore import CaptureStatus as CaptureStatusCore from lookyloo import Lookyloo from lookyloo.exceptions import LacusUnreachable from lookyloo.default import AbstractManager, get_config, get_homedir, safe_create_dir -from lookyloo.helpers import ParsedUserAgent, serialize_to_json, CaptureSettings +from lookyloo.helpers import ParsedUserAgent, serialize_to_json from pylacus import CaptureStatus as CaptureStatusPy logging.config.dictConfig(get_config('logging')) @@ -109,41 +109,41 @@ class Processing(AbstractManager): continue self.logger.info(f'Found a non-queued capture ({uuid}), retrying now.') # This capture couldn't be queued and we created the uuid locally - query: CaptureSettings = self.lookyloo.get_capture_settings(uuid) - try: - new_uuid = self.lookyloo.lacus.enqueue( - url=query.get('url', None), - document_name=query.get('document_name', None), - document=query.get('document', None), - # depth=query.get('depth', 0), - browser=query.get('browser', None), - device_name=query.get('device_name', None), - user_agent=query.get('user_agent', None), - proxy=query.get('proxy', None), - general_timeout_in_sec=query.get('general_timeout_in_sec', None), - cookies=query.get('cookies', None), - headers=query.get('headers', None), - http_credentials=query.get('http_credentials', None), - viewport=query.get('viewport', None), - referer=query.get('referer', None), - rendered_hostname_only=query.get('rendered_hostname_only', True), - # force=query.get('force', False), - # recapture_interval=query.get('recapture_interval', 300), - priority=query.get('priority', 0), - uuid=uuid - ) - if new_uuid != uuid: - # somehow, between the check and queuing, the UUID isn't UNKNOWN anymore, just checking that - self.logger.warning(f'Had to change the capture UUID (duplicate). Old: {uuid} / New: {new_uuid}') - except LacusUnreachable: - self.logger.warning('Lacus still unreachable.') - break - except Exception as e: - self.logger.warning(f'Still unable to enqueue capture: {e}') - break - else: - self.lookyloo.redis.hdel(uuid, 'not_queued') - self.logger.info(f'{uuid} enqueued.') + if query := self.lookyloo.get_capture_settings(uuid): + try: + new_uuid = self.lookyloo.lacus.enqueue( + url=query.url, + document_name=query.document_name, + document=query.document, + # depth=query.depth, + browser=query.browser, + device_name=query.device_name, + user_agent=query.user_agent, + proxy=query.proxy, + general_timeout_in_sec=query.general_timeout_in_sec, + cookies=query.cookies, + headers=query.headers, + http_credentials=query.http_credentials, + viewport=query.viewport, + referer=query.referer, + rendered_hostname_only=query.rendered_hostname_only, + # force=query.force, + # recapture_interval=query.recapture_interval, + priority=query.priority, + uuid=uuid + ) + if new_uuid != uuid: + # somehow, between the check and queuing, the UUID isn't UNKNOWN anymore, just checking that + self.logger.warning(f'Had to change the capture UUID (duplicate). Old: {uuid} / New: {new_uuid}') + except LacusUnreachable: + self.logger.warning('Lacus still unreachable.') + break + except Exception as e: + self.logger.warning(f'Still unable to enqueue capture: {e}') + break + else: + self.lookyloo.redis.hdel(uuid, 'not_queued') + self.logger.info(f'{uuid} enqueued.') def main() -> None: diff --git a/lookyloo/capturecache.py b/lookyloo/capturecache.py index 2581d0d..f3cfcba 100644 --- a/lookyloo/capturecache.py +++ b/lookyloo/capturecache.py @@ -431,8 +431,13 @@ class CapturesIndex(Mapping): # type: ignore[type-arg] capture_settings_file = capture_dir / 'capture_settings.json' if capture_settings_file.exists(): with capture_settings_file.open() as f: - capture_settings = json.loads(f.read()) - + _s = f.read() + try: + capture_settings = json.loads(_s) + capture_settings.get('url') + except AttributeError: + # That's if we have broken dumps that are twice json encoded + capture_settings = json.load(capture_settings) if capture_settings.get('url') and capture_settings['url'] is not None: cache['url'] = capture_settings['url'] diff --git a/lookyloo/helpers.py b/lookyloo/helpers.py index 313b837..f75e2a1 100644 --- a/lookyloo/helpers.py +++ b/lookyloo/helpers.py @@ -15,6 +15,8 @@ from functools import lru_cache from importlib.metadata import version from io import BufferedIOBase from pathlib import Path +from pydantic import field_validator +from pydantic_core import from_json from typing import Any from urllib.parse import urlparse @@ -83,68 +85,6 @@ def get_email_template() -> str: return f.read() -def cast_capture_settings(capture_settings: dict[str, str]) -> CaptureSettings: - to_return: CaptureSettings = {} - # NOTE: Replace the if / else below with a case / match as soon as we require python 3.10+ - for setting_key, setting_value in capture_settings.items(): - if setting_key == 'listing': - to_return['listing'] = bool(int(setting_value)) - elif setting_key == 'not_queued': - to_return['not_queued'] = bool(int(setting_value)) - elif setting_key == 'auto_report': - if isinstance(setting_value, str) and setting_value: - if setting_value.startswith('{'): - to_return['auto_report'] = json.loads(setting_value) - elif setting_value.isdigit(): - to_return['auto_report'] = bool(int(setting_value)) - else: - to_return['auto_report'] = setting_value - elif setting_key == 'proxy' and setting_value: - if setting_value.startswith('{'): - to_return['proxy'] = json.loads(setting_value) - else: - to_return['proxy'] = setting_value - elif setting_key in ('dnt', 'browser_name', 'os', 'parent'): - to_return[setting_key] = setting_value # type: ignore[literal-required] - # Lacus core keys - elif setting_key == 'general_timeout_in_sec': - to_return['general_timeout_in_sec'] = int(setting_value) - elif setting_key == 'cookies': - to_return['cookies'] = load_cookies(setting_value) - elif setting_key == 'headers': - to_return['headers'] = json.loads(setting_value) - elif setting_key == 'http_credentials': - to_return['http_credentials'] = json.loads(setting_value) - elif setting_key == 'geolocation': - to_return['geolocation'] = json.loads(setting_value) - elif setting_key == 'viewport': - to_return['viewport'] = json.loads(setting_value) - elif setting_key == 'with_favicon': - to_return['with_favicon'] = bool(int(setting_value)) - elif setting_key == 'allow_tracking': - to_return['allow_tracking'] = bool(int(setting_value)) - elif setting_key == 'force': - to_return['force'] = bool(int(setting_value)) - elif setting_key == 'recapture_interval': - to_return['recapture_interval'] = int(setting_value) - elif setting_key == 'priority': - to_return['priority'] = int(setting_value) - elif setting_key == 'depth': - to_return['depth'] = int(setting_value) - elif setting_key == 'rendered_hostname_only': - to_return['rendered_hostname_only'] = bool(int(setting_value)) - elif setting_key in ('url', 'document_name', 'document', 'browser', 'device_name', - 'user_agent', 'timezone_id', 'locale', 'color_scheme', 'referer', - 'uuid') and setting_value: - # Value is a non-empty string, keep it as-is - to_return[setting_key] = setting_value # type: ignore[literal-required] - else: - # NOTE: we may have to add more settings here, will be fixed with pydantic soon. - # raise InvalidCaptureSetting(f'Unknown setting: {setting_key} with value: {setting_value}') - print(f'Unknown setting: {setting_key} with value: {setting_value}') - return to_return - - @lru_cache def load_takedown_filters() -> tuple[re.Pattern[str], re.Pattern[str], dict[str, list[str]]]: filter_ini_file = get_homedir() / 'config' / 'takedown_filters.ini' @@ -458,26 +398,39 @@ class ParsedUserAgent(UserAgent): return f'OS: {self.platform} - Browser: {self.browser} {self.version} - UA: {self.string}' -class CaptureSettings(LacuscoreCaptureSettings, total=False): +class CaptureSettings(LacuscoreCaptureSettings): '''The capture settings that can be passed to Lookyloo''' - listing: bool | int | None - not_queued: bool | int | None - auto_report: bool | str | dict[str, str] | None # {'email': , 'comment': , 'recipient_mail':} - dnt: str | None - browser_name: str | None - os: str | None - parent: str | None + listing: bool = get_config('generic', 'default_public') + not_queued: bool = False + auto_report: bool | dict[str, str] | None = None # {'email': , 'comment': , 'recipient_mail':} + dnt: str | None = None + browser_name: str | None = None + os: str | None = None + parent: str | None = None + @field_validator('auto_report', mode='before') + @classmethod + def load_auto_report_json(cls, v: Any) -> bool | dict[str, str] | None: + if isinstance(v, str): + if v.isdigit(): + return bool(v) + elif v.startswith('{'): + return from_json(v) + elif isinstance(v, dict): + return v + return v -# overwrite set to True means the settings in the config file overwrite the settings -# provided by the user. False will simply append the settings from the config file if they -# don't exist. -class UserCaptureSettings(CaptureSettings, total=False): - overwrite: bool + @field_validator('cookies', mode='before') + @classmethod + def load_cookies(cls, v: Any) -> list[dict[str, Any]] | None: + # NOTE: Lookyloo can get the cookies in somewhat weird formats, mornalizing them + if v: + return load_cookies(v) + return None @lru_cache(64) -def load_user_config(username: str) -> UserCaptureSettings | None: +def load_user_config(username: str) -> dict[str, Any] | None: user_config_path = get_homedir() / 'config' / 'users' / f'{username}.json' if not user_config_path.exists(): return None diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index 029413e..0d6a824 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -57,10 +57,9 @@ from .exceptions import (MissingCaptureDirectory, MissingUUID, TreeNeedsRebuild, NoValidHarFile, LacusUnreachable) from .helpers import (get_captures_dir, get_email_template, get_resources_hashes, get_taxonomies, - uniq_domains, ParsedUserAgent, load_cookies, UserAgents, + uniq_domains, ParsedUserAgent, UserAgents, get_useragent_for_requests, load_takedown_filters, - CaptureSettings, UserCaptureSettings, load_user_config, - cast_capture_settings + CaptureSettings, load_user_config ) from .modules import (MISPs, PhishingInitiative, UniversalWhois, UrlScan, VirusTotal, Phishtank, Hashlookup, @@ -287,17 +286,17 @@ class Lookyloo(): json.dump(meta, f) return meta - def get_capture_settings(self, capture_uuid: str, /) -> CaptureSettings: + def get_capture_settings(self, capture_uuid: str, /) -> CaptureSettings | None: if capture_settings := self.redis.hgetall(capture_uuid): - return cast_capture_settings(capture_settings) + return CaptureSettings(**capture_settings) cache = self.capture_cache(capture_uuid) if not cache: - return {} + return None cs_file = cache.capture_dir / 'capture_settings.json' if cs_file.exists(): with cs_file.open('r') as f: - return cast_capture_settings(json.load(f)) - return {} + return CaptureSettings(**json.load(f)) + return None def categories_capture(self, capture_uuid: str, /) -> dict[str, Any]: '''Get all the categories related to a capture, in MISP Taxonomies format''' @@ -602,67 +601,24 @@ class Lookyloo(): self._captures_index.reload_cache(capture_uuid) return self._captures_index[capture_uuid].tree - def _prepare_lacus_query(self, query: CaptureSettings) -> CaptureSettings: - # Remove the none, it makes redis unhappy - query = {k: v for k, v in query.items() if v is not None} # type: ignore[assignment] - - if 'url' in query and query['url'] is not None: - # Make sure the URL does not have any space or newline - query['url'] = query['url'].strip() - - # NOTE: Lookyloo' capture can pass a do not track header independently from the default headers, merging it here - headers = query.pop('headers', {}) - if 'dnt' in query: - if isinstance(headers, str): - headers += f'\nDNT: {query.pop("dnt")}' - headers = headers.strip() - elif isinstance(headers, dict): - dnt_entry = query.pop("dnt") - if dnt_entry: - headers['DNT'] = dnt_entry.strip() - - if headers: - query['headers'] = headers - - # NOTE: Lookyloo can get the cookies in somewhat weird formats, mornalizing them - query['cookies'] = load_cookies(query.pop('cookies', None)) - - # NOTE: Make sure we have a useragent - user_agent = query.pop('user_agent', None) - if not user_agent: - # Catch case where the UA is broken on the UI, and the async submission. - self.user_agents.user_agents # triggers an update of the default UAs - if 'device_name' not in query: - query['user_agent'] = user_agent if user_agent else self.user_agents.default['useragent'] - - # NOTE: the document must be base64 encoded - document: str | bytes | None = query.pop('document', None) - if document: - if isinstance(document, bytes): - query['document'] = base64.b64encode(document).decode() - else: - query['document'] = document - return query - - def _apply_user_config(self, query: CaptureSettings, user_config: UserCaptureSettings) -> CaptureSettings: - def recursive_merge(dict1: CaptureSettings | UserCaptureSettings, - dict2: CaptureSettings | UserCaptureSettings) -> CaptureSettings: + def _apply_user_config(self, query: CaptureSettings, user_config: dict[str, Any]) -> CaptureSettings: + def recursive_merge(dict1: dict[str, Any], dict2: dict[str, Any]) -> dict[str, Any]: # dict2 overwrites dict1 for key, value in dict2.items(): - if key in dict1 and isinstance(dict1[key], dict) and isinstance(value, dict): # type: ignore[literal-required] + if key in dict1 and isinstance(dict1[key], dict) and isinstance(value, dict): # Recursively merge nested dictionaries - dict1[key] = recursive_merge(dict1[key], value) # type: ignore[literal-required,arg-type] + dict1[key] = recursive_merge(dict1[key], value) else: # Merge non-dictionary values - dict1[key] = value # type: ignore[literal-required] + dict1[key] = value return dict1 # merge - if user_config.pop('overwrite', None): + if user_config.get('overwrite'): # config from file takes priority - return recursive_merge(query, user_config) + return CaptureSettings(**recursive_merge(query.model_dump(), user_config)) else: - return recursive_merge(user_config, query) + return CaptureSettings(**recursive_merge(user_config, query.model_dump())) def enqueue_capture(self, query: CaptureSettings, source: str, user: str, authenticated: bool) -> str: '''Enqueue a query in the capture queue (used by the UI and the API for asynchronous processing)''' @@ -680,13 +636,20 @@ class Lookyloo(): usr_prio = self._priority['users'][user] if self._priority['users'].get(user) else self._priority['users']['_default_auth'] return src_prio + usr_prio - for key, value in query.items(): - if isinstance(value, bool): - query[key] = 1 if value else 0 # type: ignore[literal-required] - elif isinstance(value, (list, dict)): - query[key] = json.dumps(value) if value else None # type: ignore[literal-required] + # NOTE: Make sure we have a useragent + if not query.user_agent: + # Catch case where the UA is broken on the UI, and the async submission. + self.user_agents.user_agents # triggers an update of the default UAs + if not query.device_name and not query.user_agent: + query.user_agent = self.user_agents.default['useragent'] - query = self._prepare_lacus_query(query) + # merge DNT into headers + if query.dnt: + print('DNT - ######', query.dnt) + if query.headers is None: + query.headers = {} + query.headers['dnt'] = query.dnt + print('Header', query.headers) if authenticated: if user_config := load_user_config(user): query = self._apply_user_config(query, user_config) @@ -694,56 +657,45 @@ class Lookyloo(): priority = get_priority(source, user, authenticated) if priority < -100: # Someone is probably abusing the system with useless URLs, remove them from the index - query['listing'] = 0 + query.listing = False try: perma_uuid = self.lacus.enqueue( - url=query.get('url', None), - document_name=query.get('document_name', None), - document=query.get('document', None), - # depth=query.get('depth', 0), - browser=query.get('browser', None), - device_name=query.get('device_name', None), - user_agent=query.get('user_agent', None), - proxy=self.global_proxy if self.global_proxy else query.get('proxy', None), - general_timeout_in_sec=query.get('general_timeout_in_sec', None), - cookies=query.get('cookies', None), - headers=query.get('headers', None), - http_credentials=query.get('http_credentials', None), - viewport=query.get('viewport', None), - referer=query.get('referer', None), - timezone_id=query.get('timezone_id', None), - locale=query.get('locale', None), - geolocation=query.get('geolocation', None), - color_scheme=query.get('color_scheme', None), - rendered_hostname_only=query.get('rendered_hostname_only', True), - with_favicon=query.get('with_favicon', True), - allow_tracking=query.get('allow_tracking', True), - # force=query.get('force', False), - # recapture_interval=query.get('recapture_interval', 300), + url=query.url, + document_name=query.document_name, + document=query.document, + # depth=query.depth, + browser=query.browser, + device_name=query.device_name, + user_agent=query.user_agent, + proxy=self.global_proxy if self.global_proxy else query.proxy, + general_timeout_in_sec=query.general_timeout_in_sec, + cookies=query.cookies, + headers=query.headers, + http_credentials=query.http_credentials, + viewport=query.viewport, + referer=query.referer, + timezone_id=query.timezone_id, + locale=query.locale, + geolocation=query.geolocation, + color_scheme=query.color_scheme, + rendered_hostname_only=query.rendered_hostname_only, + with_favicon=query.with_favicon, + allow_tracking=query.allow_tracking, + # force=query.force, + # recapture_interval=query.recapture_interval, priority=priority ) except Exception as e: self.logger.critical(f'Unable to enqueue capture: {e}') perma_uuid = str(uuid4()) - query['not_queued'] = 1 + query.not_queued = True finally: if (not self.redis.hexists('lookup_dirs', perma_uuid) # already captured and self.redis.zscore('to_capture', perma_uuid) is None): # capture ongoing - # Make the settings redis compatible - mapping_capture: dict[str, bytes | float | int | str] = {} - for key, value in query.items(): - if isinstance(value, bool): - mapping_capture[key] = 1 if value else 0 - elif isinstance(value, (list, dict)): - if value: - mapping_capture[key] = json.dumps(value) - elif value is not None: - mapping_capture[key] = value # type: ignore[assignment] - p = self.redis.pipeline() p.zadd('to_capture', {perma_uuid: priority}) - p.hset(perma_uuid, mapping=mapping_capture) # type: ignore[arg-type] + p.hset(perma_uuid, mapping=query.redis_dump()) p.zincrby('queues', 1, f'{source}|{authenticated}|{user}') p.set(f'{perma_uuid}_mgmt', f'{source}|{authenticated}|{user}') p.execute() @@ -1478,7 +1430,8 @@ class Lookyloo(): elif filename.endswith('error.txt'): error = lookyloo_capture.read(filename).decode() elif filename.endswith('capture_settings.json'): - capture_settings = json.loads(lookyloo_capture.read(filename)) + _capture_settings = json.loads(lookyloo_capture.read(filename)) + capture_settings = CaptureSettings(**_capture_settings) else: for to_skip in files_to_skip: if filename.endswith(to_skip): @@ -1503,7 +1456,7 @@ class Lookyloo(): error=error, har=har, png=screenshot, html=html, last_redirected_url=last_redirected_url, cookies=cookies, - capture_settings=capture_settings, + capture_settings=capture_settings if capture_settings else None, potential_favicons=potential_favicons) return uuid, messages @@ -1585,7 +1538,7 @@ class Lookyloo(): if capture_settings: with (dirpath / 'capture_settings.json').open('w') as _cs: - json.dump(capture_settings, _cs) + _cs.write(capture_settings.model_dump_json(indent=2, exclude_none=True)) if potential_favicons: for f_id, favicon in enumerate(potential_favicons): diff --git a/website/web/__init__.py b/website/web/__init__.py index 2b08436..637ad5a 100644 --- a/website/web/__init__.py +++ b/website/web/__init__.py @@ -42,7 +42,7 @@ from lookyloo import Lookyloo, CaptureSettings from lookyloo.default import get_config from lookyloo.exceptions import MissingUUID, NoValidHarFile, LacusUnreachable from lookyloo.helpers import (get_taxonomies, UserAgents, load_cookies, - UserCaptureSettings, load_user_config) + load_user_config) if sys.version_info < (3, 9): from pytz import all_timezones_set @@ -1023,7 +1023,7 @@ def bulk_captures(base_tree_uuid: str) -> WerkzeugResponse | str | Response: cookies = load_cookies(lookyloo.get_cookies(base_tree_uuid)) bulk_captures = [] for url in [urls[int(selected_id) - 1] for selected_id in selected_urls]: - capture: CaptureSettings = { + capture: dict[str, Any] = { 'url': url, 'cookies': cookies, 'referer': cache.redirects[-1] if cache.redirects else cache.url, @@ -1031,7 +1031,7 @@ def bulk_captures(base_tree_uuid: str) -> WerkzeugResponse | str | Response: 'parent': base_tree_uuid, 'listing': False if cache and cache.no_index else True } - new_capture_uuid = lookyloo.enqueue_capture(capture, source='web', user=user, authenticated=flask_login.current_user.is_authenticated) + new_capture_uuid = lookyloo.enqueue_capture(CaptureSettings(**capture), source='web', user=user, authenticated=flask_login.current_user.is_authenticated) bulk_captures.append((new_capture_uuid, url)) return render_template('bulk_captures.html', uuid=base_tree_uuid, bulk_captures=bulk_captures) @@ -1196,7 +1196,7 @@ def tree(tree_uuid: str, node_uuid: str | None=None) -> Response | str | Werkzeu confirm_message=confirm_message if confirm_message else 'Tick to confirm.', parent_uuid=cache.parent, has_redirects=True if cache.redirects else False, - capture_settings=capture_settings) + capture_settings=capture_settings.model_dump(exclude_none=True) if capture_settings else {}) except NoValidHarFile: flash(f'Unable to build a tree for {tree_uuid}: {cache.error}.', 'warning') @@ -1445,8 +1445,8 @@ def search() -> str | Response | WerkzeugResponse: return render_template('search.html') -def _prepare_capture_template(user_ua: str | None, predefined_settings: CaptureSettings | None=None, *, - user_config: UserCaptureSettings | None=None) -> str: +def _prepare_capture_template(user_ua: str | None, predefined_settings: dict[str, Any] | None=None, *, + user_config: dict[str, Any] | None=None) -> str: return render_template('capture.html', user_agents=user_agents.user_agents, default=user_agents.default, personal_ua=user_ua, @@ -1463,9 +1463,9 @@ def _prepare_capture_template(user_ua: str | None, predefined_settings: CaptureS def recapture(tree_uuid: str) -> str | Response | WerkzeugResponse: cache = lookyloo.capture_cache(tree_uuid) if cache and hasattr(cache, 'capture_dir'): - capture_settings = lookyloo.get_capture_settings(tree_uuid) - return _prepare_capture_template(user_ua=request.headers.get('User-Agent'), - predefined_settings=capture_settings) + if capture_settings := lookyloo.get_capture_settings(tree_uuid): + return _prepare_capture_template(user_ua=request.headers.get('User-Agent'), + predefined_settings=capture_settings.model_dump(exclude_none=True)) flash(f'Unable to find the capture {tree_uuid} in the cache.', 'error') return _prepare_capture_template(user_ua=request.headers.get('User-Agent')) @@ -1524,7 +1524,7 @@ def submit_capture() -> str | Response | WerkzeugResponse: @app.route('/capture', methods=['GET', 'POST']) def capture_web() -> str | Response | WerkzeugResponse: - user_config: UserCaptureSettings | None = None + user_config: dict[str, Any] | None = None if flask_login.current_user.is_authenticated: user = flask_login.current_user.get_id() user_config = load_user_config(user) @@ -1536,7 +1536,7 @@ def capture_web() -> str | Response | WerkzeugResponse: flash('Invalid submission: please submit at least a URL or a document.', 'error') return _prepare_capture_template(user_ua=request.headers.get('User-Agent')) - capture_query: CaptureSettings = {} + capture_query: dict[str, Any] = {} # check if the post request has the file part if 'cookies' in request.files and request.files['cookies'].filename: capture_query['cookies'] = load_cookies(request.files['cookies'].stream.read()) @@ -1553,7 +1553,7 @@ def capture_web() -> str | Response | WerkzeugResponse: browser = request.form['browser'] if browser in ['chromium', 'firefox', 'webkit']: # Will be guessed otherwise. - capture_query['browser'] = browser # type: ignore[typeddict-item] + capture_query['browser'] = browser capture_query['listing'] = True if request.form.get('listing') else False capture_query['allow_tracking'] = True if request.form.get('allow_tracking') else False @@ -1608,7 +1608,7 @@ def capture_web() -> str | Response | WerkzeugResponse: if request.form.get('url'): capture_query['url'] = request.form['url'] - perma_uuid = lookyloo.enqueue_capture(capture_query, source='web', user=user, authenticated=flask_login.current_user.is_authenticated) + perma_uuid = lookyloo.enqueue_capture(CaptureSettings(**capture_query), source='web', user=user, authenticated=flask_login.current_user.is_authenticated) time.sleep(2) return redirect(url_for('tree', tree_uuid=perma_uuid)) elif request.form.get('urls'): @@ -1619,7 +1619,7 @@ def capture_web() -> str | Response | WerkzeugResponse: continue query = capture_query.copy() query['url'] = url - new_capture_uuid = lookyloo.enqueue_capture(query, source='web', user=user, authenticated=flask_login.current_user.is_authenticated) + new_capture_uuid = lookyloo.enqueue_capture(CaptureSettings(**query), source='web', user=user, authenticated=flask_login.current_user.is_authenticated) bulk_captures.append((new_capture_uuid, url)) return render_template('bulk_captures.html', bulk_captures=bulk_captures) @@ -1630,7 +1630,7 @@ def capture_web() -> str | Response | WerkzeugResponse: capture_query['document_name'] = request.files['document'].filename else: capture_query['document_name'] = 'unknown_name.bin' - perma_uuid = lookyloo.enqueue_capture(capture_query, source='web', user=user, authenticated=flask_login.current_user.is_authenticated) + perma_uuid = lookyloo.enqueue_capture(CaptureSettings(**capture_query), source='web', user=user, authenticated=flask_login.current_user.is_authenticated) time.sleep(2) return redirect(url_for('tree', tree_uuid=perma_uuid)) else: @@ -1638,7 +1638,7 @@ def capture_web() -> str | Response | WerkzeugResponse: elif request.method == 'GET' and request.args.get('url'): url = unquote_plus(request.args['url']).strip() capture_query = {'url': url} - perma_uuid = lookyloo.enqueue_capture(capture_query, source='web', user=user, authenticated=flask_login.current_user.is_authenticated) + perma_uuid = lookyloo.enqueue_capture(CaptureSettings(**capture_query), source='web', user=user, authenticated=flask_login.current_user.is_authenticated) return redirect(url_for('tree', tree_uuid=perma_uuid)) # render template @@ -1654,10 +1654,10 @@ def simple_capture() -> str | Response | WerkzeugResponse: if not (request.form.get('url') or request.form.get('urls')): flash('Invalid submission: please submit at least a URL.', 'error') return render_template('simple_capture.html') - capture_query: CaptureSettings = {} + capture_query: dict[str, Any] = {} if request.form.get('url'): capture_query['url'] = request.form['url'] - perma_uuid = lookyloo.enqueue_capture(capture_query, source='web', user=user, + perma_uuid = lookyloo.enqueue_capture(CaptureSettings(**capture_query), source='web', user=user, authenticated=flask_login.current_user.is_authenticated) time.sleep(2) if perma_uuid: @@ -1669,7 +1669,7 @@ def simple_capture() -> str | Response | WerkzeugResponse: continue query = capture_query.copy() query['url'] = url - new_capture_uuid = lookyloo.enqueue_capture(query, source='web', user=user, + new_capture_uuid = lookyloo.enqueue_capture(CaptureSettings(**query), source='web', user=user, authenticated=flask_login.current_user.is_authenticated) if new_capture_uuid: flash('Recording is in progress and is reported automatically.', 'success') diff --git a/website/web/genericapi.py b/website/web/genericapi.py index 504c3ac..95c6bbf 100644 --- a/website/web/genericapi.py +++ b/website/web/genericapi.py @@ -22,7 +22,7 @@ from pylacus import CaptureStatus as CaptureStatusPy from lookyloo import CaptureSettings, Lookyloo from lookyloo.comparator import Comparator from lookyloo.exceptions import MissingUUID, NoValidHarFile -from lookyloo.helpers import load_user_config, UserCaptureSettings +from lookyloo.helpers import load_user_config from .helpers import (build_users_table, load_user_from_request, src_request_ip, get_lookyloo_instance, get_indexing) @@ -56,7 +56,7 @@ def handle_no_HAR_file_exception(error: Any) -> tuple[dict[str, str], int]: class UserConfig(Resource): # type: ignore[misc] method_decorators = [api_auth_check] - def get(self) -> UserCaptureSettings | None | tuple[dict[str, str], int]: + def get(self) -> dict[str, Any] | None | tuple[dict[str, str], int]: if not flask_login.current_user.is_authenticated: return {'error': 'User not authenticated.'}, 401 return load_user_config(flask_login.current_user.get_id()) @@ -548,7 +548,7 @@ class SubmitCapture(Resource): # type: ignore[misc] if 'url' not in request.args or not request.args.get('url'): return {'error': 'No "url" in the URL params, nothting to capture.'}, 400 - to_query: CaptureSettings = { + to_query: dict[str, Any] = { 'url': request.args['url'], 'listing': False if 'listing' in request.args and request.args['listing'] in [0, '0'] else True, 'allow_tracking': False if 'allow_tracking' in request.args and request.args['allow_tracking'] in [0, '0'] else True @@ -566,7 +566,7 @@ class SubmitCapture(Resource): # type: ignore[misc] if request.args.get('proxy'): to_query['proxy'] = request.args['proxy'] - perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated) + perma_uuid = lookyloo.enqueue_capture(CaptureSettings(**to_query), source='api', user=user, authenticated=flask_login.current_user.is_authenticated) return perma_uuid @api.doc(body=submit_fields_post) # type: ignore[misc] @@ -576,8 +576,8 @@ class SubmitCapture(Resource): # type: ignore[misc] user = flask_login.current_user.get_id() else: user = src_request_ip(request) - to_query: CaptureSettings = request.get_json(force=True) - perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated) + to_query: dict[str, Any] = request.get_json(force=True) + perma_uuid = lookyloo.enqueue_capture(CaptureSettings(**to_query), source='api', user=user, authenticated=flask_login.current_user.is_authenticated) return perma_uuid