new: Store capture settings, use TypedDict whenever possible.

pull/699/head
Raphaël Vinot 2023-05-15 16:08:19 +02:00
parent 1603c99d5e
commit 582b5956e9
5 changed files with 86 additions and 48 deletions

View File

@ -7,12 +7,12 @@ import logging.config
import signal
from pathlib import Path
from typing import Dict, Optional, Set, Union
from typing import Optional, Set, Union
from lacuscore import LacusCore, CaptureStatus as CaptureStatusCore, CaptureResponse as CaptureResponseCore
from pylacus import PyLacus, CaptureStatus as CaptureStatusPy, CaptureResponse as CaptureResponsePy
from lookyloo.lookyloo import Lookyloo
from lookyloo.lookyloo import Lookyloo, CaptureSettings
from lookyloo.default import AbstractManager, get_config
from lookyloo.helpers import get_captures_dir
@ -73,14 +73,14 @@ class AsyncCapture(AbstractManager):
self.lookyloo.redis.sadd('ongoing', uuid)
queue: Optional[str] = self.lookyloo.redis.getdel(f'{uuid}_mgmt')
to_capture: Dict[str, str] = self.lookyloo.redis.hgetall(uuid)
to_capture: CaptureSettings = self.lookyloo.redis.hgetall(uuid)
if get_config('generic', 'default_public'):
# By default, the captures are on the index, unless the user marks them as un-listed
listing = False if ('listing' in to_capture and to_capture['listing'].lower() in ['false', '0', '']) else True
listing = False if ('listing' in to_capture and to_capture['listing'].lower() in ['false', '0', '']) else True # type: ignore
else:
# By default, the captures are not on the index, unless the user marks them as listed
listing = True if ('listing' in to_capture and to_capture['listing'].lower() in ['true', '1']) else False
listing = True if ('listing' in to_capture and to_capture['listing'].lower() in ['true', '1']) else False # type: ignore
self.lookyloo.store_capture(
uuid, listing,
@ -91,11 +91,15 @@ class AsyncCapture(AbstractManager):
error=entries.get('error'), har=entries.get('har'),
png=entries.get('png'), html=entries.get('html'),
last_redirected_url=entries.get('last_redirected_url'),
cookies=entries.get('cookies') # type: ignore
cookies=entries.get('cookies'),
capture_settings=to_capture
)
if ('auto_report' in to_capture):
settings = json.loads(to_capture['auto_report'])
if 'auto_report' in to_capture:
if isinstance(to_capture['auto_report'], str):
settings = json.loads(to_capture['auto_report'])
else:
settings = to_capture['auto_report']
if settings.get('email'):
self.lookyloo.send_mail(uuid, email=settings['email'],
comment=settings.get('comment'))

View File

@ -151,7 +151,7 @@ def load_known_content(directory: str='known_content') -> Dict[str, Dict[str, An
return to_return
def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes]]=None) -> List[Dict[str, Union[str, bool]]]:
def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes, List[Dict[str, Union[str, bool]]]]]=None) -> List[Dict[str, Union[str, bool]]]:
cookies: List[Dict[str, Union[str, bool]]]
if cookie_pseudofile:
if isinstance(cookie_pseudofile, (str, bytes)):
@ -160,13 +160,16 @@ def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes]]=
except json.decoder.JSONDecodeError:
logger.warning(f'Unable to load json content: {cookie_pseudofile!r}')
return []
else:
elif isinstance(cookie_pseudofile, BufferedIOBase):
# Note: we might have an empty BytesIO, which is not False.
try:
cookies = json.load(cookie_pseudofile)
except json.decoder.JSONDecodeError:
logger.warning(f'Unable to load json content: {cookie_pseudofile}')
return []
else:
# Already a list of cookie dicts, use it as-is
cookies = cookie_pseudofile
else:
if not (get_homedir() / 'cookies.json').exists():
return []

View File

@ -12,8 +12,7 @@ from email.message import EmailMessage
from functools import cached_property
from io import BytesIO
from pathlib import Path
from typing import (Any, Dict, Iterable, List, MutableMapping, Optional, Set,
Tuple, Union)
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, TYPE_CHECKING
from urllib.parse import urlparse
from uuid import uuid4
from zipfile import ZipFile
@ -21,17 +20,18 @@ from zipfile import ZipFile
from defang import defang # type: ignore
from har2tree import CrawledTree, HostNode, URLNode
from lacuscore import (LacusCore,
CaptureStatus as CaptureStatusCore)
# CaptureResponse as CaptureResponseCore,
# CaptureResponseJson as CaptureResponseJsonCore,
# CaptureSettings as CaptureSettingsCore)
CaptureStatus as CaptureStatusCore,
# CaptureResponse as CaptureResponseCore)
# CaptureResponseJson as CaptureResponseJsonCore,
CaptureSettings as CaptureSettingsCore)
from PIL import Image, UnidentifiedImageError
from playwrightcapture import get_devices
from pylacus import (PyLacus,
CaptureStatus as CaptureStatusPy)
# CaptureResponse as CaptureResponsePy,
# CaptureResponseJson as CaptureResponseJsonPy,
# CaptureSettings as CaptureSettingsPy)
CaptureStatus as CaptureStatusPy
# CaptureResponse as CaptureResponsePy,
# CaptureResponseJson as CaptureResponseJsonPy,
# CaptureSettings as CaptureSettingsPy
)
from pymisp import MISPAttribute, MISPEvent, MISPObject
from pysecuritytxt import PySecurityTXT, SecurityTXTNotAvailable
from pylookyloomonitoring import PyLookylooMonitoring
@ -52,6 +52,20 @@ from .modules import (MISP, PhishingInitiative, UniversalWhois,
UrlScan, VirusTotal, Phishtank, Hashlookup,
RiskIQ, RiskIQError, Pandora, URLhaus)
if TYPE_CHECKING:
from playwright.async_api import Cookie
class CaptureSettings(CaptureSettingsCore, total=False):
    '''The capture settings that can be passed to Lookyloo

    Extends the settings TypedDict imported from lacuscore with the keys below.
    Every key is optional (total=False).
    '''
    # Whether the capture is shown on the index. enqueue_capture converts
    # booleans to 1/0 and forces 0 when the computed priority is below -10.
    listing: Optional[int]
    # NOTE(review): not used in the visible code; presumably marks a capture
    # that did not go through the queue - confirm against the callers.
    not_queued: Optional[int]
    # Either a JSON-encoded string or an already decoded dict. The capture
    # manager reads the 'email' and 'comment' entries to send a report mail.
    auto_report: Optional[Union[str, Dict[str, str]]]
    # Value of the DNT header; _prepare_lacus_query merges it into 'headers'.
    dnt: Optional[str]
    # NOTE(review): presumably the browser used for the capture - confirm.
    browser_name: Optional[str]
    # NOTE(review): presumably the operating system used to pick a user agent - confirm.
    os: Optional[str]
    # UUID of the capture this one was started from (set by bulk_captures).
    parent: Optional[str]
class Lookyloo():
@ -499,13 +513,20 @@ class Lookyloo():
self._captures_index.reload_cache(capture_uuid)
return self._captures_index[capture_uuid].tree
def _prepare_lacus_query(self, query: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
query = {k: v for k, v in query.items() if v is not None} # Remove the none, it makes redis unhappy
def _prepare_lacus_query(self, query: CaptureSettings) -> CaptureSettings:
# Remove the none, it makes redis unhappy
query = {k: v for k, v in query.items() if v is not None} # type: ignore
# NOTE: Lookyloo's capture can pass a Do Not Track (DNT) header independently from the default headers, so it is merged here
headers = query.pop('headers', '')
headers = query.pop('headers', {})
if 'dnt' in query:
headers += f'\nDNT: {query.pop("dnt")}'
headers = headers.strip()
if isinstance(headers, str):
headers += f'\nDNT: {query.pop("dnt")}'
headers = headers.strip()
elif isinstance(headers, dict):
dnt_entry = query.pop("dnt")
if dnt_entry:
headers['DNT'] = dnt_entry.strip()
if headers:
query['headers'] = headers
@ -521,7 +542,7 @@ class Lookyloo():
query['user_agent'] = user_agent if user_agent else self.user_agents.default['useragent']
# NOTE: the document must be base64 encoded
document = query.pop('document', None)
document: Optional[Union[str, bytes]] = query.pop('document', None)
if document:
if isinstance(document, bytes):
query['document'] = base64.b64encode(document).decode()
@ -529,7 +550,7 @@ class Lookyloo():
query['document'] = document
return query
def enqueue_capture(self, query: MutableMapping[str, Any], source: str, user: str, authenticated: bool) -> str:
def enqueue_capture(self, query: CaptureSettings, source: str, user: str, authenticated: bool) -> str:
'''Enqueue a query in the capture queue (used by the UI and the API for asynchronous processing)'''
def get_priority(source: str, user: str, authenticated: bool) -> int:
@ -547,14 +568,15 @@ class Lookyloo():
for key, value in query.items():
if isinstance(value, bool):
query[key] = 1 if value else 0
query[key] = 1 if value else 0 # type: ignore
elif isinstance(value, (list, dict)):
query[key] = json.dumps(value) if value else None
query[key] = json.dumps(value) if value else None # type: ignore
query = self._prepare_lacus_query(query)
query['priority'] = get_priority(source, user, authenticated)
if query['priority'] < -10:
priority = get_priority(source, user, authenticated)
query['priority'] = priority
if priority < -10:
# Someone is probably abusing the system with useless URLs, remove them from the index
query['listing'] = 0
try:
@ -595,7 +617,7 @@ class Lookyloo():
if value:
mapping_capture[key] = json.dumps(value)
elif value is not None:
mapping_capture[key] = value
mapping_capture[key] = value # type: ignore
p = self.redis.pipeline()
p.zadd('to_capture', {perma_uuid: query['priority']})
@ -1323,7 +1345,8 @@ class Lookyloo():
error: Optional[str]=None, har: Optional[Dict[str, Any]]=None,
png: Optional[bytes]=None, html: Optional[str]=None,
last_redirected_url: Optional[str]=None,
cookies: Optional[List[Dict[str, str]]]=None
cookies: Optional[Union[List['Cookie'], List[Dict[str, str]]]]=None,
capture_settings: Optional[CaptureSettings]=None
) -> None:
now = datetime.now()
@ -1383,4 +1406,9 @@ class Lookyloo():
if cookies:
with (dirpath / '0.cookies.json').open('w') as _cookies:
json.dump(cookies, _cookies)
if capture_settings:
with (dirpath / 'capture_settings.json').open('w') as _cs:
json.dump(capture_settings, _cs)
self.redis.hset('lookup_dirs', uuid, str(dirpath))

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
import base64
import calendar
import functools
import http
@ -32,7 +33,7 @@ from werkzeug.security import check_password_hash
from lookyloo.default import get_config
from lookyloo.exceptions import MissingUUID, NoValidHarFile
from lookyloo.helpers import get_taxonomies, UserAgents, load_cookies
from lookyloo.lookyloo import Indexing, Lookyloo
from lookyloo.lookyloo import Indexing, Lookyloo, CaptureSettings
from .genericapi import api as generic_api
from .helpers import (User, build_users_table, get_secret_key,
@ -615,13 +616,14 @@ def bulk_captures(base_tree_uuid: str):
cookies = load_cookies(lookyloo.get_cookies(base_tree_uuid))
bulk_captures = []
for url in [urls[int(selected_id) - 1] for selected_id in selected_urls]:
capture = {'url': url,
'cookies': cookies,
'referer': cache.redirects[-1] if cache.redirects else cache.url,
'user_agent': cache.user_agent,
'parent': base_tree_uuid,
'listing': False if cache and cache.no_index else True
}
capture: CaptureSettings = {
'url': url,
'cookies': cookies,
'referer': cache.redirects[-1] if cache.redirects else cache.url,
'user_agent': cache.user_agent,
'parent': base_tree_uuid,
'listing': False if cache and cache.no_index else True
}
new_capture_uuid = lookyloo.enqueue_capture(capture, source='web', user=user, authenticated=flask_login.current_user.is_authenticated)
bulk_captures.append((new_capture_uuid, url))
@ -1036,10 +1038,10 @@ def capture_web():
flash('Invalid submission: please submit at least a URL or a document.', 'error')
return _prepare_capture_template(user_ua=request.headers.get('User-Agent'))
capture_query: Dict[str, Union[str, bytes, int, bool]] = {}
capture_query: CaptureSettings = {}
# check if the post request has the file part
if 'cookies' in request.files and request.files['cookies'].filename:
capture_query['cookies'] = request.files['cookies'].stream.read()
capture_query['cookies'] = load_cookies(request.files['cookies'].stream.read())
if request.form.get('device_name'):
capture_query['device_name'] = request.form['device_name']
@ -1095,7 +1097,7 @@ def capture_web():
return render_template('bulk_captures.html', bulk_captures=bulk_captures)
elif 'document' in request.files:
# File upload
capture_query['document'] = request.files['document'].stream.read()
capture_query['document'] = base64.b64encode(request.files['document'].stream.read()).decode()
if request.files['document'].filename:
capture_query['document_name'] = request.files['document'].filename
else:

View File

@ -14,7 +14,7 @@ from lacuscore import CaptureStatus as CaptureStatusCore
from pylacus import CaptureStatus as CaptureStatusPy
from lookyloo.comparator import Comparator
from lookyloo.exceptions import MissingUUID
from lookyloo.lookyloo import Lookyloo
from lookyloo.lookyloo import Lookyloo, CaptureSettings
from .helpers import build_users_table, load_user_from_request, src_request_ip
@ -396,8 +396,9 @@ class SubmitCapture(Resource):
if 'url' not in request.args or not request.args.get('url'):
return 'No "url" in the URL params, nothting to capture.', 400
to_query = {'url': request.args['url'],
'listing': False if 'listing' in request.args and request.args['listing'] in [0, '0'] else True}
to_query: CaptureSettings = {
'url': request.args['url'],
'listing': False if 'listing' in request.args and request.args['listing'] in [0, '0'] else True}
if request.args.get('user_agent'):
to_query['user_agent'] = request.args['user_agent']
if request.args.get('browser_name'):
@ -421,7 +422,7 @@ class SubmitCapture(Resource):
user = flask_login.current_user.get_id()
else:
user = src_request_ip(request)
to_query: Dict = request.get_json(force=True)
to_query: CaptureSettings = request.get_json(force=True)
perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated)
return perma_uuid