mirror of https://github.com/CIRCL/lookyloo
chg: a user config can overwrite all the settings for a capture
parent
19f78a3a4d
commit
658fdaeaea
|
@ -116,7 +116,8 @@ class AsyncCapture(AbstractManager):
|
|||
|
||||
if send_report:
|
||||
self.lookyloo.send_mail(uuid, email=settings.get('email', ''),
|
||||
comment=settings.get('comment'), recipient_mail= settings.get("recipient_mail"))
|
||||
comment=settings.get('comment'),
|
||||
recipient_mail=settings.get("recipient_mail"))
|
||||
|
||||
lazy_cleanup = self.lookyloo.redis.pipeline()
|
||||
if queue and self.lookyloo.redis.zscore('queues', queue):
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
{
|
||||
"overwrite": true,
|
||||
"listing": false,
|
||||
"auto_report": {
|
||||
"recipient_mail": "analyst@test.de"
|
||||
}
|
||||
}
|
|
@ -1,3 +0,0 @@
|
|||
{
|
||||
"email" : "analyst@test.de"
|
||||
}
|
|
@ -29,6 +29,7 @@ from werkzeug.utils import cached_property
|
|||
|
||||
from .default import get_homedir, safe_create_dir, get_config, LookylooException
|
||||
|
||||
|
||||
logger = logging.getLogger('Lookyloo - Helpers')
|
||||
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ import time
|
|||
|
||||
from collections import defaultdict
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
from functools import lru_cache
|
||||
from email.message import EmailMessage
|
||||
from functools import cached_property
|
||||
from io import BytesIO
|
||||
|
@ -78,6 +79,22 @@ class CaptureSettings(CaptureSettingsCore, total=False):
|
|||
parent: str | None
|
||||
|
||||
|
||||
# overwrite set to True means the settings in the config file overwrite the settings
|
||||
# provided by the user. False will simply append the settings from the config file if they
|
||||
# don't exist.
|
||||
class UserCaptureSettings(CaptureSettings, total=False):
|
||||
overwrite: bool
|
||||
|
||||
|
||||
@lru_cache(64)
|
||||
def load_user_config(username: str) -> UserCaptureSettings | None:
|
||||
user_config_path = get_homedir() / 'config' / 'users' / f'{username}.json'
|
||||
if not user_config_path.exists():
|
||||
return None
|
||||
with user_config_path.open() as _c:
|
||||
return json.load(_c)
|
||||
|
||||
|
||||
class Lookyloo():
|
||||
|
||||
def __init__(self, cache_max_size: int | None=None) -> None:
|
||||
|
@ -629,6 +646,26 @@ class Lookyloo():
|
|||
query['document'] = document
|
||||
return query
|
||||
|
||||
def _apply_user_config(self, query: CaptureSettings, user_config: UserCaptureSettings) -> CaptureSettings:
|
||||
def recursive_merge(dict1: CaptureSettings | UserCaptureSettings,
|
||||
dict2: CaptureSettings | UserCaptureSettings) -> CaptureSettings:
|
||||
# dict2 overwrites dict1
|
||||
for key, value in dict2.items():
|
||||
if key in dict1 and isinstance(dict1[key], dict) and isinstance(value, dict): # type: ignore[literal-required]
|
||||
# Recursively merge nested dictionaries
|
||||
dict1[key] = recursive_merge(dict1[key], value) # type: ignore[literal-required,arg-type]
|
||||
else:
|
||||
# Merge non-dictionary values
|
||||
dict1[key] = value # type: ignore[literal-required]
|
||||
return dict1
|
||||
|
||||
# merge
|
||||
if user_config.pop('overwrite', None):
|
||||
# config from file takes priority
|
||||
return recursive_merge(query, user_config)
|
||||
else:
|
||||
return recursive_merge(user_config, query)
|
||||
|
||||
def enqueue_capture(self, query: CaptureSettings, source: str, user: str, authenticated: bool) -> str:
|
||||
'''Enqueue a query in the capture queue (used by the UI and the API for asynchronous processing)'''
|
||||
|
||||
|
@ -652,6 +689,9 @@ class Lookyloo():
|
|||
query[key] = json.dumps(value) if value else None # type: ignore[literal-required]
|
||||
|
||||
query = self._prepare_lacus_query(query)
|
||||
if authenticated:
|
||||
if user_config := load_user_config(user):
|
||||
query = self._apply_user_config(query, user_config)
|
||||
|
||||
priority = get_priority(source, user, authenticated)
|
||||
if priority < -100:
|
||||
|
@ -864,7 +904,8 @@ class Lookyloo():
|
|||
|
||||
return f"Malicious capture according to {len(modules)} module(s): {', '.join(modules)}"
|
||||
|
||||
def send_mail(self, capture_uuid: str, /, email: str='', comment: str | None=None, recipient_mail: str | None = None) -> bool | dict[str, Any]:
|
||||
def send_mail(self, capture_uuid: str, /, email: str='', comment: str | None=None,
|
||||
recipient_mail: str | None = None) -> bool | dict[str, Any]:
|
||||
'''Send an email notification regarding a specific capture'''
|
||||
if not get_config('generic', 'enable_mail_notification'):
|
||||
return {"error": "Unable to send mail: mail notification disabled"}
|
||||
|
|
|
@ -12,7 +12,6 @@ import json
|
|||
import logging
|
||||
import logging.config
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
|
@ -42,7 +41,7 @@ from werkzeug.wrappers.response import Response as WerkzeugResponse
|
|||
|
||||
from lookyloo import Lookyloo, CaptureSettings, Indexing
|
||||
from lookyloo.capturecache import CaptureCache
|
||||
from lookyloo.default import get_config, get_homedir
|
||||
from lookyloo.default import get_config
|
||||
from lookyloo.exceptions import MissingUUID, NoValidHarFile
|
||||
from lookyloo.helpers import get_taxonomies, UserAgents, load_cookies
|
||||
|
||||
|
@ -53,7 +52,7 @@ else:
|
|||
all_timezones_set = available_timezones()
|
||||
|
||||
from .genericapi import api as generic_api
|
||||
from .helpers import (User, is_valid_username, build_users_table, get_secret_key,
|
||||
from .helpers import (User, build_users_table, get_secret_key,
|
||||
load_user_from_request, src_request_ip, sri_load,
|
||||
get_lookyloo_instance)
|
||||
from .proxied import ReverseProxied
|
||||
|
@ -107,9 +106,6 @@ def login() -> WerkzeugResponse | str | Response:
|
|||
'''
|
||||
|
||||
username = request.form['username']
|
||||
if not is_valid_username(username):
|
||||
flash('User is not permitted.', 'error')
|
||||
return redirect(url_for('login'))
|
||||
users_table = build_users_table()
|
||||
if username in users_table and check_password_hash(users_table[username]['password'], request.form['password']):
|
||||
user = User()
|
||||
|
@ -1592,7 +1588,7 @@ def capture_web() -> str | Response | WerkzeugResponse:
|
|||
if request.form.get('proxy'):
|
||||
parsed_proxy = urlparse(request.form['proxy'])
|
||||
if parsed_proxy.scheme and parsed_proxy.hostname and parsed_proxy.port:
|
||||
if parsed_proxy.scheme in ['http', 'https', 'socks5']:
|
||||
if parsed_proxy.scheme in ['http', 'https', 'socks5', 'socks5h']:
|
||||
if (parsed_proxy.username and parsed_proxy.password) or (not parsed_proxy.username and not parsed_proxy.password):
|
||||
capture_query['proxy'] = request.form['proxy']
|
||||
else:
|
||||
|
@ -1640,28 +1636,16 @@ def capture_web() -> str | Response | WerkzeugResponse:
|
|||
# render template
|
||||
return _prepare_capture_template(user_ua=request.headers.get('User-Agent'))
|
||||
|
||||
@app.route('/simple_capture', methods=['GET','POST'])
|
||||
|
||||
@app.route('/simple_capture', methods=['GET', 'POST'])
|
||||
@flask_login.login_required # type: ignore[misc]
|
||||
def simple_capture() -> str | Response | WerkzeugResponse:
|
||||
user = flask_login.current_user.get_id()
|
||||
if not is_valid_username(user):
|
||||
# Username has been manipulated
|
||||
flash('User is not permitted.', 'error')
|
||||
return redirect(url_for('submit_capture'))
|
||||
|
||||
if request.method == 'POST':
|
||||
if not (request.form.get('url') or request.form.get('urls')):
|
||||
flash('Invalid submission: please submit at least a URL.', 'error')
|
||||
return render_template('simple_capture.html')
|
||||
capture_query: CaptureSettings = {}
|
||||
capture_query['listing'] = False
|
||||
if request.form.get('auto_report'):
|
||||
path = get_homedir() /'config'/ 'users' / (user + ".json")
|
||||
if os.path.isfile(path):
|
||||
email = get_config(user, 'email')
|
||||
capture_query['auto_report'] = {"recipient_mail": email}
|
||||
else:
|
||||
capture_query['auto_report'] = True
|
||||
if request.form.get('url'):
|
||||
capture_query['url'] = request.form['url']
|
||||
perma_uuid = lookyloo.enqueue_capture(capture_query, source='web', user=user,
|
||||
|
|
|
@ -50,8 +50,10 @@ def load_user_from_request(request: Request) -> User | None:
|
|||
return user
|
||||
return None
|
||||
|
||||
|
||||
def is_valid_username(username: str) -> bool:
|
||||
return bool(re.match("^[A-Za-z0-9]+$", username))
|
||||
return bool(re.match("^[A-Za-z0-9]+$", username))
|
||||
|
||||
|
||||
@lru_cache(64)
|
||||
def build_keys_table() -> dict[str, str]:
|
||||
|
@ -75,6 +77,9 @@ def get_users() -> dict[str, str | list[str]]:
|
|||
def build_users_table() -> dict[str, dict[str, str]]:
|
||||
users_table: dict[str, dict[str, str]] = {}
|
||||
for username, authstuff in get_users().items():
|
||||
if not is_valid_username(username):
|
||||
raise Exception('Invalid username, can only contain characters and numbers.')
|
||||
|
||||
if isinstance(authstuff, str):
|
||||
# just a password, make a key
|
||||
users_table[username] = {}
|
||||
|
|
|
@ -30,15 +30,6 @@
|
|||
</center>
|
||||
{{ render_messages(container=True, dismissible=True) }}
|
||||
<form role="form" action="{{ url_for('simple_capture') }}" method=post enctype=multipart/form-data>
|
||||
<div class="row mb-3">
|
||||
<div class="col-sm-10">
|
||||
<div class="form-check">
|
||||
<input class="form-check-input" id="auto_report" type="checkbox" name="auto_report" checked=checked>
|
||||
<label for="auto_report" class="form-check-label">Auto report</label>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Submission type -->
|
||||
|
||||
<div class="tab-content" id="nav-tabContent">
|
||||
|
@ -109,4 +100,4 @@
|
|||
}
|
||||
})
|
||||
</script>
|
||||
{% endblock %}
|
||||
{% endblock %}
|
||||
|
|
Loading…
Reference in New Issue