From ca13a334723b1650992cc2b2c1c8b488430e0019 Mon Sep 17 00:00:00 2001 From: terrtia Date: Fri, 6 Sep 2024 15:04:28 +0200 Subject: [PATCH] chg: [access logs] add user_agent --- bin/lib/ail_logger.py | 2 +- var/www/blueprints/root.py | 28 ++++++++++++++-------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/bin/lib/ail_logger.py b/bin/lib/ail_logger.py index ac166532..1790c793 100755 --- a/bin/lib/ail_logger.py +++ b/bin/lib/ail_logger.py @@ -34,7 +34,7 @@ def get_access_config(create=False): logger = logging.getLogger('access.log') if create: - formatter = logging.Formatter('%(asctime)s - %(ip_address)s - %(levelname)s - %(user_id)s - %(message)s') + formatter = logging.Formatter('%(asctime)s - %(ip_address)s - %(user_agent)s - %(levelname)s - %(user_id)s - %(message)s') # STDOUT handler = logging.StreamHandler() diff --git a/var/www/blueprints/root.py b/var/www/blueprints/root.py index f0ab530a..2cf4796f 100644 --- a/var/www/blueprints/root.py +++ b/var/www/blueprints/root.py @@ -65,7 +65,7 @@ def login(): username = request.form.get('username') if not username: username = '' - access_logger.warning(f'Brute Force', extra={'user_id': username, 'ip_address': current_ip}) + access_logger.warning(f'Brute Force', extra={'user_id': username, 'ip_address': current_ip, 'user_agent': request.user_agent}) logging_error = f'Max Connection Attempts reached, Please wait {wait_time}s' return render_template("login.html", error=logging_error) @@ -86,14 +86,14 @@ def login(): login_failed_user_id = int(login_failed_user_id) if login_failed_user_id >= 5: wait_time = r_cache.ttl(f'failed_login_user_id:{username}') - access_logger.warning(f'Max login attempts reached', extra={'user_id': user.get_user_id(), 'ip_address': current_ip}) + access_logger.warning(f'Max login attempts reached', extra={'user_id': user.get_user_id(), 'ip_address': current_ip, 'user_agent': request.user_agent}) logging_error = f'Max Connection Attempts reached, Please wait {wait_time}s' return render_template("login.html", error=logging_error) if user.exists() and user.check_password(password): if not check_user_role_integrity(user.get_user_id()): logging_error = 'Incorrect User ACL, Please contact your administrator' - access_logger.info(f'Login fail: Invalid ACL', extra={'user_id': user.get_user_id(), 'ip_address': current_ip}) + access_logger.info(f'Login fail: Invalid ACL', extra={'user_id': user.get_user_id(), 'ip_address': current_ip, 'user_agent': request.user_agent}) return render_template("login.html", error=logging_error) if user.is_2fa_enabled(): @@ -104,7 +104,7 @@ def login(): if not user.is_2fa_setup(): return redirect(url_for('root.setup_2fa')) else: - access_logger.info(f'First Login', extra={'user_id': user.get_user_id(), 'ip_address': current_ip}) + access_logger.info(f'First Login', extra={'user_id': user.get_user_id(), 'ip_address': current_ip, 'user_agent': request.user_agent}) if next_page and next_page != 'None' and next_page != '/': return redirect(url_for('root.verify_2fa', next=next_page)) else: @@ -115,7 +115,7 @@ def login(): user.rotate_session() login_user(user) user.update_last_login() - access_logger.info(f'Login', extra={'user_id': user.get_user_id(), 'ip_address': current_ip}) + access_logger.info(f'Login', extra={'user_id': user.get_user_id(), 'ip_address': current_ip, 'user_agent': request.user_agent}) if user.request_password_change(): return redirect(url_for('root.change_password')) @@ -138,7 +138,7 @@ def login(): r_cache.expire(f'failed_login_user_id:{username}', 300) # - access_logger.info(f'Login Failed', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0]}) + access_logger.info(f'Login Failed', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0], 'user_agent': request.user_agent}) logging_error = 'Login/Password Incorrect' return render_template("login.html", error=logging_error) @@ -166,7 +166,7 @@ def verify_2fa(): if otp_expire < int(time.time()): # TODO LOG session.pop('user_id', None) session.pop('otp_expire', None) - access_logger.info(f'First Login Expired', extra={'user_id': user_id, 'ip_address': request.access_route[0]}) + access_logger.info(f'First Login Expired', extra={'user_id': user_id, 'ip_address': request.access_route[0], 'user_agent': request.user_agent}) error = "First Login Expired" return redirect(url_for('root.login', error=error)) @@ -188,7 +188,7 @@ def verify_2fa(): login_user(user) user.update_last_login() - access_logger.info(f'2FA login', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0]}) + access_logger.info(f'2FA login', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0], 'user_agent': request.user_agent}) if user.request_password_change(): return redirect(url_for('root.change_password')) @@ -199,7 +199,7 @@ def verify_2fa(): return redirect(url_for('dashboard.index')) else: htop_counter = user.get_htop_counter() - access_logger.info(f'Invalid OTP', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0]}) + access_logger.info(f'Invalid OTP', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0], 'user_agent': request.user_agent}) error = "The OTP is incorrect or has expired" return render_template("verify_otp.html", htop_counter=htop_counter, next_page=next_page, error=error) @@ -220,7 +220,7 @@ def setup_2fa(): if otp_expire < int(time.time()): # TODO LOG session.pop('user_id', None) session.pop('otp_expire', None) - access_logger.info(f'First Login Expired', extra={'user_id': user_id, 'ip_address': request.access_route[0]}) + access_logger.info(f'First Login Expired', extra={'user_id': user_id, 'ip_address': request.access_route[0], 'user_agent': request.user_agent}) error = "First Login Expired" return redirect(url_for('root.login', error=error)) @@ -243,14 +243,14 @@ def setup_2fa(): login_user(user) user.update_last_login() - access_logger.info(f'2FA login', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0]}) + access_logger.info(f'2FA login', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0], 'user_agent': request.user_agent}) if user.request_password_change(): return redirect(url_for('root.change_password')) else: return redirect(url_for('dashboard.index')) else: - access_logger.info(f'OTP Invalid', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0]}) + access_logger.info(f'OTP Invalid', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0], 'user_agent': request.user_agent}) error = "The OTP is incorrect or has expired" return redirect(url_for('root.setup_2fa', error=error)) else: @@ -278,7 +278,7 @@ def change_password(): res = api_change_user_self_password(user_id, password1) if res[1] != 200: return create_json_response(res[0], res[1]) - access_logger.info(f'Password change', extra={'user_id': user_id, 'ip_address': request.access_route[0]}) + access_logger.info(f'Password change', extra={'user_id': user_id, 'ip_address': request.access_route[0], 'user_agent': request.user_agent}) # update Note # dashboard return redirect(url_for('dashboard.index', update_note=True)) @@ -295,7 +295,7 @@ def change_password(): @root.route('/logout') @login_required def logout(): - access_logger.info(f'Logout', extra={'user_id': current_user.get_user_id(), 'ip_address': request.access_route[0]}) + access_logger.info(f'Logout', extra={'user_id': current_user.get_user_id(), 'ip_address': request.access_route[0], 'user_agent': request.user_agent}) current_user.kill_session() logout_user() return redirect(url_for('root.login'))