mirror of https://github.com/D4-project/d4-core
				
				
				
			
		
			
				
	
	
		
			185 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			185 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
| #!/usr/bin/env python3
 | |
| # -*-coding:UTF-8 -*
 | |
| 
 | |
| '''
 | |
|     Flask functions and routes for the rest api
 | |
| '''
 | |
| 
 | |
| import os
 | |
| import re
 | |
| import sys
 | |
| import redis
 | |
| 
 | |
| sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib'))
 | |
| import ConfigLoader
 | |
| 
 | |
| from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response
 | |
| from flask_login import login_required, current_user
 | |
| 
 | |
| from Role_Manager import login_admin, login_user_basic
 | |
| from Role_Manager import create_user_db, edit_user_db, delete_user_db, check_password_strength, generate_new_token, gen_password, get_all_role
 | |
| 
 | |
| # ============ BLUEPRINT ============
 | |
| 
 | |
| settings = Blueprint('settings', __name__, template_folder='templates')
 | |
| 
 | |
| # ============ VARIABLES ============
 | |
| 
 | |
| email_regex = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}'
 | |
| email_regex = re.compile(email_regex)
 | |
| 
 | |
| ### Config ###
 | |
| config_loader = ConfigLoader.ConfigLoader()
 | |
| r_serv_metadata = config_loader.get_redis_conn("Redis_METADATA")
 | |
| r_serv_db = config_loader.get_redis_conn("Redis_SERV")
 | |
| config_loader = None
 | |
| ###  ###
 | |
| 
 | |
| # ============ FUNCTIONS ============
 | |
| 
 | |
| def one():
 | |
|     return 1
 | |
| 
 | |
| def check_email(email):
 | |
|     return email_regex.match(email)
 | |
| 
 | |
| def get_user_metadata(user_id):
 | |
|     user_metadata = {}
 | |
|     user_metadata['email'] = user_id
 | |
|     user_metadata['role'] = r_serv_db.hget('user_metadata:{}'.format(user_id), 'role')
 | |
|     user_metadata['api_key'] = r_serv_db.hget('user_metadata:{}'.format(user_id), 'token')
 | |
|     return user_metadata
 | |
| 
 | |
| def get_users_metadata(list_users):
 | |
|     users = []
 | |
|     for user in list_users:
 | |
|         users.append(get_user_metadata(user))
 | |
|     return users
 | |
| 
 | |
| def get_all_users():
 | |
|     return r_serv_db.hkeys('user:all')
 | |
| 
 | |
| 
 | |
| # ============= ROUTES ==============
 | |
| 
 | |
| @settings.route("/settings/", methods=['GET'])
 | |
| @login_required
 | |
| @login_user_basic
 | |
| def settings_page():
 | |
|     return redirect(url_for('settings.edit_profile'))
 | |
| 
 | |
| @settings.route("/settings/edit_profile", methods=['GET'])
 | |
| @login_required
 | |
| @login_user_basic
 | |
| def edit_profile():
 | |
|     user_metadata = get_user_metadata(current_user.get_id())
 | |
|     admin_level = current_user.is_in_role('admin')
 | |
|     return render_template("edit_profile.html", user_metadata=user_metadata,
 | |
|                             admin_level=admin_level)
 | |
| 
 | |
| @settings.route("/settings/new_token", methods=['GET'])
 | |
| @login_required
 | |
| @login_user_basic
 | |
| def new_token():
 | |
|     generate_new_token(current_user.get_id())
 | |
|     return redirect(url_for('settings.edit_profile'))
 | |
| 
 | |
| @settings.route("/settings/new_token_user", methods=['GET'])
 | |
| @login_required
 | |
| @login_admin
 | |
| def new_token_user():
 | |
|     user_id = request.args.get('user_id')
 | |
|     if r_serv_db.exists('user_metadata:{}'.format(user_id)):
 | |
|         generate_new_token(user_id)
 | |
|     return redirect(url_for('settings.users_list'))
 | |
| 
 | |
| @settings.route("/settings/create_user", methods=['GET'])
 | |
| @login_required
 | |
| @login_admin
 | |
| def create_user():
 | |
|     user_id = request.args.get('user_id')
 | |
|     error = request.args.get('error')
 | |
|     error_mail = request.args.get('error_mail')
 | |
|     role = None
 | |
|     if r_serv_db.exists('user_metadata:{}'.format(user_id)):
 | |
|         role = r_serv_db.hget('user_metadata:{}'.format(user_id), 'role')
 | |
|     else:
 | |
|         user_id = None
 | |
|     all_roles = get_all_role()
 | |
|     return render_template("create_user.html", all_roles=all_roles, user_id=user_id, user_role=role,
 | |
|                                         error=error, error_mail=error_mail,
 | |
|                                         admin_level=True)
 | |
| 
 | |
| @settings.route("/settings/create_user_post", methods=['POST'])
 | |
| @login_required
 | |
| @login_admin
 | |
| def create_user_post():
 | |
|     email = request.form.get('username')
 | |
|     role = request.form.get('user_role')
 | |
|     password1 = request.form.get('password1')
 | |
|     password2 = request.form.get('password2')
 | |
| 
 | |
|     all_roles = get_all_role()
 | |
| 
 | |
|     if email and len(email)< 300 and check_email(email) and role:
 | |
|         if role in all_roles:
 | |
|             # password set
 | |
|             if password1 and password2:
 | |
|                 if password1==password2:
 | |
|                     if check_password_strength(password1):
 | |
|                         password = password1
 | |
|                     else:
 | |
|                         return render_template("create_user.html", all_roles=all_roles, error="Incorrect Password", admin_level=True)
 | |
|                 else:
 | |
|                     return render_template("create_user.html", all_roles=all_roles, error="Passwords don't match", admin_level=True)
 | |
|             # generate password
 | |
|             else:
 | |
|                 password = gen_password()
 | |
| 
 | |
|             if current_user.is_in_role('admin'):
 | |
|                 # edit user
 | |
|                 if r_serv_db.exists('user_metadata:{}'.format(email)):
 | |
|                     if password1 and password2:
 | |
|                         edit_user_db(email, password=password, role=role)
 | |
|                         return redirect(url_for('settings.users_list', new_user=email, new_user_password=password, new_user_edited=True))
 | |
|                     else:
 | |
|                         edit_user_db(email, role=role)
 | |
|                         return redirect(url_for('settings.users_list', new_user=email, new_user_password='Password not changed', new_user_edited=True))
 | |
|                 # create user
 | |
|                 else:
 | |
|                     create_user_db(email, password, default=True, role=role)
 | |
|                     return redirect(url_for('settings.users_list', new_user=email, new_user_password=password, new_user_edited=False))
 | |
| 
 | |
|         else:
 | |
|             return render_template("create_user.html", all_roles=all_roles, admin_level=True)
 | |
|     else:
 | |
|         return render_template("create_user.html", all_roles=all_roles, error_mail=True, admin_level=True)
 | |
| 
 | |
| @settings.route("/settings/users_list", methods=['GET'])
 | |
| @login_required
 | |
| @login_admin
 | |
| def users_list():
 | |
|     all_users = get_users_metadata(get_all_users())
 | |
|     new_user = request.args.get('new_user')
 | |
|     new_user_dict = {}
 | |
|     if new_user:
 | |
|         new_user_dict['email'] = new_user
 | |
|         new_user_dict['edited'] = request.args.get('new_user_edited')
 | |
|         new_user_dict['password'] = request.args.get('new_user_password')
 | |
|     return render_template("users_list.html", all_users=all_users, new_user=new_user_dict, admin_level=True)
 | |
| 
 | |
| @settings.route("/settings/edit_user", methods=['GET'])
 | |
| @login_required
 | |
| @login_admin
 | |
| def edit_user():
 | |
|     user_id = request.args.get('user_id')
 | |
|     return redirect(url_for('settings.create_user', user_id=user_id))
 | |
| 
 | |
| @settings.route("/settings/delete_user", methods=['GET'])
 | |
| @login_required
 | |
| @login_admin
 | |
| def delete_user():
 | |
|     user_id = request.args.get('user_id')
 | |
|     delete_user_db(user_id)
 | |
|     return redirect(url_for('settings.users_list'))
 |