Merge branch 'AntoniaBK-upload_capture'

pull/920/head
Raphaël Vinot 2024-06-11 20:09:18 +02:00
commit 2a6ed52fef
6 changed files with 177 additions and 59 deletions

View File

@ -15,7 +15,7 @@ from pylacus import PyLacus, CaptureStatus as CaptureStatusPy, CaptureResponse a
from lookyloo import Lookyloo, CaptureSettings
from lookyloo.exceptions import LacusUnreachable
from lookyloo.default import AbstractManager, get_config
from lookyloo.default import AbstractManager, get_config, LookylooException
from lookyloo.helpers import get_captures_dir
from lookyloo.modules import FOX
@ -69,7 +69,7 @@ class AsyncCapture(AbstractManager):
elif isinstance(self.lookyloo.lacus, PyLacus):
entries = self.lookyloo.lacus.get_capture(uuid)
else:
raise Exception('Something is broken.')
raise LookylooException(f'lacus must be LacusCore or PyLacus, not {type(self.lookyloo.lacus)}.')
log = f'Got the capture for {uuid} from Lacus'
if runtime := entries.get('runtime'):
log = f'{log} - Runtime: {runtime}'

View File

@ -254,6 +254,15 @@ class CapturesIndex(Mapping): # type: ignore[type-arg]
def lru_cache_clear(self) -> None:
load_pickle_tree.cache_clear()
def uuid_exists(self, uuid: str) -> bool:
if uuid in self.__cache:
return True
if self.redis.hexists('lookup_dirs', uuid):
return True
if self.redis.hexists('lookup_dirs_archived', uuid):
return True
return False
def _quick_init(self) -> None:
'''Initialize the cache with a list of UUIDs, with less back and forth with redis.
Only get recent captures.'''

View File

@ -322,7 +322,7 @@ def is_locked(locked_dir_path: Path, /) -> bool:
max_wait_content = 5
while max_wait_content > 0:
with lock_file.open('r') as f:
if content := f.read():
if content := f.read().strip():
break
# The file is empty, we're between the creation and setting the content
logger.info(f'Lock file empty ({lock_file}), waiting...')

View File

@ -1416,6 +1416,98 @@ class Lookyloo():
return statistics
def unpack_full_capture_archive(self, archive: BytesIO, listing: bool) -> tuple[str, dict[str, list[str]]]:
unrecoverable_error = False
messages: dict[str, list[str]] = {'errors': [], 'warnings': []}
os: str | None = None
browser: str | None = None
parent: str | None = None
downloaded_filename: str | None = None
downloaded_file: bytes | None = None
error: str | None = None
har: dict[str, Any] | None = None
screenshot: bytes | None = None
html: str | None = None
last_redirected_url: str | None = None
cookies: list[Cookie] | list[dict[str, str]] | None = None
capture_settings: CaptureSettings | None = None
potential_favicons: set[bytes] | None = None
files_to_skip = ['cnames.json', 'ipasn.json', 'ips.json']
with ZipFile(archive, 'r') as lookyloo_capture:
potential_favicons = set()
for filename in lookyloo_capture.namelist():
if filename.endswith('0.har.gz'):
# new formal
har = json.loads(gzip.decompress(lookyloo_capture.read(filename)))
elif filename.endswith('0.har'):
# old format
har = json.loads(lookyloo_capture.read(filename))
elif filename.endswith('0.html'):
html = lookyloo_capture.read(filename).decode()
elif filename.endswith('0.last_redirect.txt'):
last_redirected_url = lookyloo_capture.read(filename).decode()
elif filename.endswith('0.png'):
screenshot = lookyloo_capture.read(filename)
elif filename.endswith('0.cookies.json'):
# Not required
cookies = json.loads(lookyloo_capture.read(filename))
elif filename.endswith('potential_favicons.ico'):
# We may have more than one favicon
potential_favicons.add(lookyloo_capture.read(filename))
elif filename.endswith('uuid'):
uuid = lookyloo_capture.read(filename).decode()
if self._captures_index.uuid_exists(uuid):
messages['warnings'].append(f'UUID {uuid} already exists, set a new one.')
uuid = str(uuid4())
elif filename.endswith('meta'):
meta = json.loads(lookyloo_capture.read(filename))
if 'os' in meta:
os = meta['os']
if 'browser' in meta:
browser = meta['browser']
elif filename.endswith('no_index'):
# Force it to false regardless the form
listing = False
elif filename.endswith('parent'):
parent = lookyloo_capture.read(filename).decode()
elif filename.endswith('0.data.filename'):
downloaded_filename = lookyloo_capture.read(filename).decode()
elif filename.endswith('0.data'):
downloaded_file = lookyloo_capture.read(filename)
elif filename.endswith('error.txt'):
error = lookyloo_capture.read(filename).decode()
elif filename.endswith('capture_settings.json'):
capture_settings = json.loads(lookyloo_capture.read(filename))
else:
for to_skip in files_to_skip:
if filename.endswith(to_skip):
break
else:
messages['warnings'].append(f'Unexpected file in the capture archive: {filename}')
if not har or not html or not last_redirected_url or not screenshot:
# If we don't have these 4 files, the archive is incomplete and we should not store it.
unrecoverable_error = True
if not har:
messages['errors'].append('Invalid submission: missing HAR file')
if not html:
messages['errors'].append('Invalid submission: missing HTML file')
if not last_redirected_url:
messages['errors'].append('Invalid submission: missing landing page')
if not screenshot:
messages['errors'].append('Invalid submission: missing screenshot')
if not unrecoverable_error:
self.store_capture(uuid, is_public=listing,
os=os, browser=browser, parent=parent,
downloaded_filename=downloaded_filename, downloaded_file=downloaded_file,
error=error, har=har, png=screenshot, html=html,
last_redirected_url=last_redirected_url,
cookies=cookies,
capture_settings=capture_settings,
potential_favicons=potential_favicons)
return uuid, messages
def store_capture(self, uuid: str, is_public: bool,
os: str | None=None, browser: str | None=None,
parent: str | None=None,

View File

@ -5,7 +5,6 @@ from __future__ import annotations
import base64
import calendar
import functools
import gzip
import hashlib
import http
import json
@ -1457,12 +1456,12 @@ def submit_capture() -> str | Response | WerkzeugResponse:
if request.method == 'POST':
listing = True if request.form.get('listing') else False
uuid = str(uuid4()) # NOTE: new UUID, because we do not want duplicates
har: dict[str, Any] | None = None
html: str | None = None
last_redirected_url: str | None = None
screenshot: bytes | None = None
if 'har_file' in request.files and request.files['har_file']:
uuid = str(uuid4())
har = json.loads(request.files['har_file'].stream.read())
last_redirected_url = request.form.get('landing_page')
if 'screenshot_file' in request.files:
@ -1475,44 +1474,15 @@ def submit_capture() -> str | Response | WerkzeugResponse:
return redirect(url_for('tree', tree_uuid=uuid))
elif 'full_capture' in request.files and request.files['full_capture']:
# it *only* accepts a lookyloo export.
cookies: list[dict[str, str]] | None = None
has_error = False
with ZipFile(BytesIO(request.files['full_capture'].stream.read()), 'r') as lookyloo_capture:
potential_favicons = set()
for filename in lookyloo_capture.namelist():
if filename.endswith('0.har.gz'):
# new formal
har = json.loads(gzip.decompress(lookyloo_capture.read(filename)))
elif filename.endswith('0.har'):
# old format
har = json.loads(lookyloo_capture.read(filename))
elif filename.endswith('0.html'):
html = lookyloo_capture.read(filename).decode()
elif filename.endswith('0.last_redirect.txt'):
last_redirected_url = lookyloo_capture.read(filename).decode()
elif filename.endswith('0.png'):
screenshot = lookyloo_capture.read(filename)
elif filename.endswith('0.cookies.json'):
# Not required
cookies = json.loads(lookyloo_capture.read(filename))
elif filename.endswith('potential_favicons.ico'):
# We may have more than one favicon
potential_favicons.add(lookyloo_capture.read(filename))
if not har or not html or not last_redirected_url or not screenshot:
has_error = True
if not har:
flash('Invalid submission: missing HAR file', 'error')
if not html:
flash('Invalid submission: missing HTML file', 'error')
if not last_redirected_url:
flash('Invalid submission: missing landing page', 'error')
if not screenshot:
flash('Invalid submission: missing screenshot', 'error')
if not has_error:
lookyloo.store_capture(uuid, is_public=listing, har=har,
last_redirected_url=last_redirected_url,
png=screenshot, html=html, cookies=cookies,
potential_favicons=potential_favicons)
full_capture_file = BytesIO(request.files['full_capture'].stream.read())
uuid, messages = lookyloo.unpack_full_capture_archive(full_capture_file, listing)
if 'errors' in messages and messages['errors']:
for error in messages['errors']:
flash(error, 'error')
else:
if 'warnings' in messages:
for warning in messages['warnings']:
flash(warning, 'warning')
return redirect(url_for('tree', tree_uuid=uuid))
else:
flash('Invalid submission: please submit at least an HAR file.', 'error')

View File

@ -3,16 +3,18 @@
from __future__ import annotations
import base64
import gzip
import hashlib
import json
from io import BytesIO
from typing import Any
from uuid import uuid4
from zipfile import ZipFile
import flask_login # type: ignore[import-untyped]
from flask import request, send_file, Response
from flask_restx import Namespace, Resource, abort, fields # type: ignore[import-untyped]
from flask_restx import Namespace, Resource, fields # type: ignore[import-untyped]
from werkzeug.security import check_password_hash
from lacuscore import CaptureStatus as CaptureStatusCore
@ -32,7 +34,7 @@ comparator: Comparator = Comparator()
def api_auth_check(method): # type: ignore[no-untyped-def]
if flask_login.current_user.is_authenticated or load_user_from_request(request):
return method
abort(403, 'Authentication required.')
return 'Authentication required.', 403
token_request_fields = api.model('AuthTokenFields', {
@ -441,6 +443,55 @@ class CaptureReport(Resource): # type: ignore[misc]
return lookyloo.send_mail(capture_uuid, parameters.get('email', ''), parameters.get('comment'))
@api.route('/json/upload')
@api.doc(description='Submits a capture from another instance')
class UploadCapture(Resource): # type: ignore[misc]
def post(self) -> dict[str, str | dict[str, list[str]]] | tuple[dict[str, str], int]:
parameters: dict[str, Any] = request.get_json(force=True)
listing = True if parameters['listing'] else False
har: dict[str, Any] | None = None
html: str | None = None
last_redirected_url: str | None = None
screenshot: bytes | None = None
if 'har_file' in parameters and parameters.get('har_file'):
uuid = str(uuid4())
try:
har_decoded = base64.b64decode(parameters['har_file'])
try:
# new format
har_uncompressed = gzip.decompress(har_decoded)
except gzip.BadGzipFile:
# old format
har_uncompressed = har_decoded
har = json.loads(har_uncompressed)
last_redirected_url = parameters.get('landing_page')
if 'screenshot_file' in parameters:
screenshot = base64.b64decode(parameters['screenshot_file'])
if 'html_file' in parameters:
html = base64.b64decode(parameters['html_file']).decode()
lookyloo.store_capture(uuid, is_public=listing, har=har,
last_redirected_url=last_redirected_url,
png=screenshot, html=html)
except Exception as e:
return {'error': f'Unable to process the upload: {e}'}, 400
return {'uuid': uuid}
elif 'full_capture' in parameters and parameters.get('full_capture'):
try:
zipped_capture = base64.b64decode(parameters['full_capture'].encode())
except Exception:
return {'error': 'Invalid base64-encoding'}, 400
full_capture_file = BytesIO(zipped_capture)
uuid, messages = lookyloo.unpack_full_capture_archive(full_capture_file, listing=listing)
if 'errors' in messages and messages['errors']:
return {'error': ', '.join(messages['errors'])}, 400
return {'uuid': uuid, 'messages': messages}
else:
return {'error': 'Full capture or at least har-file is required'}, 400
auto_report_model = api.model('AutoReportModel', {
'email': fields.String(description="Email of the reporter, used by the analyst to get in touch.", example=''),
'comment': fields.String(description="Description of the URL, will be given to the analyst.", example='')
@ -475,14 +526,14 @@ class SubmitCapture(Resource): # type: ignore[misc]
@api.param('referer', 'Referer to pass to the capture') # type: ignore[misc]
@api.param('proxy', 'Proxy to use for the the capture') # type: ignore[misc]
@api.produces(['text/text']) # type: ignore[misc]
def get(self) -> str | tuple[str, int]:
def get(self) -> str | tuple[dict[str, str], int]:
if flask_login.current_user.is_authenticated:
user = flask_login.current_user.get_id()
else:
user = src_request_ip(request)
if 'url' not in request.args or not request.args.get('url'):
return 'No "url" in the URL params, nothting to capture.', 400
return {'error': 'No "url" in the URL params, nothting to capture.'}, 400
to_query: CaptureSettings = {
'url': request.args['url'],
@ -664,9 +715,8 @@ class RebuildAll(Resource): # type: ignore[misc]
try:
lookyloo.rebuild_all()
except Exception as e:
return {'error': f'Unable to rebuild all captures: {e}.'}, 400
else:
return {'info': 'Captures successfully rebuilt.'}
return {'error': f'Unable to rebuild all captures: {e}'}, 400
return {'info': 'Captures successfully rebuilt.'}
@api.route('/admin/rebuild_all_cache')
@ -679,9 +729,8 @@ class RebuildAllCache(Resource): # type: ignore[misc]
try:
lookyloo.rebuild_cache()
except Exception as e:
return {'error': f'Unable to rebuild all the caches: {e}.'}, 400
else:
return {'info': 'All caches successfully rebuilt.'}
return {'error': f'Unable to rebuild all the caches: {e}'}, 400
return {'info': 'All caches successfully rebuilt.'}
@api.route('/admin/<string:capture_uuid>/rebuild')
@ -696,9 +745,8 @@ class CaptureRebuildTree(Resource): # type: ignore[misc]
lookyloo.remove_pickle(capture_uuid)
lookyloo.get_crawled_tree(capture_uuid)
except Exception as e:
return {'error': f'Unable to rebuild tree: {e}.'}, 400
else:
return {'info': f'Tree {capture_uuid} successfully rebuilt.'}
return {'error': f'Unable to rebuild tree: {e}'}, 400
return {'info': f'Tree {capture_uuid} successfully rebuilt.'}
@api.route('/admin/<string:capture_uuid>/hide')
@ -712,6 +760,5 @@ class CaptureHide(Resource): # type: ignore[misc]
try:
lookyloo.hide_capture(capture_uuid)
except Exception as e:
return {'error': f'Unable to hide the tree: {e}.'}, 400
else:
return {'info': f'Capture {capture_uuid} successfully hidden.'}
return {'error': f'Unable to hide the tree: {e}'}, 400
return {'info': f'Capture {capture_uuid} successfully hidden.'}