mirror of https://github.com/CIRCL/AIL-framework
#!/usr/bin/env python3
|
||
|
# -*-coding:UTF-8 -*
|
||
|
|
||
|
'''
|
||
|
Flask functions and routes for the rest api
|
||
|
'''
|
||
|
|
||
|
import os
|
||
|
import re
|
||
|
import sys
|
||
|
import json
|
||
|
import redis
|
||
|
import datetime
|
||
|
|
||
|
from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response
|
||
|
from flask_login import login_required
|
||
|
|
||
|
from functools import wraps
|
||
|
|
||
|
# ============ VARIABLES ============
|
||
|
import Flask_config
|
||
|
|
||
|
# Application object, configuration and URL prefix, all taken from Flask_config.
app, cfg, baseUrl = Flask_config.app, Flask_config.cfg, Flask_config.baseUrl

# Data-store handles from Flask_config. r_serv_db is the one this module
# queries for API tokens (presumably Redis connections; confirm in Flask_config).
r_cache, r_serv_db = Flask_config.r_cache, Flask_config.r_serv_db
r_serv_onion, r_serv_metadata = Flask_config.r_serv_onion, Flask_config.r_serv_metadata

# Blueprint that collects the REST API routes defined in this module.
restApi = Blueprint(
    'restApi',
    __name__,
    template_folder='templates',
)
|
||
|
|
||
|
# ============ AUTH FUNCTIONS ============
|
||
|
|
||
|
def check_token_format(strg, search=re.compile(r'[^a-zA-Z0-9_-]').search):
    """Tell whether ``strg`` is built only from the token alphabet.

    The default ``search`` is the bound ``search`` method of a pattern that is
    compiled once, when the function is defined. It looks for the first
    character outside ``a-z``, ``A-Z``, ``0-9``, ``_`` and ``-``.

    This is an allowlist check on the character set only: it says nothing
    about the length of the string or about whether the token was ever issued.

    Returns:
        True when no disallowed character is found (the empty string
        included), False otherwise.
    """
    if search(strg):
        # At least one character lies outside the allowlist.
        return False
    return True
|
||
|
|
||
|
def verify_token(token):
    """Decide whether ``token`` is a known API token.

    A token is accepted only when it is exactly 55 characters long, passes
    ``check_token_format`` and exists as a field of the ``user:tokens`` hash
    in ``r_serv_db``.

    Returns:
        True for a known token, False in every other case.
    """
    # Length and character-set checks run first, so input of the wrong shape
    # is rejected before it is sent to the database.
    well_formed = len(token) == 55 and check_token_format(token)
    if not well_formed:
        return False

    # NOTE(review): the presented token is used verbatim as the hash field,
    # which suggests tokens are stored unhashed in 'user:tokens'. Anyone who
    # can read that database could then read valid tokens; confirm and
    # consider storing a digest instead.
    return True if r_serv_db.hexists('user:tokens', token) else False
|
||
|
|
||
|
# ============ DECORATOR ============
|
||
|
|
||
|
def token_required(funct):
    """Decorator that runs the token check before calling a view.

    ``authErrors()`` is called on every request. When it reports a problem,
    the wrapped view is not called and the error is returned as a JSON body
    together with the HTTP status code that ``authErrors()`` supplied.
    Otherwise the view runs with its original arguments.
    """
    @wraps(funct)
    def guarded_view(*args, **kwargs):
        failure = authErrors()
        if not failure:
            return funct(*args, **kwargs)
        # failure is an (error dict, HTTP status) pair.
        payload, status_code = failure[0], failure[1]
        body = json.dumps(payload, indent=2, sort_keys=True)
        return Response(body, mimetype='application/json'), status_code
    return guarded_view
|
||
|
|
||
|
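def get_auth_from_header():
    """Return the ``Authorization`` header value with every space removed.

    The whole header value is treated as the token: no scheme prefix is
    stripped, so a value such as ``Bearer <token>`` comes back with the
    scheme name still attached to the token.

    Returns:
        The header value without spaces, or ``''`` when the request carries
        no ``Authorization`` header.
    """
    # Default to '' so that a request without the header yields an empty
    # token (which no later check can accept) instead of raising
    # AttributeError on None.
    raw_value = request.headers.get('Authorization', '')
    return raw_value.replace(' ', '')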
|
||
|
|
||
|
def authErrors():
    """Check the token presented with the current request.

    Returns:
        None when the request is authenticated, otherwise an
        ``(error dict, HTTP status)`` pair:

        * 401 'Authentication needed' when there is no Authorization header,
        * 401 'Authentication failed' when the token is not accepted,
        * 400 'Malformed Authentication String' when verification raised.
    """
    if not request.headers.get('Authorization'):
        return ({'status': 'error', 'reason': 'Authentication needed'}, 401)

    token = get_auth_from_header()

    try:
        token_ok = verify_token(token)
    except Exception as e:
        # Fails closed: an exception during verification never authenticates.
        # NOTE(review): any exception lands here, so an error raised by the
        # token store lookup would also be reported to the client as a
        # malformed string, and it is only printed; consider logging it.
        print(e)
        return ({'status': 'error', 'reason': 'Malformed Authentication String'}, 400)

    if token_ok:
        return None
    return ({'status': 'error', 'reason': 'Authentication failed'}, 401)
|
||
|
|
||
|
# ============ FUNCTIONS ============
|
||
|
|
||
|
def one():
    """Return the constant ``1``; not called anywhere in the visible module."""
    value = 1
    return value
|
||
|
|
||
|
# ============= ROUTES ==============
|
||
|
|
||
|
@restApi.route("/api", methods=['GET'])
@login_required
def api():
    """Serve the API documentation placeholder.

    This view is protected by ``login_required`` (flask_login), not by the
    API token check that guards the data endpoints.
    """
    doc_text = 'api doc'
    return doc_text
|
||
|
|
||
|
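# The rule now starts with '/', like the sibling "/api" route. Depending on
# the Flask version, a rule without the leading slash may be rejected at
# registration or joined to the URL prefix without a separator; confirm
# against the Flask version in use.
@restApi.route("/api/items", methods=['POST'])
@token_required
def items():
    """Placeholder items endpoint, reachable only with a valid API token.

    ``token_required`` is applied below ``restApi.route``, so the function
    registered as the view is the token-checked wrapper.

    Returns:
        A JSON response with the fixed body ``{"test": 2}``.
    """
    # NOTE(review): 'id' is read from the query string of a POST request and
    # is client-controlled. It is not used yet; validate it before it is used
    # in any lookup once this endpoint is implemented.
    item = request.args.get('id')

    return Response(json.dumps({'test': 2}), mimetype='application/json')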
|
||
|
|
||
|
# ========= REGISTRATION =========
|
||
|
# Attach the REST API blueprint to the application under the configured prefix.
app.register_blueprint(
    restApi,
    url_prefix=baseUrl,
)
|