chg: [Flask login] add brute force protection + log login errors

pull/359/head
Terrtia 2019-06-26 16:36:40 +02:00
parent 27ae671b3d
commit 5f157c52d8
No known key found for this signature in database
GPG Key ID: 1E1B1F50D84613D0
2 changed files with 65 additions and 0 deletions

View File

@ -30,6 +30,14 @@ Redis and ARDB overview
# Database Map:
### Redis cache
##### Brute force protection:
| Set Key | Value |
| ------ | ------ |
| failed_login_ip:**ip** | **nb login failed** | TTL
| failed_login_user_id:**user_id** | **nb login failed** | TTL
## DB0 - Core:
##### Update keys:

View File

@ -9,6 +9,8 @@ import time
import redis
import random
import logging
import logging.handlers
import configparser
from flask import Flask, render_template, jsonify, request, Request, session, redirect, url_for
@ -54,6 +56,27 @@ r_serv_tags = redis.StrictRedis(
db=cfg.getint("ARDB_Tags", "db"),
decode_responses=True)
r_cache = redis.StrictRedis(
host=cfg.get("Redis_Cache", "host"),
port=cfg.getint("Redis_Cache", "port"),
db=cfg.getint("Redis_Cache", "db"),
decode_responses=True)
# logs
log_dir = os.path.join(os.environ['AIL_HOME'], 'logs')
if not os.path.isdir(log_dir):
os.makedirs(logs_dir)
log_filename = os.path.join(log_dir, 'flask_server.logs')
logger = logging.getLogger()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler_log = logging.handlers.TimedRotatingFileHandler(log_filename, when="midnight", interval=1)
handler_log.suffix = '%Y-%m-%d.log'
handler_log.setFormatter(formatter)
handler_log.setLevel(30)
logger.addHandler(handler_log)
logger.setLevel(30)
# ========= =========#
# ========= TLS =========#
@ -156,9 +179,26 @@ def add_header(response):
response.headers['Cache-Control'] = 'public, max-age=0'
return response
# @app.route('/test', methods=['GET'])
# def test():
# for rule in app.url_map.iter_rules():
# print(rule)
# return 'o'
# ========== ROUTES ============
@app.route('/login', methods=['POST', 'GET'])
def login():
current_ip = request.remote_addr
login_failed_ip = r_cache.get('failed_login_ip:{}'.format(current_ip))
# brute force by ip
if login_failed_ip:
login_failed_ip = int(login_failed_ip)
if login_failed_ip >= 5:
error = 'Max Connection Attempts reached, Please wait {}s'.format(r_cache.ttl('failed_login_ip:{}'.format(current_ip)))
return render_template("login.html", error=error)
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
@ -166,6 +206,14 @@ def login():
if username is not None:
user = User.get(username)
login_failed_user_id = r_cache.get('failed_login_user_id:{}'.format(username))
# brute force by user_id
if login_failed_user_id:
login_failed_user_id = int(login_failed_user_id)
if login_failed_user_id >= 5:
error = 'Max Connection Attempts reached, Please wait {}s'.format(r_cache.ttl('failed_login_user_id:{}'.format(username)))
return render_template("login.html", error=error)
if user and user.check_password(password):
if not check_user_role_integrity(user.get_id()):
error = 'Incorrect User ACL, Please contact your administrator'
@ -175,7 +223,16 @@ def login():
return redirect(url_for('change_password'))
else:
return redirect(url_for('dashboard.index'))
# login failed
else:
# set brute force protection
logger.warning("Login failed, ip={}, username={}".format(current_ip, username))
r_cache.incr('failed_login_ip:{}'.format(current_ip))
r_cache.expire('failed_login_ip:{}'.format(current_ip), 300)
r_cache.incr('failed_login_user_id:{}'.format(username))
r_cache.expire('failed_login_user_id:{}'.format(username), 300)
#
error = 'Password Incorrect'
return render_template("login.html", error=error)