mirror of https://github.com/CIRCL/lookyloo
new: Optionally deduplicate notification (UUID or redirects)
parent
b6399b0a95
commit
4975506e2e
|
@ -49,7 +49,12 @@
|
|||
"smtp_host": "localhost",
|
||||
"smtp_port": "25",
|
||||
"confirm_message": "Message the users need to confirm before they submit a notification.",
|
||||
"defang_urls": true
|
||||
"defang_urls": true,
|
||||
"deduplicate": {
|
||||
"uuid": true,
|
||||
"hostnames": false,
|
||||
"interval_in_sec": 86400
|
||||
}
|
||||
},
|
||||
"email_smtp_auth": {
|
||||
"auth": false,
|
||||
|
|
|
@ -5,6 +5,7 @@ from __future__ import annotations
|
|||
import base64
|
||||
import copy
|
||||
import gzip
|
||||
import itertools
|
||||
import json
|
||||
import logging
|
||||
import operator
|
||||
|
@ -869,12 +870,47 @@ class Lookyloo():
|
|||
|
||||
return f"Malicious capture according to {len(modules)} module(s): {', '.join(modules)}"
|
||||
|
||||
def already_sent_mail(self, capture_uuid: str, /, uuid_only: bool=True) -> bool:
|
||||
'''Check if a mail was already sent for a specific capture.
|
||||
The check is either done on the UUID only, or on the chain of redirects (if any).
|
||||
In that second case, we take the chain of redirects, keep only the hostnames,
|
||||
aggregate them if the same one is there multiple times in a row (redirect http -> https),
|
||||
and concatenate the remaining ones.
|
||||
True if the mail was already sent in the last 24h, False otherwise.
|
||||
'''
|
||||
if uuid_only:
|
||||
return bool(self.redis.exists(f'sent_mail|{capture_uuid}'))
|
||||
cache = self.capture_cache(capture_uuid)
|
||||
if not cache:
|
||||
return False
|
||||
if hasattr(cache, 'redirects') and cache.redirects:
|
||||
hostnames = [h for h, l in itertools.groupby(urlparse(redirect).hostname for redirect in cache.redirects if urlparse(redirect).hostname) if h is not None]
|
||||
return bool(self.redis.exists(f'sent_mail|{"|".join(hostnames)}'))
|
||||
return False
|
||||
|
||||
def set_sent_mail_key(self, capture_uuid: str, /, deduplicate_interval: int) -> None:
|
||||
'''Set the key for the sent mail in redis'''
|
||||
self.redis.set(f'sent_mail|{capture_uuid}', 1, ex=deduplicate_interval)
|
||||
cache = self.capture_cache(capture_uuid)
|
||||
if cache and hasattr(cache, 'redirects') and cache.redirects:
|
||||
hostnames = [h for h, l in itertools.groupby(urlparse(redirect).hostname for redirect in cache.redirects if urlparse(redirect).hostname) if h is not None]
|
||||
self.redis.set(f'sent_mail|{"|".join(hostnames)}', 1, ex=deduplicate_interval)
|
||||
|
||||
def send_mail(self, capture_uuid: str, /, as_admin: bool, email: str | None=None, comment: str | None=None) -> bool | dict[str, Any]:
|
||||
'''Send an email notification regarding a specific capture'''
|
||||
if not get_config('generic', 'enable_mail_notification'):
|
||||
return {"error": "Unable to send mail: mail notification disabled"}
|
||||
|
||||
email_config = get_config('generic', 'email')
|
||||
if email_deduplicate := email_config.get('deduplicate'):
|
||||
if email_deduplicate.get('uuid') and self.already_sent_mail(capture_uuid, uuid_only=True):
|
||||
return {"error": "Mail already sent (same UUID)"}
|
||||
if email_deduplicate.get('hostnames') and self.already_sent_mail(capture_uuid, uuid_only=False):
|
||||
return {"error": "Mail already sent (same redirect chain)"}
|
||||
deduplicate_interval = email_deduplicate.get('interval_in_sec')
|
||||
else:
|
||||
deduplicate_interval = 0
|
||||
|
||||
smtp_auth = get_config('generic', 'email_smtp_auth')
|
||||
redirects = ''
|
||||
initial_url = ''
|
||||
|
@ -953,6 +989,8 @@ class Lookyloo():
|
|||
s.starttls()
|
||||
s.login(smtp_auth['smtp_user'], smtp_auth['smtp_pass'])
|
||||
s.send_message(msg)
|
||||
if deduplicate_interval:
|
||||
self.set_sent_mail_key(capture_uuid, deduplicate_interval)
|
||||
except Exception as e:
|
||||
self.logger.exception(e)
|
||||
self.logger.warning(msg.as_string())
|
||||
|
|
|
@ -1161,8 +1161,13 @@ def send_mail(tree_uuid: str) -> WerkzeugResponse:
|
|||
# skip clearly incorrect emails
|
||||
email = ''
|
||||
comment: str = request.form['comment'] if request.form.get('comment') else ''
|
||||
lookyloo.send_mail(tree_uuid, as_admin=flask_login.current_user.is_authenticated, email=email, comment=comment)
|
||||
flash("Email notification sent", 'success')
|
||||
send_status = lookyloo.send_mail(tree_uuid, as_admin=flask_login.current_user.is_authenticated, email=email, comment=comment)
|
||||
if not send_status:
|
||||
flash("Unable to send email notification.", 'error')
|
||||
elif isinstance(send_status, dict) and 'error' in send_status:
|
||||
flash(f"Unable to send email: {send_status['error']}", 'error')
|
||||
else:
|
||||
flash("Email notification sent", 'success')
|
||||
return redirect(url_for('tree', tree_uuid=tree_uuid))
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue