d4-core/server/web/blueprints/restApi.py

152 lines
4.2 KiB
Python

#!/usr/bin/env python3
# -*-coding:UTF-8 -*
'''
Flask functions and routes for the rest api
'''
import os
import re
import sys
import uuid
import json
import redis
import datetime
from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response
from flask_login import login_required
from functools import wraps
# ============ BLUEPRINT ============
restApi = Blueprint('restApi', __name__, template_folder='templates')
# ============ VARIABLES ============
host_redis_metadata = os.getenv('D4_REDIS_METADATA_HOST', "localhost")
port_redis_metadata = int(os.getenv('D4_REDIS_METADATA_PORT', 6380))
r_serv_db = redis.StrictRedis(
host=host_redis_metadata,
port=port_redis_metadata,
db=1,
decode_responses=True)
# ============ AUTH FUNCTIONS ============
def check_token_format(strg, search=re.compile(r'[^a-zA-Z0-9_-]').search):
return not bool(search(strg))
def verify_token(token):
if len(token) != 41:
return False
if not check_token_format(token):
return False
if r_serv_db.hexists('user:tokens', token):
return True
else:
return False
def get_user_from_token(token):
return r_serv_db.hget('user:tokens', token)
def verify_user_role(role, token):
user_id = get_user_from_token(token)
if user_id:
if is_in_role(user_id, role):
return True
else:
return False
else:
return False
def is_in_role(user_id, role):
if r_serv_db.sismember('user_role:{}'.format(role), user_id):
return True
else:
return False
# ============ DECORATOR ============
def token_required(user_role):
def actual_decorator(funct):
@wraps(funct)
def api_token(*args, **kwargs):
data = authErrors(user_role)
if data:
return Response(json.dumps(data[0], indent=2, sort_keys=True), mimetype='application/json'), data[1]
else:
return funct(*args, **kwargs)
return api_token
return actual_decorator
def get_auth_from_header():
token = request.headers.get('Authorization').replace(' ', '') # remove space
return token
def authErrors(user_role):
# Check auth
if not request.headers.get('Authorization'):
return ({'status': 'error', 'reason': 'Authentication needed'}, 401)
token = get_auth_from_header()
data = None
# verify token format
'''
# brute force protection
current_ip = request.remote_addr
login_failed_ip = r_cache.get('failed_login_ip_api:{}'.format(current_ip))
# brute force by ip
if login_failed_ip:
login_failed_ip = int(login_failed_ip)
if login_failed_ip >= 5:
return ({'status': 'error', 'reason': 'Max Connection Attempts reached, Please wait {}s'.format(r_cache.ttl('failed_login_ip_api:{}'.format(current_ip)))}, 401)
'''
try:
authenticated = False
if verify_token(token):
authenticated = True
# check user role
if not verify_user_role(user_role, token):
data = ({'status': 'error', 'reason': 'Access Forbidden'}, 403)
if not authenticated:
#r_cache.incr('failed_login_ip_api:{}'.format(current_ip))
#r_cache.expire('failed_login_ip_api:{}'.format(current_ip), 300)
data = ({'status': 'error', 'reason': 'Authentication failed'}, 401)
except Exception as e:
print(e)
data = ({'status': 'error', 'reason': 'Malformed Authentication String'}, 400)
if data:
return data
else:
return None
# ============ FUNCTIONS ============
def is_valid_uuid_v4(header_uuid):
try:
header_uuid=header_uuid.replace('-', '')
uuid_test = uuid.UUID(hex=header_uuid, version=4)
return uuid_test.hex == header_uuid
except:
return False
def one():
return 1
# ============= ROUTES ==============
@restApi.route("/api/v1/get/item", methods=['GET'])
@token_required('user')
def get_item_id():
data = request.get_json()
res = ({'test': 2}, 200)
return Response(json.dumps(res[0], indent=2, sort_keys=True), mimetype='application/json'), res[1]