new: Use Pydantic for CaptureSettings

pull/926/head
Raphaël Vinot 2024-07-19 16:12:24 +02:00
parent 40ad6d0031
commit 804bab6d2f
7 changed files with 164 additions and 266 deletions

View File

@ -3,7 +3,6 @@
from __future__ import annotations
import asyncio
import json
import logging
import logging.config
import signal
@ -78,19 +77,14 @@ class AsyncCapture(AbstractManager):
self.lookyloo.redis.sadd('ongoing', uuid)
queue: str | None = self.lookyloo.redis.getdel(f'{uuid}_mgmt')
to_capture: CaptureSettings = self.lookyloo.get_capture_settings(uuid)
if get_config('generic', 'default_public'):
# By default, the captures are on the index, unless the user mark them as un-listed
listing = False if ('listing' in to_capture and to_capture['listing'] == 0) else True
else:
# By default, the captures are not on the index, unless the user mark them as listed
listing = True if ('listing' in to_capture and to_capture['listing'] == 1) else False
to_capture: CaptureSettings | None = self.lookyloo.get_capture_settings(uuid)
if not to_capture:
continue
self.lookyloo.store_capture(
uuid, listing,
os=to_capture.get('os'), browser=to_capture.get('browser'),
parent=to_capture.get('parent'),
uuid, to_capture.listing,
os=to_capture.os, browser=to_capture.browser,
parent=to_capture.parent,
downloaded_filename=entries.get('downloaded_filename'),
downloaded_file=entries.get('downloaded_file'),
error=entries.get('error'), har=entries.get('har'),
@ -101,18 +95,11 @@ class AsyncCapture(AbstractManager):
potential_favicons=entries.get('potential_favicons')
)
if 'auto_report' in to_capture:
if to_capture.auto_report:
send_report = True
settings = {}
if isinstance(to_capture['auto_report'], str):
if to_capture['auto_report'].isdigit():
# auto_report was a bool in the submission, it can be 1 or 0. 0 means no.
if to_capture['auto_report'] == '0':
send_report = False
else:
settings = json.loads(to_capture['auto_report'])
elif isinstance(to_capture['auto_report'], dict):
settings = to_capture['auto_report']
if isinstance(to_capture.auto_report, dict):
settings = to_capture.auto_report
if send_report:
self.lookyloo.send_mail(uuid, email=settings.get('email', ''),

View File

@ -14,7 +14,7 @@ from lacuscore import CaptureStatus as CaptureStatusCore
from lookyloo import Lookyloo
from lookyloo.exceptions import LacusUnreachable
from lookyloo.default import AbstractManager, get_config, get_homedir, safe_create_dir
from lookyloo.helpers import ParsedUserAgent, serialize_to_json, CaptureSettings
from lookyloo.helpers import ParsedUserAgent, serialize_to_json
from pylacus import CaptureStatus as CaptureStatusPy
logging.config.dictConfig(get_config('logging'))
@ -109,41 +109,41 @@ class Processing(AbstractManager):
continue
self.logger.info(f'Found a non-queued capture ({uuid}), retrying now.')
# This capture couldn't be queued and we created the uuid locally
query: CaptureSettings = self.lookyloo.get_capture_settings(uuid)
try:
new_uuid = self.lookyloo.lacus.enqueue(
url=query.get('url', None),
document_name=query.get('document_name', None),
document=query.get('document', None),
# depth=query.get('depth', 0),
browser=query.get('browser', None),
device_name=query.get('device_name', None),
user_agent=query.get('user_agent', None),
proxy=query.get('proxy', None),
general_timeout_in_sec=query.get('general_timeout_in_sec', None),
cookies=query.get('cookies', None),
headers=query.get('headers', None),
http_credentials=query.get('http_credentials', None),
viewport=query.get('viewport', None),
referer=query.get('referer', None),
rendered_hostname_only=query.get('rendered_hostname_only', True),
# force=query.get('force', False),
# recapture_interval=query.get('recapture_interval', 300),
priority=query.get('priority', 0),
uuid=uuid
)
if new_uuid != uuid:
# somehow, between the check and queuing, the UUID isn't UNKNOWN anymore, just checking that
self.logger.warning(f'Had to change the capture UUID (duplicate). Old: {uuid} / New: {new_uuid}')
except LacusUnreachable:
self.logger.warning('Lacus still unreachable.')
break
except Exception as e:
self.logger.warning(f'Still unable to enqueue capture: {e}')
break
else:
self.lookyloo.redis.hdel(uuid, 'not_queued')
self.logger.info(f'{uuid} enqueued.')
if query := self.lookyloo.get_capture_settings(uuid):
try:
new_uuid = self.lookyloo.lacus.enqueue(
url=query.url,
document_name=query.document_name,
document=query.document,
# depth=query.depth,
browser=query.browser,
device_name=query.device_name,
user_agent=query.user_agent,
proxy=query.proxy,
general_timeout_in_sec=query.general_timeout_in_sec,
cookies=query.cookies,
headers=query.headers,
http_credentials=query.http_credentials,
viewport=query.viewport,
referer=query.referer,
rendered_hostname_only=query.rendered_hostname_only,
# force=query.force,
# recapture_interval=query.recapture_interval,
priority=query.priority,
uuid=uuid
)
if new_uuid != uuid:
# somehow, between the check and queuing, the UUID isn't UNKNOWN anymore, just checking that
self.logger.warning(f'Had to change the capture UUID (duplicate). Old: {uuid} / New: {new_uuid}')
except LacusUnreachable:
self.logger.warning('Lacus still unreachable.')
break
except Exception as e:
self.logger.warning(f'Still unable to enqueue capture: {e}')
break
else:
self.lookyloo.redis.hdel(uuid, 'not_queued')
self.logger.info(f'{uuid} enqueued.')
def main() -> None:

View File

@ -431,8 +431,13 @@ class CapturesIndex(Mapping): # type: ignore[type-arg]
capture_settings_file = capture_dir / 'capture_settings.json'
if capture_settings_file.exists():
with capture_settings_file.open() as f:
capture_settings = json.loads(f.read())
_s = f.read()
try:
    capture_settings = json.loads(_s)
    # Probe for a mapping: a dump that was JSON-encoded twice decodes to a
    # str here, and str has no .get(), which raises AttributeError below.
    capture_settings.get('url')
except AttributeError:
    # That's if we have broken dumps that are twice json encoded.
    # Decode the inner string with json.loads (json.load expects a file
    # object and would itself fail on a str).
    capture_settings = json.loads(capture_settings)
if capture_settings.get('url') and capture_settings['url'] is not None:
cache['url'] = capture_settings['url']

View File

@ -15,6 +15,8 @@ from functools import lru_cache
from importlib.metadata import version
from io import BufferedIOBase
from pathlib import Path
from pydantic import field_validator
from pydantic_core import from_json
from typing import Any
from urllib.parse import urlparse
@ -83,68 +85,6 @@ def get_email_template() -> str:
return f.read()
def cast_capture_settings(capture_settings: dict[str, str]) -> CaptureSettings:
to_return: CaptureSettings = {}
# NOTE: Replace the if / else below with a case / match as soon as we require python 3.10+
for setting_key, setting_value in capture_settings.items():
if setting_key == 'listing':
to_return['listing'] = bool(int(setting_value))
elif setting_key == 'not_queued':
to_return['not_queued'] = bool(int(setting_value))
elif setting_key == 'auto_report':
if isinstance(setting_value, str) and setting_value:
if setting_value.startswith('{'):
to_return['auto_report'] = json.loads(setting_value)
elif setting_value.isdigit():
to_return['auto_report'] = bool(int(setting_value))
else:
to_return['auto_report'] = setting_value
elif setting_key == 'proxy' and setting_value:
if setting_value.startswith('{'):
to_return['proxy'] = json.loads(setting_value)
else:
to_return['proxy'] = setting_value
elif setting_key in ('dnt', 'browser_name', 'os', 'parent'):
to_return[setting_key] = setting_value # type: ignore[literal-required]
# Lacus core keys
elif setting_key == 'general_timeout_in_sec':
to_return['general_timeout_in_sec'] = int(setting_value)
elif setting_key == 'cookies':
to_return['cookies'] = load_cookies(setting_value)
elif setting_key == 'headers':
to_return['headers'] = json.loads(setting_value)
elif setting_key == 'http_credentials':
to_return['http_credentials'] = json.loads(setting_value)
elif setting_key == 'geolocation':
to_return['geolocation'] = json.loads(setting_value)
elif setting_key == 'viewport':
to_return['viewport'] = json.loads(setting_value)
elif setting_key == 'with_favicon':
to_return['with_favicon'] = bool(int(setting_value))
elif setting_key == 'allow_tracking':
to_return['allow_tracking'] = bool(int(setting_value))
elif setting_key == 'force':
to_return['force'] = bool(int(setting_value))
elif setting_key == 'recapture_interval':
to_return['recapture_interval'] = int(setting_value)
elif setting_key == 'priority':
to_return['priority'] = int(setting_value)
elif setting_key == 'depth':
to_return['depth'] = int(setting_value)
elif setting_key == 'rendered_hostname_only':
to_return['rendered_hostname_only'] = bool(int(setting_value))
elif setting_key in ('url', 'document_name', 'document', 'browser', 'device_name',
'user_agent', 'timezone_id', 'locale', 'color_scheme', 'referer',
'uuid') and setting_value:
# Value is a non-empty string, keep it as-is
to_return[setting_key] = setting_value # type: ignore[literal-required]
else:
# NOTE: we may have to add more settings here, will be fixed with pydantic soon.
# raise InvalidCaptureSetting(f'Unknown setting: {setting_key} with value: {setting_value}')
print(f'Unknown setting: {setting_key} with value: {setting_value}')
return to_return
@lru_cache
def load_takedown_filters() -> tuple[re.Pattern[str], re.Pattern[str], dict[str, list[str]]]:
filter_ini_file = get_homedir() / 'config' / 'takedown_filters.ini'
@ -458,26 +398,39 @@ class ParsedUserAgent(UserAgent):
return f'OS: {self.platform} - Browser: {self.browser} {self.version} - UA: {self.string}'
class CaptureSettings(LacuscoreCaptureSettings, total=False):
class CaptureSettings(LacuscoreCaptureSettings):
'''The capture settings that can be passed to Lookyloo'''
listing: bool | int | None
not_queued: bool | int | None
auto_report: bool | str | dict[str, str] | None # {'email': , 'comment': , 'recipient_mail':}
dnt: str | None
browser_name: str | None
os: str | None
parent: str | None
listing: bool = get_config('generic', 'default_public')
not_queued: bool = False
auto_report: bool | dict[str, str] | None = None # {'email': , 'comment': , 'recipient_mail':}
dnt: str | None = None
browser_name: str | None = None
os: str | None = None
parent: str | None = None
@field_validator('auto_report', mode='before')
@classmethod
def load_auto_report_json(cls, v: Any) -> bool | dict[str, str] | None:
    '''Normalize the raw `auto_report` value before it is validated.

    String inputs are decoded as follows:
      * an empty string carries no setting and becomes None
      * a string of digits is a serialized boolean: '0' means False,
        any other number means True
      * a string starting with '{' is parsed as a JSON object
    Any other value (bool, dict, None, other strings) is returned unchanged.
    '''
    if isinstance(v, str):
        if not v:
            return None
        if v.isdigit():
            # bool('0') is True: go through int so that '0' really means "no".
            return bool(int(v))
        if v.startswith('{'):
            return from_json(v)
    return v
# overwrite set to True means the settings in the config file overwrite the settings
# provided by the user. False will simply append the settings from the config file if they
# don't exist.
class UserCaptureSettings(CaptureSettings, total=False):
overwrite: bool
@field_validator('cookies', mode='before')
@classmethod
def load_cookies(cls, v: Any) -> list[dict[str, Any]] | None:
    '''Normalize the raw `cookies` value before it is validated.

    Lookyloo can get the cookies in somewhat weird formats: any truthy value
    is handed to the module-level `load_cookies` helper, a falsy one yields None.
    '''
    return load_cookies(v) if v else None
@lru_cache(64)
def load_user_config(username: str) -> UserCaptureSettings | None:
def load_user_config(username: str) -> dict[str, Any] | None:
user_config_path = get_homedir() / 'config' / 'users' / f'{username}.json'
if not user_config_path.exists():
return None

View File

@ -57,10 +57,9 @@ from .exceptions import (MissingCaptureDirectory,
MissingUUID, TreeNeedsRebuild, NoValidHarFile, LacusUnreachable)
from .helpers import (get_captures_dir, get_email_template,
get_resources_hashes, get_taxonomies,
uniq_domains, ParsedUserAgent, load_cookies, UserAgents,
uniq_domains, ParsedUserAgent, UserAgents,
get_useragent_for_requests, load_takedown_filters,
CaptureSettings, UserCaptureSettings, load_user_config,
cast_capture_settings
CaptureSettings, load_user_config
)
from .modules import (MISPs, PhishingInitiative, UniversalWhois,
UrlScan, VirusTotal, Phishtank, Hashlookup,
@ -287,17 +286,17 @@ class Lookyloo():
json.dump(meta, f)
return meta
def get_capture_settings(self, capture_uuid: str, /) -> CaptureSettings:
def get_capture_settings(self, capture_uuid: str, /) -> CaptureSettings | None:
if capture_settings := self.redis.hgetall(capture_uuid):
return cast_capture_settings(capture_settings)
return CaptureSettings(**capture_settings)
cache = self.capture_cache(capture_uuid)
if not cache:
return {}
return None
cs_file = cache.capture_dir / 'capture_settings.json'
if cs_file.exists():
with cs_file.open('r') as f:
return cast_capture_settings(json.load(f))
return {}
return CaptureSettings(**json.load(f))
return None
def categories_capture(self, capture_uuid: str, /) -> dict[str, Any]:
'''Get all the categories related to a capture, in MISP Taxonomies format'''
@ -602,67 +601,24 @@ class Lookyloo():
self._captures_index.reload_cache(capture_uuid)
return self._captures_index[capture_uuid].tree
def _prepare_lacus_query(self, query: CaptureSettings) -> CaptureSettings:
# Remove the none, it makes redis unhappy
query = {k: v for k, v in query.items() if v is not None} # type: ignore[assignment]
if 'url' in query and query['url'] is not None:
# Make sure the URL does not have any space or newline
query['url'] = query['url'].strip()
# NOTE: Lookyloo' capture can pass a do not track header independently from the default headers, merging it here
headers = query.pop('headers', {})
if 'dnt' in query:
if isinstance(headers, str):
headers += f'\nDNT: {query.pop("dnt")}'
headers = headers.strip()
elif isinstance(headers, dict):
dnt_entry = query.pop("dnt")
if dnt_entry:
headers['DNT'] = dnt_entry.strip()
if headers:
query['headers'] = headers
# NOTE: Lookyloo can get the cookies in somewhat weird formats, mornalizing them
query['cookies'] = load_cookies(query.pop('cookies', None))
# NOTE: Make sure we have a useragent
user_agent = query.pop('user_agent', None)
if not user_agent:
# Catch case where the UA is broken on the UI, and the async submission.
self.user_agents.user_agents # triggers an update of the default UAs
if 'device_name' not in query:
query['user_agent'] = user_agent if user_agent else self.user_agents.default['useragent']
# NOTE: the document must be base64 encoded
document: str | bytes | None = query.pop('document', None)
if document:
if isinstance(document, bytes):
query['document'] = base64.b64encode(document).decode()
else:
query['document'] = document
return query
def _apply_user_config(self, query: CaptureSettings, user_config: dict[str, Any]) -> CaptureSettings:
    '''Merge the settings from a user's config file with the submitted query.

    If `user_config` has a truthy 'overwrite' entry, the values from the
    config file take priority over the ones in the query; otherwise the
    values from the query take priority over the config file.

    `user_config` is never modified: it can be a cached object shared
    between calls, so merging into it in place would leak the settings of
    one query into the following ones.
    '''
    def recursive_merge(dict1: dict[str, Any], dict2: dict[str, Any]) -> dict[str, Any]:
        # dict2 overwrites dict1; a new dict is returned, the inputs stay untouched
        merged = dict(dict1)
        for key, value in dict2.items():
            if isinstance(merged.get(key), dict) and isinstance(value, dict):
                # Recursively merge nested dictionaries
                merged[key] = recursive_merge(merged[key], value)
            else:
                # Merge non-dictionary values
                merged[key] = value
        return merged

    # 'overwrite' only drives the merge direction, it is not a capture setting
    # and must not be handed over to CaptureSettings.
    config = {key: value for key, value in user_config.items() if key != 'overwrite'}

    # NOTE(review): query.model_dump() also contains the unset/None fields, so
    # they take part in the merge - confirm this is the intended behavior.
    if user_config.get('overwrite'):
        # config from file takes priority
        return CaptureSettings(**recursive_merge(query.model_dump(), config))
    return CaptureSettings(**recursive_merge(config, query.model_dump()))
def enqueue_capture(self, query: CaptureSettings, source: str, user: str, authenticated: bool) -> str:
'''Enqueue a query in the capture queue (used by the UI and the API for asynchronous processing)'''
@ -680,13 +636,20 @@ class Lookyloo():
usr_prio = self._priority['users'][user] if self._priority['users'].get(user) else self._priority['users']['_default_auth']
return src_prio + usr_prio
for key, value in query.items():
if isinstance(value, bool):
query[key] = 1 if value else 0 # type: ignore[literal-required]
elif isinstance(value, (list, dict)):
query[key] = json.dumps(value) if value else None # type: ignore[literal-required]
# NOTE: Make sure we have a useragent
if not query.user_agent:
# Catch case where the UA is broken on the UI, and the async submission.
self.user_agents.user_agents # triggers an update of the default UAs
if not query.device_name and not query.user_agent:
query.user_agent = self.user_agents.default['useragent']
query = self._prepare_lacus_query(query)
# Merge the standalone DNT setting into the headers sent with the capture.
if query.dnt:
    if query.headers is None:
        # No headers were submitted: start from an empty mapping.
        query.headers = {}
    query.headers['dnt'] = query.dnt
if authenticated:
if user_config := load_user_config(user):
query = self._apply_user_config(query, user_config)
@ -694,56 +657,45 @@ class Lookyloo():
priority = get_priority(source, user, authenticated)
if priority < -100:
# Someone is probably abusing the system with useless URLs, remove them from the index
query['listing'] = 0
query.listing = False
try:
perma_uuid = self.lacus.enqueue(
url=query.get('url', None),
document_name=query.get('document_name', None),
document=query.get('document', None),
# depth=query.get('depth', 0),
browser=query.get('browser', None),
device_name=query.get('device_name', None),
user_agent=query.get('user_agent', None),
proxy=self.global_proxy if self.global_proxy else query.get('proxy', None),
general_timeout_in_sec=query.get('general_timeout_in_sec', None),
cookies=query.get('cookies', None),
headers=query.get('headers', None),
http_credentials=query.get('http_credentials', None),
viewport=query.get('viewport', None),
referer=query.get('referer', None),
timezone_id=query.get('timezone_id', None),
locale=query.get('locale', None),
geolocation=query.get('geolocation', None),
color_scheme=query.get('color_scheme', None),
rendered_hostname_only=query.get('rendered_hostname_only', True),
with_favicon=query.get('with_favicon', True),
allow_tracking=query.get('allow_tracking', True),
# force=query.get('force', False),
# recapture_interval=query.get('recapture_interval', 300),
url=query.url,
document_name=query.document_name,
document=query.document,
# depth=query.depth,
browser=query.browser,
device_name=query.device_name,
user_agent=query.user_agent,
proxy=self.global_proxy if self.global_proxy else query.proxy,
general_timeout_in_sec=query.general_timeout_in_sec,
cookies=query.cookies,
headers=query.headers,
http_credentials=query.http_credentials,
viewport=query.viewport,
referer=query.referer,
timezone_id=query.timezone_id,
locale=query.locale,
geolocation=query.geolocation,
color_scheme=query.color_scheme,
rendered_hostname_only=query.rendered_hostname_only,
with_favicon=query.with_favicon,
allow_tracking=query.allow_tracking,
# force=query.force,
# recapture_interval=query.recapture_interval,
priority=priority
)
except Exception as e:
self.logger.critical(f'Unable to enqueue capture: {e}')
perma_uuid = str(uuid4())
query['not_queued'] = 1
query.not_queued = True
finally:
if (not self.redis.hexists('lookup_dirs', perma_uuid) # already captured
and self.redis.zscore('to_capture', perma_uuid) is None): # capture ongoing
# Make the settings redis compatible
mapping_capture: dict[str, bytes | float | int | str] = {}
for key, value in query.items():
if isinstance(value, bool):
mapping_capture[key] = 1 if value else 0
elif isinstance(value, (list, dict)):
if value:
mapping_capture[key] = json.dumps(value)
elif value is not None:
mapping_capture[key] = value # type: ignore[assignment]
p = self.redis.pipeline()
p.zadd('to_capture', {perma_uuid: priority})
p.hset(perma_uuid, mapping=mapping_capture) # type: ignore[arg-type]
p.hset(perma_uuid, mapping=query.redis_dump())
p.zincrby('queues', 1, f'{source}|{authenticated}|{user}')
p.set(f'{perma_uuid}_mgmt', f'{source}|{authenticated}|{user}')
p.execute()
@ -1478,7 +1430,8 @@ class Lookyloo():
elif filename.endswith('error.txt'):
error = lookyloo_capture.read(filename).decode()
elif filename.endswith('capture_settings.json'):
capture_settings = json.loads(lookyloo_capture.read(filename))
_capture_settings = json.loads(lookyloo_capture.read(filename))
capture_settings = CaptureSettings(**_capture_settings)
else:
for to_skip in files_to_skip:
if filename.endswith(to_skip):
@ -1503,7 +1456,7 @@ class Lookyloo():
error=error, har=har, png=screenshot, html=html,
last_redirected_url=last_redirected_url,
cookies=cookies,
capture_settings=capture_settings,
capture_settings=capture_settings if capture_settings else None,
potential_favicons=potential_favicons)
return uuid, messages
@ -1585,7 +1538,7 @@ class Lookyloo():
if capture_settings:
with (dirpath / 'capture_settings.json').open('w') as _cs:
json.dump(capture_settings, _cs)
_cs.write(capture_settings.model_dump_json(indent=2, exclude_none=True))
if potential_favicons:
for f_id, favicon in enumerate(potential_favicons):

View File

@ -42,7 +42,7 @@ from lookyloo import Lookyloo, CaptureSettings
from lookyloo.default import get_config
from lookyloo.exceptions import MissingUUID, NoValidHarFile, LacusUnreachable
from lookyloo.helpers import (get_taxonomies, UserAgents, load_cookies,
UserCaptureSettings, load_user_config)
load_user_config)
if sys.version_info < (3, 9):
from pytz import all_timezones_set
@ -1023,7 +1023,7 @@ def bulk_captures(base_tree_uuid: str) -> WerkzeugResponse | str | Response:
cookies = load_cookies(lookyloo.get_cookies(base_tree_uuid))
bulk_captures = []
for url in [urls[int(selected_id) - 1] for selected_id in selected_urls]:
capture: CaptureSettings = {
capture: dict[str, Any] = {
'url': url,
'cookies': cookies,
'referer': cache.redirects[-1] if cache.redirects else cache.url,
@ -1031,7 +1031,7 @@ def bulk_captures(base_tree_uuid: str) -> WerkzeugResponse | str | Response:
'parent': base_tree_uuid,
'listing': False if cache and cache.no_index else True
}
new_capture_uuid = lookyloo.enqueue_capture(capture, source='web', user=user, authenticated=flask_login.current_user.is_authenticated)
new_capture_uuid = lookyloo.enqueue_capture(CaptureSettings(**capture), source='web', user=user, authenticated=flask_login.current_user.is_authenticated)
bulk_captures.append((new_capture_uuid, url))
return render_template('bulk_captures.html', uuid=base_tree_uuid, bulk_captures=bulk_captures)
@ -1196,7 +1196,7 @@ def tree(tree_uuid: str, node_uuid: str | None=None) -> Response | str | Werkzeu
confirm_message=confirm_message if confirm_message else 'Tick to confirm.',
parent_uuid=cache.parent,
has_redirects=True if cache.redirects else False,
capture_settings=capture_settings)
capture_settings=capture_settings.model_dump(exclude_none=True) if capture_settings else {})
except NoValidHarFile:
flash(f'Unable to build a tree for {tree_uuid}: {cache.error}.', 'warning')
@ -1445,8 +1445,8 @@ def search() -> str | Response | WerkzeugResponse:
return render_template('search.html')
def _prepare_capture_template(user_ua: str | None, predefined_settings: CaptureSettings | None=None, *,
user_config: UserCaptureSettings | None=None) -> str:
def _prepare_capture_template(user_ua: str | None, predefined_settings: dict[str, Any] | None=None, *,
user_config: dict[str, Any] | None=None) -> str:
return render_template('capture.html', user_agents=user_agents.user_agents,
default=user_agents.default,
personal_ua=user_ua,
@ -1463,9 +1463,9 @@ def _prepare_capture_template(user_ua: str | None, predefined_settings: CaptureS
def recapture(tree_uuid: str) -> str | Response | WerkzeugResponse:
cache = lookyloo.capture_cache(tree_uuid)
if cache and hasattr(cache, 'capture_dir'):
capture_settings = lookyloo.get_capture_settings(tree_uuid)
return _prepare_capture_template(user_ua=request.headers.get('User-Agent'),
predefined_settings=capture_settings)
if capture_settings := lookyloo.get_capture_settings(tree_uuid):
return _prepare_capture_template(user_ua=request.headers.get('User-Agent'),
predefined_settings=capture_settings.model_dump(exclude_none=True))
flash(f'Unable to find the capture {tree_uuid} in the cache.', 'error')
return _prepare_capture_template(user_ua=request.headers.get('User-Agent'))
@ -1524,7 +1524,7 @@ def submit_capture() -> str | Response | WerkzeugResponse:
@app.route('/capture', methods=['GET', 'POST'])
def capture_web() -> str | Response | WerkzeugResponse:
user_config: UserCaptureSettings | None = None
user_config: dict[str, Any] | None = None
if flask_login.current_user.is_authenticated:
user = flask_login.current_user.get_id()
user_config = load_user_config(user)
@ -1536,7 +1536,7 @@ def capture_web() -> str | Response | WerkzeugResponse:
flash('Invalid submission: please submit at least a URL or a document.', 'error')
return _prepare_capture_template(user_ua=request.headers.get('User-Agent'))
capture_query: CaptureSettings = {}
capture_query: dict[str, Any] = {}
# check if the post request has the file part
if 'cookies' in request.files and request.files['cookies'].filename:
capture_query['cookies'] = load_cookies(request.files['cookies'].stream.read())
@ -1553,7 +1553,7 @@ def capture_web() -> str | Response | WerkzeugResponse:
browser = request.form['browser']
if browser in ['chromium', 'firefox', 'webkit']:
# Will be guessed otherwise.
capture_query['browser'] = browser # type: ignore[typeddict-item]
capture_query['browser'] = browser
capture_query['listing'] = True if request.form.get('listing') else False
capture_query['allow_tracking'] = True if request.form.get('allow_tracking') else False
@ -1608,7 +1608,7 @@ def capture_web() -> str | Response | WerkzeugResponse:
if request.form.get('url'):
capture_query['url'] = request.form['url']
perma_uuid = lookyloo.enqueue_capture(capture_query, source='web', user=user, authenticated=flask_login.current_user.is_authenticated)
perma_uuid = lookyloo.enqueue_capture(CaptureSettings(**capture_query), source='web', user=user, authenticated=flask_login.current_user.is_authenticated)
time.sleep(2)
return redirect(url_for('tree', tree_uuid=perma_uuid))
elif request.form.get('urls'):
@ -1619,7 +1619,7 @@ def capture_web() -> str | Response | WerkzeugResponse:
continue
query = capture_query.copy()
query['url'] = url
new_capture_uuid = lookyloo.enqueue_capture(query, source='web', user=user, authenticated=flask_login.current_user.is_authenticated)
new_capture_uuid = lookyloo.enqueue_capture(CaptureSettings(**query), source='web', user=user, authenticated=flask_login.current_user.is_authenticated)
bulk_captures.append((new_capture_uuid, url))
return render_template('bulk_captures.html', bulk_captures=bulk_captures)
@ -1630,7 +1630,7 @@ def capture_web() -> str | Response | WerkzeugResponse:
capture_query['document_name'] = request.files['document'].filename
else:
capture_query['document_name'] = 'unknown_name.bin'
perma_uuid = lookyloo.enqueue_capture(capture_query, source='web', user=user, authenticated=flask_login.current_user.is_authenticated)
perma_uuid = lookyloo.enqueue_capture(CaptureSettings(**capture_query), source='web', user=user, authenticated=flask_login.current_user.is_authenticated)
time.sleep(2)
return redirect(url_for('tree', tree_uuid=perma_uuid))
else:
@ -1638,7 +1638,7 @@ def capture_web() -> str | Response | WerkzeugResponse:
elif request.method == 'GET' and request.args.get('url'):
url = unquote_plus(request.args['url']).strip()
capture_query = {'url': url}
perma_uuid = lookyloo.enqueue_capture(capture_query, source='web', user=user, authenticated=flask_login.current_user.is_authenticated)
perma_uuid = lookyloo.enqueue_capture(CaptureSettings(**capture_query), source='web', user=user, authenticated=flask_login.current_user.is_authenticated)
return redirect(url_for('tree', tree_uuid=perma_uuid))
# render template
@ -1654,10 +1654,10 @@ def simple_capture() -> str | Response | WerkzeugResponse:
if not (request.form.get('url') or request.form.get('urls')):
flash('Invalid submission: please submit at least a URL.', 'error')
return render_template('simple_capture.html')
capture_query: CaptureSettings = {}
capture_query: dict[str, Any] = {}
if request.form.get('url'):
capture_query['url'] = request.form['url']
perma_uuid = lookyloo.enqueue_capture(capture_query, source='web', user=user,
perma_uuid = lookyloo.enqueue_capture(CaptureSettings(**capture_query), source='web', user=user,
authenticated=flask_login.current_user.is_authenticated)
time.sleep(2)
if perma_uuid:
@ -1669,7 +1669,7 @@ def simple_capture() -> str | Response | WerkzeugResponse:
continue
query = capture_query.copy()
query['url'] = url
new_capture_uuid = lookyloo.enqueue_capture(query, source='web', user=user,
new_capture_uuid = lookyloo.enqueue_capture(CaptureSettings(**query), source='web', user=user,
authenticated=flask_login.current_user.is_authenticated)
if new_capture_uuid:
flash('Recording is in progress and is reported automatically.', 'success')

View File

@ -22,7 +22,7 @@ from pylacus import CaptureStatus as CaptureStatusPy
from lookyloo import CaptureSettings, Lookyloo
from lookyloo.comparator import Comparator
from lookyloo.exceptions import MissingUUID, NoValidHarFile
from lookyloo.helpers import load_user_config, UserCaptureSettings
from lookyloo.helpers import load_user_config
from .helpers import (build_users_table, load_user_from_request, src_request_ip,
get_lookyloo_instance, get_indexing)
@ -56,7 +56,7 @@ def handle_no_HAR_file_exception(error: Any) -> tuple[dict[str, str], int]:
class UserConfig(Resource): # type: ignore[misc]
method_decorators = [api_auth_check]
def get(self) -> UserCaptureSettings | None | tuple[dict[str, str], int]:
def get(self) -> dict[str, Any] | None | tuple[dict[str, str], int]:
if not flask_login.current_user.is_authenticated:
return {'error': 'User not authenticated.'}, 401
return load_user_config(flask_login.current_user.get_id())
@ -548,7 +548,7 @@ class SubmitCapture(Resource): # type: ignore[misc]
if 'url' not in request.args or not request.args.get('url'):
return {'error': 'No "url" in the URL params, nothting to capture.'}, 400
to_query: CaptureSettings = {
to_query: dict[str, Any] = {
'url': request.args['url'],
'listing': False if 'listing' in request.args and request.args['listing'] in [0, '0'] else True,
'allow_tracking': False if 'allow_tracking' in request.args and request.args['allow_tracking'] in [0, '0'] else True
@ -566,7 +566,7 @@ class SubmitCapture(Resource): # type: ignore[misc]
if request.args.get('proxy'):
to_query['proxy'] = request.args['proxy']
perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated)
perma_uuid = lookyloo.enqueue_capture(CaptureSettings(**to_query), source='api', user=user, authenticated=flask_login.current_user.is_authenticated)
return perma_uuid
@api.doc(body=submit_fields_post) # type: ignore[misc]
@ -576,8 +576,8 @@ class SubmitCapture(Resource): # type: ignore[misc]
user = flask_login.current_user.get_id()
else:
user = src_request_ip(request)
to_query: CaptureSettings = request.get_json(force=True)
perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated)
to_query: dict[str, Any] = request.get_json(force=True)
perma_uuid = lookyloo.enqueue_capture(CaptureSettings(**to_query), source='api', user=user, authenticated=flask_login.current_user.is_authenticated)
return perma_uuid