2024-05-06 16:21:00 +02:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
# -*-coding:UTF-8 -*
|
|
|
|
|
|
|
|
import bcrypt
|
|
|
|
import os
|
|
|
|
import pyotp
|
|
|
|
import re
|
|
|
|
import secrets
|
|
|
|
import sys
|
|
|
|
|
2024-06-26 13:55:39 +02:00
|
|
|
import segno
|
|
|
|
|
|
|
|
from base64 import b64encode
|
2024-05-06 16:21:00 +02:00
|
|
|
from flask_login import UserMixin
|
2024-06-26 13:55:39 +02:00
|
|
|
from io import BytesIO
|
2024-05-06 16:21:00 +02:00
|
|
|
from uuid import uuid4
|
|
|
|
|
|
|
|
sys.path.append(os.environ['AIL_BIN'])
|
|
|
|
##################################
|
|
|
|
# Import Project packages
|
|
|
|
##################################
|
|
|
|
from lib.ConfigLoader import ConfigLoader
|
|
|
|
|
|
|
|
# Config
|
|
|
|
config_loader = ConfigLoader()
|
|
|
|
r_serv_db = config_loader.get_db_conn("Kvrocks_DB")
|
|
|
|
r_cache = config_loader.get_redis_conn("Redis_Cache")
|
2024-06-26 13:55:39 +02:00
|
|
|
|
|
|
|
if config_loader.get_config_boolean('Users', 'force_2fa'):
|
|
|
|
r_serv_db.hset('ail:2fa', '2fa', 1)
|
|
|
|
else:
|
|
|
|
r_serv_db.hset('ail:2fa', '2fa', 0)
|
2024-05-06 16:21:00 +02:00
|
|
|
config_loader = None
|
|
|
|
|
|
|
|
regex_password = r'^(?=(.*\d){2})(?=.*[a-z])(?=.*[A-Z]).{10,100}$'
|
|
|
|
regex_password = re.compile(regex_password)
|
|
|
|
|
|
|
|
# # TODO: migrate Role_Manager
|
|
|
|
|
|
|
|
|
|
|
|
#### SESSIONS ####
|
|
|
|
|
|
|
|
def get_sessions():
|
|
|
|
r_cache.smembers('ail:sessions')
|
|
|
|
|
|
|
|
def exists_session(session):
|
|
|
|
r_cache.hexists('ail:sessions', session)
|
|
|
|
|
|
|
|
def exists_session_user(user_id):
|
|
|
|
r_cache.hexists('ail:sessions:users', user_id)
|
|
|
|
|
|
|
|
def get_session_user(session):
|
|
|
|
return r_cache.hget('ail:sessions', session)
|
|
|
|
|
|
|
|
def get_user_session(user_id):
|
|
|
|
return r_cache.hget('ail:sessions:users', user_id)
|
|
|
|
|
|
|
|
def _generate_session_key(user_id):
|
|
|
|
return f'{user_id}:{str(uuid4())}'
|
|
|
|
|
|
|
|
def _rotate_user_session(user_id):
|
|
|
|
kill_session_user(user_id)
|
|
|
|
new_session = _generate_session_key(user_id)
|
|
|
|
r_cache.hset('ail:sessions', new_session, user_id)
|
|
|
|
r_cache.hset('ail:sessions:users', user_id, new_session)
|
|
|
|
return new_session
|
|
|
|
|
|
|
|
def kill_session_user(user_id):
|
|
|
|
session = get_user_session(user_id)
|
|
|
|
if session:
|
|
|
|
r_cache.hdel('ail:sessions', session)
|
|
|
|
r_cache.hdel('ail:sessions:users', user_id)
|
|
|
|
|
|
|
|
def kill_sessions():
|
|
|
|
r_cache.delete('ail:sessions')
|
|
|
|
r_cache.delete('ail:sessions:users')
|
|
|
|
|
|
|
|
|
|
|
|
#### PASSWORDS ####
|
|
|
|
|
|
|
|
def check_password_strength(password):
|
|
|
|
result = regex_password.match(password)
|
|
|
|
if result:
|
|
|
|
return True
|
|
|
|
else:
|
|
|
|
return False
|
|
|
|
|
|
|
|
def gen_password():
|
|
|
|
return secrets.token_urlsafe(30)
|
|
|
|
|
|
|
|
def hashing_password(password):
|
|
|
|
password = password.encode()
|
|
|
|
return bcrypt.hashpw(password, bcrypt.gensalt())
|
|
|
|
|
2024-06-26 13:55:39 +02:00
|
|
|
def get_user_passwd_hash(user_id):
|
|
|
|
return r_serv_db.hget('ail:users:all', user_id)
|
|
|
|
|
2024-05-06 16:21:00 +02:00
|
|
|
## --PASSWORDS-- ##
|
|
|
|
|
2024-05-10 11:33:27 +02:00
|
|
|
def check_email(email):
|
|
|
|
email_regex = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}')
|
|
|
|
result = email_regex.match(email)
|
|
|
|
if result:
|
|
|
|
return True
|
|
|
|
else:
|
|
|
|
return False
|
2024-05-06 16:21:00 +02:00
|
|
|
|
|
|
|
#### TOKENS ####
|
|
|
|
|
2024-06-26 13:55:39 +02:00
|
|
|
def get_user_token(user_id):
|
|
|
|
return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'token')
|
|
|
|
|
|
|
|
def get_token_user(token):
|
|
|
|
return r_serv_db.hget('ail:users:tokens', token)
|
|
|
|
|
|
|
|
def exists_token(token):
|
|
|
|
return r_serv_db.hexists('ail:users:tokens', token)
|
|
|
|
|
2024-05-06 16:21:00 +02:00
|
|
|
def gen_token():
|
|
|
|
return secrets.token_urlsafe(41)
|
|
|
|
|
|
|
|
def _delete_user_token(user_id):
|
|
|
|
current_token = get_user_token(user_id)
|
|
|
|
if current_token:
|
|
|
|
r_serv_db.hdel('ail:users:tokens', current_token)
|
|
|
|
|
|
|
|
def _set_user_token(user_id, token):
|
|
|
|
r_serv_db.hset('ail:users:tokens', token, user_id)
|
|
|
|
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'token', token)
|
|
|
|
|
|
|
|
def generate_new_token(user_id):
|
|
|
|
# create user token
|
|
|
|
_delete_user_token(user_id)
|
|
|
|
token = gen_token()
|
|
|
|
_set_user_token(user_id, token)
|
|
|
|
|
|
|
|
def get_default_admin_token(): # TODO REMOVE ME ##############################################################################
|
|
|
|
if r_serv_db.exists('ail:user:metadata:admin@admin.test'):
|
|
|
|
return r_serv_db.hget('ail:user:metadata:admin@admin.test', 'token')
|
|
|
|
else:
|
|
|
|
return ''
|
|
|
|
|
|
|
|
## --TOKENS-- ##
|
|
|
|
|
|
|
|
|
|
|
|
#### OTP ####
|
|
|
|
|
|
|
|
def _get_totp(secret):
|
|
|
|
return pyotp.TOTP(secret)
|
|
|
|
|
|
|
|
def _verify_totp(totp, code):
|
|
|
|
return totp.verify(code)
|
|
|
|
|
|
|
|
def _get_hotp(secret):
|
|
|
|
return pyotp.HOTP(secret)
|
|
|
|
|
|
|
|
def _verify_hotp(hotp, counter, code):
|
|
|
|
return hotp.verify(code, counter)
|
|
|
|
|
2024-06-26 13:55:39 +02:00
|
|
|
def get_user_otp_secret(user_id):
|
|
|
|
return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'otp_secret')
|
|
|
|
|
|
|
|
def get_user_hotp_counter(user_id):
|
|
|
|
return int(r_serv_db.hget(f'ail:user:metadata:{user_id}', 'otp_counter'))
|
|
|
|
|
|
|
|
def verify_user_totp(user_id, code):
|
|
|
|
totp = _get_totp(get_user_otp_secret(user_id))
|
|
|
|
return _verify_totp(totp, code)
|
|
|
|
|
2024-06-27 17:06:05 +02:00
|
|
|
def verify_user_hotp(user_id, code):
|
2024-06-26 13:55:39 +02:00
|
|
|
hotp = _get_hotp(get_user_otp_secret(user_id))
|
|
|
|
counter = get_user_hotp_counter(user_id)
|
2024-06-27 17:06:05 +02:00
|
|
|
valid = _verify_hotp(hotp, counter, code)
|
|
|
|
if valid:
|
|
|
|
r_serv_db.hincrby(f'ail:user:metadata:{user_id}', 'otp_counter', 1)
|
|
|
|
return valid
|
2024-06-26 13:55:39 +02:00
|
|
|
|
|
|
|
def verify_user_otp(user_id, code):
|
|
|
|
if verify_user_totp(user_id, code):
|
|
|
|
return True
|
|
|
|
elif verify_user_hotp(user_id, code):
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
|
|
def create_user_otp(user_id):
|
|
|
|
secret = pyotp.random_base32()
|
|
|
|
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'otp_secret', secret)
|
|
|
|
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'otp_counter', 0)
|
|
|
|
enable_user_2fa(user_id)
|
|
|
|
|
|
|
|
def delete_user_otp(user_id):
|
|
|
|
r_serv_db.hdel(f'ail:user:metadata:{user_id}', 'otp_secret')
|
|
|
|
r_serv_db.hdel(f'ail:user:metadata:{user_id}', 'otp_counter')
|
|
|
|
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'otp_setup', 0)
|
|
|
|
disable_user_2fa(user_id)
|
|
|
|
|
|
|
|
def get_user_otp_uri(user_id, instance_name):
|
|
|
|
return pyotp.totp.TOTP(get_user_otp_secret(user_id)).provisioning_uri(name=user_id, issuer_name=instance_name)
|
|
|
|
|
|
|
|
def get_user_otp_qr_code(user_id, instance_name):
|
|
|
|
uri = get_user_otp_uri(user_id, instance_name)
|
|
|
|
qrcode = segno.make_qr(uri)
|
|
|
|
buff = BytesIO()
|
|
|
|
qrcode.save(buff, kind='png', scale=10)
|
|
|
|
return b64encode(buff.getvalue()).decode()
|
|
|
|
# qrcode.save('qrcode.png', scale=10)
|
|
|
|
|
|
|
|
def get_user_hotp_code(user_id):
|
|
|
|
hotp = _get_hotp(get_user_otp_secret(user_id))
|
|
|
|
counter = get_user_hotp_counter(user_id)
|
|
|
|
codes = []
|
|
|
|
for i in range(counter, counter + 20):
|
|
|
|
codes.append(f'{i}: {hotp.at(i)}')
|
|
|
|
return codes
|
|
|
|
|
|
|
|
# TODO GET USER HOTP LISTS
|
|
|
|
# TODO RESET OTP
|
|
|
|
|
|
|
|
def is_user_otp_setup(user_id):
|
|
|
|
otp_setup = r_serv_db.hget(f'ail:user:metadata:{user_id}', 'otp_setup')
|
|
|
|
if otp_setup:
|
|
|
|
return int(otp_setup) == 1
|
|
|
|
return False
|
|
|
|
|
2024-05-06 16:21:00 +02:00
|
|
|
## --OTP-- ##
|
|
|
|
|
2024-06-26 13:55:39 +02:00
|
|
|
#### 2FA ####
|
|
|
|
|
|
|
|
# Global 2fa option
|
|
|
|
def is_2fa_enabled():
|
|
|
|
fa2 = r_serv_db.hget('ail:2fa', '2fa')
|
|
|
|
if fa2:
|
|
|
|
return int(fa2) == 1
|
|
|
|
return False
|
|
|
|
|
|
|
|
def is_user_2fa_enabled(user_id):
|
|
|
|
fa2 = r_serv_db.hget(f'ail:user:metadata:{user_id}', '2fa')
|
|
|
|
if fa2:
|
|
|
|
return int(fa2) == 1
|
|
|
|
return False
|
|
|
|
|
|
|
|
def enable_user_2fa(user_id):
|
|
|
|
return r_serv_db.hset(f'ail:user:metadata:{user_id}', '2fa', 1)
|
|
|
|
|
|
|
|
def disable_user_2fa(user_id):
|
|
|
|
return r_serv_db.hset(f'ail:user:metadata:{user_id}', '2fa', 0)
|
|
|
|
|
|
|
|
## --2FA-- ##
|
|
|
|
|
2024-05-06 16:21:00 +02:00
|
|
|
#### USERS ####
|
|
|
|
|
|
|
|
def get_users():
|
|
|
|
return r_serv_db.hkeys('ail:users:all')
|
|
|
|
|
|
|
|
def get_user_role(user_id):
|
|
|
|
return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'role')
|
|
|
|
|
|
|
|
|
|
|
|
########################################################################################################################
|
|
|
|
########################################################################################################################
|
|
|
|
|
|
|
|
# TODO USER LAST LOGIN TIME
|
|
|
|
# TODO Check if logged
|
2024-05-08 14:09:53 +02:00
|
|
|
|
|
|
|
# TODO USER: - Creation Date
|
|
|
|
# - Last Login
|
|
|
|
# - Last Request
|
|
|
|
# - Last API Usage
|
|
|
|
# - Organisation ???
|
|
|
|
# - Disabled / Lock
|
|
|
|
|
2024-05-06 16:21:00 +02:00
|
|
|
class AILUser(UserMixin):
|
|
|
|
def __init__(self, user_id):
|
|
|
|
self.user_id = user_id
|
|
|
|
session = get_user_session(self.user_id)
|
|
|
|
if session:
|
|
|
|
self.id = session
|
|
|
|
else:
|
|
|
|
self.id = "__anonymous__"
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def get(self_class, id):
|
|
|
|
return self_class(id)
|
|
|
|
|
|
|
|
# @property
|
|
|
|
# def is_anonymous(self):
|
|
|
|
# return self.id == "__anonymous__"
|
|
|
|
|
|
|
|
# def get_id(self):
|
|
|
|
# return self.id
|
|
|
|
|
|
|
|
def get_user_id(self):
|
|
|
|
return self.user_id
|
|
|
|
|
|
|
|
def exists(self): # TODO CHECK USAGE
|
|
|
|
return r_serv_db.exists(f'ail:user:metadata:{self.user_id}')
|
|
|
|
|
2024-05-08 14:09:53 +02:00
|
|
|
def get_meta(self, options=set()): # TODO user creation date
|
|
|
|
meta = {'id': self.user_id}
|
|
|
|
if 'api_key' in options: # TODO add option to censor key
|
|
|
|
meta['api_key'] = self.get_api_key()
|
|
|
|
if 'role' in options:
|
|
|
|
meta['role'] = get_user_role(self.user_id)
|
2024-06-27 17:06:05 +02:00
|
|
|
if '2fa' in options:
|
|
|
|
meta['2fa'] = self.is_2fa_enabled()
|
|
|
|
if 'otp_setup' in options:
|
|
|
|
meta['otp_setup'] = self.is_2fa_setup()
|
2024-05-08 14:09:53 +02:00
|
|
|
return meta
|
2024-05-06 16:21:00 +02:00
|
|
|
|
|
|
|
## SESSION ##
|
|
|
|
|
|
|
|
def is_logged(self): #####################################################################################################
|
|
|
|
pass
|
|
|
|
|
|
|
|
def get_session(self):
|
|
|
|
return self.id
|
|
|
|
|
|
|
|
def rotate_session(self):
|
|
|
|
self.id = _rotate_user_session(self.user_id)
|
|
|
|
print('rotate session:', self.id)
|
|
|
|
return self.id
|
|
|
|
|
|
|
|
def kill_session(self):
|
|
|
|
kill_session_user(self.user_id)
|
|
|
|
self.id = None
|
|
|
|
|
|
|
|
## PASSWORD ##
|
|
|
|
|
|
|
|
def is_password_change_requested(self):
|
|
|
|
if r_serv_db.hget(f'ail:user:metadata:{self.user_id}', 'change_passwd') == 1:
|
|
|
|
return True
|
|
|
|
else:
|
|
|
|
return False
|
|
|
|
|
|
|
|
def request_password_change(self):
|
|
|
|
r_serv_db.hset(f'ail:user:metadata:{self.user_id}', 'change_passwd', 1)
|
|
|
|
|
|
|
|
def check_password(self, password):
|
|
|
|
password = password.encode()
|
|
|
|
hashed_password = r_serv_db.hget('ail:users:all', self.user_id).encode()
|
|
|
|
if bcrypt.checkpw(password, hashed_password):
|
|
|
|
return True
|
|
|
|
else:
|
|
|
|
return False
|
|
|
|
|
|
|
|
def edit_password(self, password_hash, chg_passwd=False): # TODO REPLACE BY PASSWORD
|
|
|
|
if chg_passwd:
|
|
|
|
r_serv_db.hset(f'ail:user:metadata:{self.user_id}', 'change_passwd', 1)
|
|
|
|
else:
|
|
|
|
r_serv_db.hdel(f'ail:user:metadata:{self.user_id}', 'change_passwd')
|
|
|
|
# remove default user password file ########################################################################## TODO MOVE ME
|
|
|
|
if self.user_id == 'admin@admin.test':
|
|
|
|
default_passwd_file = os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD')
|
|
|
|
if os.path.isfile(default_passwd_file):
|
|
|
|
os.remove(default_passwd_file)
|
|
|
|
r_serv_db.hset('ail:users:all', self.user_id, password_hash)
|
|
|
|
# create new token
|
|
|
|
generate_new_token(self.user_id)
|
|
|
|
|
2024-05-08 14:09:53 +02:00
|
|
|
## TOKEN ##
|
|
|
|
|
|
|
|
def get_api_key(self):
|
|
|
|
return get_user_token(self.user_id)
|
|
|
|
|
|
|
|
def new_api_key(self):
|
|
|
|
_delete_user_token(self.user_id)
|
|
|
|
new_api_key = gen_token()
|
|
|
|
_set_user_token(self.user_id, new_api_key)
|
|
|
|
return new_api_key
|
|
|
|
|
2024-06-26 13:55:39 +02:00
|
|
|
## OTP ##
|
|
|
|
|
|
|
|
def is_2fa_setup(self):
|
|
|
|
return is_user_otp_setup(self.user_id)
|
|
|
|
|
|
|
|
def is_2fa_enabled(self):
|
|
|
|
if is_2fa_enabled():
|
|
|
|
return True
|
|
|
|
else:
|
|
|
|
return is_user_2fa_enabled(self.user_id)
|
|
|
|
|
|
|
|
def get_htop_counter(self):
|
|
|
|
return get_user_hotp_counter(self.user_id)
|
|
|
|
|
|
|
|
def is_valid_otp(self, code):
|
|
|
|
return verify_user_otp(self.user_id, code)
|
|
|
|
|
|
|
|
def init_setup_2fa(self, create=True):
|
|
|
|
if create:
|
|
|
|
create_user_otp(self.user_id)
|
2024-06-27 17:06:05 +02:00
|
|
|
instance_name = 'AIL TEST'
|
|
|
|
return get_user_otp_qr_code(self.user_id, instance_name), get_user_otp_uri(self.user_id, instance_name), get_user_hotp_code(self.user_id)
|
2024-06-26 13:55:39 +02:00
|
|
|
|
|
|
|
def setup_2fa(self):
|
|
|
|
r_serv_db.hset(f'ail:user:metadata:{self.user_id}', 'otp_setup', 1)
|
|
|
|
|
2024-05-06 16:21:00 +02:00
|
|
|
## ROLE ##
|
|
|
|
|
|
|
|
def is_in_role(self, role): # TODO Get role via user alternative ID
|
|
|
|
print('is_in_role')
|
|
|
|
print(f'ail:users:role:{role}', self.user_id)
|
|
|
|
if r_serv_db.sismember(f'ail:users:role:{role}', self.user_id):
|
|
|
|
return True
|
|
|
|
else:
|
|
|
|
return False
|
|
|
|
|
|
|
|
def get_role(self):
|
|
|
|
return r_serv_db.hget(f'ail:user:metadata:{self.user_id}', 'role')
|
|
|
|
|
2024-05-08 14:09:53 +02:00
|
|
|
## ##
|
2024-05-06 16:21:00 +02:00
|
|
|
|
2024-05-08 14:09:53 +02:00
|
|
|
def delete(self):
|
2024-05-06 16:21:00 +02:00
|
|
|
kill_session_user(self.user_id)
|
2024-05-08 14:09:53 +02:00
|
|
|
for role_id in get_all_roles():
|
|
|
|
r_serv_db.srem(f'ail:users:role:{role_id}', self.user_id)
|
|
|
|
user_token = self.get_api_key()
|
|
|
|
if user_token:
|
|
|
|
r_serv_db.hdel('ail:users:tokens', user_token)
|
|
|
|
r_serv_db.delete(f'ail:user:metadata:{self.user_id}')
|
|
|
|
r_serv_db.hdel('ail:users:all', self.user_id)
|
2024-05-06 16:21:00 +02:00
|
|
|
|
|
|
|
|
|
|
|
# def create_user(user_id):
|
|
|
|
|
2024-05-08 14:09:53 +02:00
|
|
|
#### API ####
|
|
|
|
|
|
|
|
def api_get_users_meta():
|
|
|
|
meta = {'users': []}
|
2024-06-27 17:06:05 +02:00
|
|
|
options = {'api_key', 'role', '2fa', 'otp_setup'}
|
2024-05-08 14:09:53 +02:00
|
|
|
for user_id in get_users():
|
|
|
|
user = AILUser(user_id)
|
|
|
|
meta['users'].append(user.get_meta(options=options))
|
|
|
|
return meta
|
|
|
|
|
2024-05-10 11:33:27 +02:00
|
|
|
def api_get_user_profile(user_id):
|
2024-06-27 17:06:05 +02:00
|
|
|
options = {'api_key', 'role', '2fa'}
|
2024-05-10 11:33:27 +02:00
|
|
|
user = AILUser(user_id)
|
|
|
|
if not user.exists():
|
|
|
|
return {'status': 'error', 'reason': 'User not found'}, 404
|
|
|
|
meta = user.get_meta(options=options)
|
|
|
|
return meta, 200
|
|
|
|
|
2024-06-27 17:06:05 +02:00
|
|
|
def api_get_user_hotp(user_id):
|
|
|
|
user = AILUser(user_id)
|
|
|
|
if not user.exists():
|
|
|
|
return {'status': 'error', 'reason': 'User not found'}, 404
|
|
|
|
hotp = get_user_hotp_code(user_id)
|
|
|
|
return hotp, 200
|
|
|
|
|
|
|
|
def api_enable_user_otp(user_id):
|
|
|
|
user = AILUser(user_id)
|
|
|
|
if not user.exists():
|
|
|
|
return {'status': 'error', 'reason': 'User not found'}, 404
|
|
|
|
if user.is_2fa_enabled():
|
|
|
|
return {'status': 'error', 'reason': 'User OTP is already setup'}, 400
|
|
|
|
delete_user_otp(user_id)
|
|
|
|
enable_user_2fa(user_id)
|
|
|
|
return user_id, 200
|
|
|
|
|
|
|
|
def api_disable_user_otp(user_id):
|
|
|
|
user = AILUser(user_id)
|
|
|
|
if not user.exists():
|
|
|
|
return {'status': 'error', 'reason': 'User not found'}, 404
|
|
|
|
if not user.is_2fa_enabled():
|
|
|
|
return {'status': 'error', 'reason': 'User OTP is not enabled'}, 400
|
|
|
|
if is_2fa_enabled():
|
|
|
|
return {'status': 'error', 'reason': '2FA is enforced on this instance'}, 400
|
|
|
|
disable_user_2fa(user_id)
|
|
|
|
delete_user_otp(user_id)
|
|
|
|
return user_id, 200
|
|
|
|
|
|
|
|
def api_reset_user_otp(admin_id, user_id):
|
|
|
|
user = AILUser(user_id)
|
|
|
|
if not user.exists():
|
|
|
|
return {'status': 'error', 'reason': 'User not found'}, 404
|
|
|
|
admin = AILUser(admin_id)
|
|
|
|
if not admin.is_in_role('admin'):
|
|
|
|
return {'status': 'error', 'reason': 'Access Denied'}, 403
|
|
|
|
if not user.is_2fa_setup():
|
|
|
|
return {'status': 'error', 'reason': 'User OTP is not setup'}, 400
|
|
|
|
delete_user_otp(user_id)
|
|
|
|
enable_user_2fa(user_id)
|
|
|
|
return user_id, 200
|
|
|
|
|
2024-05-10 11:33:27 +02:00
|
|
|
def api_create_user_api_key_self(user_id): # TODO LOG USER ID
|
|
|
|
user = AILUser(user_id)
|
|
|
|
if not user.exists():
|
|
|
|
return {'status': 'error', 'reason': 'User not found'}, 404
|
|
|
|
return user.new_api_key(), 200
|
|
|
|
|
2024-05-08 14:09:53 +02:00
|
|
|
def api_create_user_api_key(user_id, admin_id): # TODO LOG ADMIN ID
|
|
|
|
user = AILUser(user_id)
|
|
|
|
if not user.exists():
|
|
|
|
return {'status': 'error', 'reason': 'User not found'}, 404
|
|
|
|
print(admin_id)
|
|
|
|
return user.new_api_key(), 200
|
|
|
|
|
|
|
|
def api_delete_user(user_id, admin_id): # TODO LOG ADMIN ID
|
|
|
|
user = AILUser(user_id)
|
|
|
|
if not user.exists():
|
|
|
|
return {'status': 'error', 'reason': 'User not found'}, 404
|
|
|
|
print(admin_id)
|
|
|
|
return user.delete(), 200
|
|
|
|
|
2024-05-06 16:21:00 +02:00
|
|
|
########################################################################################################################
|
|
|
|
########################################################################################################################
|
|
|
|
|
|
|
|
|
|
|
|
def exists_user(user_id):
|
|
|
|
return r_serv_db.exists(f'ail:user:metadata:{user_id}')
|
|
|
|
|
|
|
|
def get_user_metadata(user_id):
|
|
|
|
user_metadata = {'email': user_id,
|
|
|
|
'role': r_serv_db.hget(f'ail:user:metadata:{user_id}', 'role'),
|
|
|
|
'api_key': r_serv_db.hget(f'ail:user:metadata:{user_id}', 'token')}
|
|
|
|
return user_metadata
|
|
|
|
|
|
|
|
def get_users_metadata(list_users):
|
|
|
|
users = []
|
|
|
|
for user in list_users:
|
|
|
|
users.append(get_user_metadata(user))
|
|
|
|
return users
|
|
|
|
|
2024-06-27 17:06:05 +02:00
|
|
|
def create_user(user_id, password=None, chg_passwd=True, role=None, otp=False): # TODO ###############################################################
|
2024-05-06 16:21:00 +02:00
|
|
|
# # TODO: check password strength
|
|
|
|
if password:
|
|
|
|
new_password = password
|
|
|
|
else:
|
|
|
|
new_password = gen_password()
|
|
|
|
password_hash = hashing_password(new_password)
|
|
|
|
|
|
|
|
# EDIT
|
|
|
|
if exists_user(user_id):
|
|
|
|
if password or chg_passwd:
|
2024-06-27 17:06:05 +02:00
|
|
|
edit_user(user_id, password_hash, chg_passwd=chg_passwd)
|
2024-05-06 16:21:00 +02:00
|
|
|
if role:
|
|
|
|
edit_user_role(user_id, role)
|
|
|
|
# CREATE USER
|
|
|
|
else:
|
|
|
|
# Role
|
|
|
|
if not role:
|
|
|
|
role = get_default_role()
|
|
|
|
|
|
|
|
if role in get_all_roles():
|
|
|
|
for role_to_add in get_all_user_role(role):
|
|
|
|
r_serv_db.sadd(f'ail:users:role:{role_to_add}', user_id)
|
|
|
|
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'role', role)
|
|
|
|
|
|
|
|
r_serv_db.hset('ail:users:all', user_id, password_hash)
|
|
|
|
if chg_passwd:
|
|
|
|
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'change_passwd', 'True')
|
|
|
|
|
|
|
|
# create user token
|
|
|
|
generate_new_token(user_id)
|
|
|
|
|
2024-06-27 17:06:05 +02:00
|
|
|
if otp or is_2fa_enabled():
|
|
|
|
enable_user_2fa(user_id)
|
|
|
|
|
|
|
|
def edit_user(user_id, password_hash, chg_passwd=False, otp=False): # TODO ######################################################3333
|
2024-05-06 16:21:00 +02:00
|
|
|
if chg_passwd:
|
|
|
|
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'change_passwd', 'True')
|
|
|
|
else:
|
|
|
|
r_serv_db.hdel(f'ail:user:metadata:{user_id}', 'change_passwd')
|
|
|
|
# remove default user password file
|
|
|
|
if user_id == 'admin@admin.test':
|
|
|
|
default_passwd_file = os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD')
|
|
|
|
if os.path.isfile(default_passwd_file):
|
|
|
|
os.remove(default_passwd_file)
|
|
|
|
r_serv_db.hset('ail:users:all', user_id, password_hash)
|
|
|
|
# create new token
|
|
|
|
generate_new_token(user_id)
|
|
|
|
|
2024-06-27 17:06:05 +02:00
|
|
|
if otp or is_2fa_enabled():
|
|
|
|
enable_user_2fa(user_id)
|
|
|
|
else:
|
|
|
|
disable_user_2fa(user_id)
|
|
|
|
|
2024-05-06 16:21:00 +02:00
|
|
|
# # TODO: solve edge_case self delete
|
|
|
|
def delete_user(user_id):
|
|
|
|
if exists_user(user_id):
|
|
|
|
for role_id in get_all_roles():
|
|
|
|
r_serv_db.srem(f'ail:users:role:{role_id}', user_id)
|
|
|
|
user_token = get_user_token(user_id)
|
|
|
|
if user_token:
|
|
|
|
r_serv_db.hdel('ail:users:tokens', user_token)
|
|
|
|
r_serv_db.delete(f'ail:user:metadata:{user_id}')
|
|
|
|
r_serv_db.hdel('ail:users:all', user_id)
|
|
|
|
|
|
|
|
# # TODO: raise Exception
|
|
|
|
else:
|
|
|
|
print(f'Error: user {user_id} do not exist')
|
|
|
|
|
|
|
|
## --USERS-- ##
|
|
|
|
|
|
|
|
#### ROLES ####
|
|
|
|
|
|
|
|
def get_all_roles():
|
|
|
|
return r_serv_db.zrange('ail:roles:all', 0, -1)
|
|
|
|
|
|
|
|
# create role_list
|
|
|
|
def _create_roles_list():
|
|
|
|
if not r_serv_db.exists('ail:roles:all'):
|
|
|
|
r_serv_db.zadd('ail:roles:all', {'admin': 1})
|
|
|
|
r_serv_db.zadd('ail:roles:all', {'analyst': 2})
|
|
|
|
r_serv_db.zadd('ail:roles:all', {'user': 3})
|
|
|
|
r_serv_db.zadd('ail:roles:all', {'user_no_api': 4})
|
|
|
|
r_serv_db.zadd('ail:roles:all', {'read_only': 5})
|
|
|
|
|
|
|
|
def get_role_level(role):
|
|
|
|
return int(r_serv_db.zscore('ail:roles:all', role))
|
|
|
|
|
|
|
|
def get_user_role_by_range(inf, sup):
|
|
|
|
return r_serv_db.zrange('ail:roles:all', inf, sup)
|
|
|
|
|
|
|
|
def get_all_user_role(user_role):
|
|
|
|
current_role_val = get_role_level(user_role)
|
|
|
|
return r_serv_db.zrange('ail:roles:all', current_role_val - 1, -1)
|
|
|
|
|
|
|
|
def get_all_user_upper_role(user_role):
|
|
|
|
current_role_val = get_role_level(user_role)
|
|
|
|
# remove one rank
|
|
|
|
if current_role_val > 1:
|
|
|
|
return r_serv_db.zrange('ail:roles:all', 0, current_role_val -2)
|
|
|
|
else:
|
|
|
|
return []
|
|
|
|
|
|
|
|
def get_default_role():
|
|
|
|
return 'read_only'
|
|
|
|
|
|
|
|
def is_in_role(user_id, role):
|
|
|
|
return r_serv_db.sismember(f'ail:users:role:{role}', user_id)
|
|
|
|
|
|
|
|
def edit_user_role(user_id, role):
|
|
|
|
current_role = get_user_role(user_id)
|
|
|
|
if role != current_role:
|
|
|
|
request_level = get_role_level(role)
|
|
|
|
current_role = get_role_level(current_role)
|
|
|
|
|
|
|
|
if current_role < request_level:
|
|
|
|
role_to_remove = get_user_role_by_range(current_role - 1, request_level - 2)
|
|
|
|
for role_id in role_to_remove:
|
|
|
|
r_serv_db.srem(f'ail:users:role:{role_id}', user_id)
|
|
|
|
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'role', role)
|
|
|
|
else:
|
|
|
|
role_to_add = get_user_role_by_range(request_level - 1, current_role)
|
|
|
|
for role_id in role_to_add:
|
|
|
|
r_serv_db.sadd(f'ail:users:role:{role_id}', user_id)
|
|
|
|
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'role', role)
|
|
|
|
|
|
|
|
def check_user_role_integrity(user_id):
|
|
|
|
user_role = get_user_role(user_id)
|
|
|
|
all_user_role = get_all_user_role(user_role)
|
|
|
|
res = True
|
|
|
|
for role in all_user_role:
|
|
|
|
if not r_serv_db.sismember(f'ail:users:role:{role}', user_id):
|
|
|
|
res = False
|
|
|
|
upper_role = get_all_user_upper_role(user_role)
|
|
|
|
for role in upper_role:
|
|
|
|
if r_serv_db.sismember(f'ail:users:role:{role}', user_id):
|
|
|
|
res = False
|
|
|
|
return res
|
|
|
|
|
|
|
|
## --ROLES-- ##
|
2024-06-26 13:55:39 +02:00
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
user_id = 'admin@admin.test'
|
|
|
|
instance_name = 'AIL TEST'
|
|
|
|
delete_user_otp(user_id)
|
|
|
|
# q = get_user_otp_qr_code(user_id, instance_name)
|
|
|
|
# print(q)
|