diff --git a/bin/async_capture.py b/bin/async_capture.py index 8cfb92c..e9e1cf9 100755 --- a/bin/async_capture.py +++ b/bin/async_capture.py @@ -7,12 +7,12 @@ import logging.config import signal from pathlib import Path -from typing import Dict, Optional, Set, Union +from typing import Optional, Set, Union from lacuscore import LacusCore, CaptureStatus as CaptureStatusCore, CaptureResponse as CaptureResponseCore from pylacus import PyLacus, CaptureStatus as CaptureStatusPy, CaptureResponse as CaptureResponsePy -from lookyloo.lookyloo import Lookyloo +from lookyloo.lookyloo import Lookyloo, CaptureSettings from lookyloo.default import AbstractManager, get_config from lookyloo.helpers import get_captures_dir @@ -73,14 +73,14 @@ class AsyncCapture(AbstractManager): self.lookyloo.redis.sadd('ongoing', uuid) queue: Optional[str] = self.lookyloo.redis.getdel(f'{uuid}_mgmt') - to_capture: Dict[str, str] = self.lookyloo.redis.hgetall(uuid) + to_capture: CaptureSettings = self.lookyloo.redis.hgetall(uuid) if get_config('generic', 'default_public'): # By default, the captures are on the index, unless the user mark them as un-listed - listing = False if ('listing' in to_capture and to_capture['listing'].lower() in ['false', '0', '']) else True + listing = False if ('listing' in to_capture and to_capture['listing'].lower() in ['false', '0', '']) else True # type: ignore else: # By default, the captures are not on the index, unless the user mark them as listed - listing = True if ('listing' in to_capture and to_capture['listing'].lower() in ['true', '1']) else False + listing = True if ('listing' in to_capture and to_capture['listing'].lower() in ['true', '1']) else False # type: ignore self.lookyloo.store_capture( uuid, listing, @@ -91,11 +91,15 @@ class AsyncCapture(AbstractManager): error=entries.get('error'), har=entries.get('har'), png=entries.get('png'), html=entries.get('html'), last_redirected_url=entries.get('last_redirected_url'), - cookies=entries.get('cookies') # type: ignore + cookies=entries.get('cookies'), + capture_settings=to_capture ) - if ('auto_report' in to_capture): - settings = json.loads(to_capture['auto_report']) + if 'auto_report' in to_capture: + if isinstance(to_capture['auto_report'], str): + settings = json.loads(to_capture['auto_report']) + else: + settings = to_capture['auto_report'] if settings.get('email'): self.lookyloo.send_mail(uuid, email=settings['email'], comment=settings.get('comment')) diff --git a/lookyloo/helpers.py b/lookyloo/helpers.py index 3fbb55c..6e9b802 100644 --- a/lookyloo/helpers.py +++ b/lookyloo/helpers.py @@ -151,7 +151,7 @@ def load_known_content(directory: str='known_content') -> Dict[str, Dict[str, An return to_return -def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes]]=None) -> List[Dict[str, Union[str, bool]]]: +def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes, List[Dict[str, Union[str, bool]]]]]=None) -> List[Dict[str, Union[str, bool]]]: cookies: List[Dict[str, Union[str, bool]]] if cookie_pseudofile: if isinstance(cookie_pseudofile, (str, bytes)): @@ -160,13 +160,16 @@ def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes]]= except json.decoder.JSONDecodeError: logger.warning(f'Unable to load json content: {cookie_pseudofile!r}') return [] - else: + elif isinstance(cookie_pseudofile, BufferedIOBase): # Note: we might have an empty BytesIO, which is not False. try: cookies = json.load(cookie_pseudofile) except json.decoder.JSONDecodeError: logger.warning(f'Unable to load json content: {cookie_pseudofile}') return [] + else: + # Already a dict + cookies = cookie_pseudofile else: if not (get_homedir() / 'cookies.json').exists(): return [] diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index 3b20727..2fc7460 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -12,8 +12,7 @@ from email.message import EmailMessage from functools import cached_property from io import BytesIO from pathlib import Path -from typing import (Any, Dict, Iterable, List, MutableMapping, Optional, Set, - Tuple, Union) +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, TYPE_CHECKING from urllib.parse import urlparse from uuid import uuid4 from zipfile import ZipFile @@ -21,17 +20,18 @@ from zipfile import ZipFile from defang import defang # type: ignore from har2tree import CrawledTree, HostNode, URLNode from lacuscore import (LacusCore, - CaptureStatus as CaptureStatusCore) -# CaptureResponse as CaptureResponseCore, -# CaptureResponseJson as CaptureResponseJsonCore, -# CaptureSettings as CaptureSettingsCore) + CaptureStatus as CaptureStatusCore, + # CaptureResponse as CaptureResponseCore) + # CaptureResponseJson as CaptureResponseJsonCore, + CaptureSettings as CaptureSettingsCore) from PIL import Image, UnidentifiedImageError from playwrightcapture import get_devices from pylacus import (PyLacus, - CaptureStatus as CaptureStatusPy) -# CaptureResponse as CaptureResponsePy, -# CaptureResponseJson as CaptureResponseJsonPy, -# CaptureSettings as CaptureSettingsPy) + CaptureStatus as CaptureStatusPy + # CaptureResponse as CaptureResponsePy, + # CaptureResponseJson as CaptureResponseJsonPy, + # CaptureSettings as CaptureSettingsPy + ) from pymisp import MISPAttribute, MISPEvent, MISPObject from pysecuritytxt import PySecurityTXT, SecurityTXTNotAvailable from pylookyloomonitoring import PyLookylooMonitoring @@ -52,6 +52,20 @@ from .modules import (MISP, PhishingInitiative, UniversalWhois, UrlScan, VirusTotal, Phishtank, Hashlookup, RiskIQ, RiskIQError, Pandora, URLhaus) +if TYPE_CHECKING: + from playwright.async_api import Cookie + + +class CaptureSettings(CaptureSettingsCore, total=False): + '''The capture settings that can be passed to Lookyloo''' + listing: Optional[int] + not_queued: Optional[int] + auto_report: Optional[Union[str, Dict[str, str]]] + dnt: Optional[str] + browser_name: Optional[str] + os: Optional[str] + parent: Optional[str] + class Lookyloo(): @@ -499,13 +513,20 @@ class Lookyloo(): self._captures_index.reload_cache(capture_uuid) return self._captures_index[capture_uuid].tree - def _prepare_lacus_query(self, query: MutableMapping[str, Any]) -> MutableMapping[str, Any]: - query = {k: v for k, v in query.items() if v is not None} # Remove the none, it makes redis unhappy + def _prepare_lacus_query(self, query: CaptureSettings) -> CaptureSettings: + # Remove the none, it makes redis unhappy + query = {k: v for k, v in query.items() if v is not None} # type: ignore # NOTE: Lookyloo' capture can pass a do not track header independently from the default headers, merging it here - headers = query.pop('headers', '') + headers = query.pop('headers', {}) if 'dnt' in query: - headers += f'\nDNT: {query.pop("dnt")}' - headers = headers.strip() + if isinstance(headers, str): + headers += f'\nDNT: {query.pop("dnt")}' + headers = headers.strip() + elif isinstance(headers, dict): + dnt_entry = query.pop("dnt") + if dnt_entry: + headers['DNT'] = dnt_entry.strip() + if headers: query['headers'] = headers @@ -521,7 +542,7 @@ class Lookyloo(): query['user_agent'] = user_agent if user_agent else self.user_agents.default['useragent'] # NOTE: the document must be base64 encoded - document = query.pop('document', None) + document: Optional[Union[str, bytes]] = query.pop('document', None) if document: if isinstance(document, bytes): query['document'] = base64.b64encode(document).decode() @@ -529,7 +550,7 @@ class Lookyloo(): query['document'] = document return query - def enqueue_capture(self, query: MutableMapping[str, Any], source: str, user: str, authenticated: bool) -> str: + def enqueue_capture(self, query: CaptureSettings, source: str, user: str, authenticated: bool) -> str: '''Enqueue a query in the capture queue (used by the UI and the API for asynchronous processing)''' def get_priority(source: str, user: str, authenticated: bool) -> int: @@ -547,14 +568,15 @@ class Lookyloo(): for key, value in query.items(): if isinstance(value, bool): - query[key] = 1 if value else 0 + query[key] = 1 if value else 0 # type: ignore elif isinstance(value, (list, dict)): - query[key] = json.dumps(value) if value else None + query[key] = json.dumps(value) if value else None # type: ignore query = self._prepare_lacus_query(query) - query['priority'] = get_priority(source, user, authenticated) - if query['priority'] < -10: + priority = get_priority(source, user, authenticated) + query['priority'] = priority + if priority < -10: # Someone is probably abusing the system with useless URLs, remove them from the index query['listing'] = 0 try: @@ -595,7 +617,7 @@ class Lookyloo(): if value: mapping_capture[key] = json.dumps(value) elif value is not None: - mapping_capture[key] = value + mapping_capture[key] = value # type: ignore p = self.redis.pipeline() p.zadd('to_capture', {perma_uuid: query['priority']}) @@ -1323,7 +1345,8 @@ class Lookyloo(): error: Optional[str]=None, har: Optional[Dict[str, Any]]=None, png: Optional[bytes]=None, html: Optional[str]=None, last_redirected_url: Optional[str]=None, - cookies: Optional[List[Dict[str, str]]]=None + cookies: Optional[Union[List['Cookie'], List[Dict[str, str]]]]=None, + capture_settings: Optional[CaptureSettings]=None ) -> None: now = datetime.now() @@ -1383,4 +1406,9 @@ class Lookyloo(): if cookies: with (dirpath / '0.cookies.json').open('w') as _cookies: json.dump(cookies, _cookies) + + if capture_settings: + with (dirpath / 'capture_settings.json').open('w') as _cs: + json.dump(capture_settings, _cs) + self.redis.hset('lookup_dirs', uuid, str(dirpath)) diff --git a/website/web/__init__.py b/website/web/__init__.py index bdc5984..b55f756 100644 --- a/website/web/__init__.py +++ b/website/web/__init__.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +import base64 import calendar import functools import http @@ -32,7 +33,7 @@ from werkzeug.security import check_password_hash from lookyloo.default import get_config from lookyloo.exceptions import MissingUUID, NoValidHarFile from lookyloo.helpers import get_taxonomies, UserAgents, load_cookies -from lookyloo.lookyloo import Indexing, Lookyloo +from lookyloo.lookyloo import Indexing, Lookyloo, CaptureSettings from .genericapi import api as generic_api from .helpers import (User, build_users_table, get_secret_key, @@ -615,13 +616,14 @@ def bulk_captures(base_tree_uuid: str): cookies = load_cookies(lookyloo.get_cookies(base_tree_uuid)) bulk_captures = [] for url in [urls[int(selected_id) - 1] for selected_id in selected_urls]: - capture = {'url': url, - 'cookies': cookies, - 'referer': cache.redirects[-1] if cache.redirects else cache.url, - 'user_agent': cache.user_agent, - 'parent': base_tree_uuid, - 'listing': False if cache and cache.no_index else True - } + capture: CaptureSettings = { + 'url': url, + 'cookies': cookies, + 'referer': cache.redirects[-1] if cache.redirects else cache.url, + 'user_agent': cache.user_agent, + 'parent': base_tree_uuid, + 'listing': False if cache and cache.no_index else True + } new_capture_uuid = lookyloo.enqueue_capture(capture, source='web', user=user, authenticated=flask_login.current_user.is_authenticated) bulk_captures.append((new_capture_uuid, url)) @@ -1036,10 +1038,10 @@ def capture_web(): flash('Invalid submission: please submit at least a URL or a document.', 'error') return _prepare_capture_template(user_ua=request.headers.get('User-Agent')) - capture_query: Dict[str, Union[str, bytes, int, bool]] = {} + capture_query: CaptureSettings = {} # check if the post request has the file part if 'cookies' in request.files and request.files['cookies'].filename: - capture_query['cookies'] = request.files['cookies'].stream.read() + capture_query['cookies'] = load_cookies(request.files['cookies'].stream.read()) if request.form.get('device_name'): capture_query['device_name'] = request.form['device_name'] @@ -1095,7 +1097,7 @@ def capture_web(): return render_template('bulk_captures.html', bulk_captures=bulk_captures) elif 'document' in request.files: # File upload - capture_query['document'] = request.files['document'].stream.read() + capture_query['document'] = base64.b64encode(request.files['document'].stream.read()).decode() if request.files['document'].filename: capture_query['document_name'] = request.files['document'].filename else: diff --git a/website/web/genericapi.py b/website/web/genericapi.py index bbdfd6b..8819108 100644 --- a/website/web/genericapi.py +++ b/website/web/genericapi.py @@ -14,7 +14,7 @@ from lacuscore import CaptureStatus as CaptureStatusCore from pylacus import CaptureStatus as CaptureStatusPy from lookyloo.comparator import Comparator from lookyloo.exceptions import MissingUUID -from lookyloo.lookyloo import Lookyloo +from lookyloo.lookyloo import Lookyloo, CaptureSettings from .helpers import build_users_table, load_user_from_request, src_request_ip @@ -396,8 +396,9 @@ class SubmitCapture(Resource): if 'url' not in request.args or not request.args.get('url'): return 'No "url" in the URL params, nothting to capture.', 400 - to_query = {'url': request.args['url'], - 'listing': False if 'listing' in request.args and request.args['listing'] in [0, '0'] else True} + to_query: CaptureSettings = { + 'url': request.args['url'], + 'listing': False if 'listing' in request.args and request.args['listing'] in [0, '0'] else True} if request.args.get('user_agent'): to_query['user_agent'] = request.args['user_agent'] if request.args.get('browser_name'): @@ -421,7 +422,7 @@ class SubmitCapture(Resource): user = flask_login.current_user.get_id() else: user = src_request_ip(request) - to_query: Dict = request.get_json(force=True) + to_query: CaptureSettings = request.get_json(force=True) perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated) return perma_uuid