mirror of https://github.com/CIRCL/AIL-framework
chg: [rest API] access: check token user role
parent
3ad33bcb72
commit
44d6eb8570
|
@ -36,7 +36,7 @@ def check_token_format(strg, search=re.compile(r'[^a-zA-Z0-9_-]').search):
|
||||||
return not bool(search(strg))
|
return not bool(search(strg))
|
||||||
|
|
||||||
def verify_token(token):
|
def verify_token(token):
|
||||||
if len(token) != 55:
|
if len(token) != 41:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if not check_token_format(token):
|
if not check_token_format(token):
|
||||||
|
@ -47,23 +47,41 @@ def verify_token(token):
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def verify_user_role(role, token):
|
||||||
|
user_id = r_serv_db.hget('user:tokens', token)
|
||||||
|
if user_id:
|
||||||
|
if is_in_role(user_id, role):
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def is_in_role(user_id, role):
|
||||||
|
if r_serv_db.sismember('user_role:{}'.format(role), user_id):
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
# ============ DECORATOR ============
|
# ============ DECORATOR ============
|
||||||
|
|
||||||
def token_required(funct):
|
def token_required(user_role):
|
||||||
@wraps(funct)
|
def actual_decorator(funct):
|
||||||
def api_token(*args, **kwargs):
|
@wraps(funct)
|
||||||
data = authErrors()
|
def api_token(*args, **kwargs):
|
||||||
if data:
|
data = authErrors(user_role)
|
||||||
return Response(json.dumps(data[0], indent=2, sort_keys=True), mimetype='application/json'), data[1]
|
if data:
|
||||||
else:
|
return Response(json.dumps(data[0], indent=2, sort_keys=True), mimetype='application/json'), data[1]
|
||||||
return funct(*args, **kwargs)
|
else:
|
||||||
return api_token
|
return funct(*args, **kwargs)
|
||||||
|
return api_token
|
||||||
|
return actual_decorator
|
||||||
|
|
||||||
def get_auth_from_header():
|
def get_auth_from_header():
|
||||||
token = request.headers.get('Authorization').replace(' ', '') # remove space
|
token = request.headers.get('Authorization').replace(' ', '') # remove space
|
||||||
return token
|
return token
|
||||||
|
|
||||||
def authErrors():
|
def authErrors(user_role):
|
||||||
# Check auth
|
# Check auth
|
||||||
if not request.headers.get('Authorization'):
|
if not request.headers.get('Authorization'):
|
||||||
return ({'status': 'error', 'reason': 'Authentication needed'}, 401)
|
return ({'status': 'error', 'reason': 'Authentication needed'}, 401)
|
||||||
|
@ -76,6 +94,10 @@ def authErrors():
|
||||||
if verify_token(token):
|
if verify_token(token):
|
||||||
authenticated = True
|
authenticated = True
|
||||||
|
|
||||||
|
# check user role
|
||||||
|
if not verify_user_role(user_role, token):
|
||||||
|
data = ({'status': 'error', 'reason': 'Access Forbidden'}, 403)
|
||||||
|
|
||||||
if not authenticated:
|
if not authenticated:
|
||||||
data = ({'status': 'error', 'reason': 'Authentication failed'}, 401)
|
data = ({'status': 'error', 'reason': 'Authentication failed'}, 401)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -98,8 +120,8 @@ def one():
|
||||||
# def api():
|
# def api():
|
||||||
# return 'api doc'
|
# return 'api doc'
|
||||||
|
|
||||||
@restApi.route("api/items", methods=['POST'])
|
@restApi.route("api/items", methods=['GET', 'POST'])
|
||||||
@token_required
|
@token_required('admin')
|
||||||
def items():
|
def items():
|
||||||
item = request.args.get('id')
|
item = request.args.get('id')
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue