chg: [webiste] history

pull/654/head
David Cruciani 2024-02-07 15:53:43 +01:00
parent 4898900033
commit 8276650165
No known key found for this signature in database
GPG Key ID: 8690CDE1E3994B9B
11 changed files with 27 additions and 3356 deletions

3
.gitignore vendored
View File

@ -19,4 +19,5 @@ site*
venv*
#vscode
.vscode*
.vscode*
*.sqlite

View File

@ -1,591 +0,0 @@
import json
from flask import Blueprint, render_template, redirect, jsonify, request, flash
from .form import CaseForm, CaseEditForm, AddOrgsCase, RecurringForm
from flask_login import login_required, current_user
from . import case_core as CaseModel
from . import common_core as CommonModel
from . import task_core as TaskModel
from ..db_class.db import Task_Template, Case_Template
from ..decorators import editor_required
from ..utils.utils import form_to_dict, check_tag
case_blueprint = Blueprint(
'case',
__name__,
template_folder='templates',
static_folder='static'
)
from .task import task_blueprint
case_blueprint.register_blueprint(task_blueprint)
##########
# Render #
##########
@case_blueprint.route("/", methods=['GET', 'POST'])
@login_required
def index():
"""List all cases"""
return render_template("case/case_index.html")
@case_blueprint.route("/create_case", methods=['GET', 'POST'])
@login_required
def create_case():
"""Create a case"""
form = CaseForm()
form.template_select.choices = [(template.id, template.title) for template in Case_Template.query.all()]
form.template_select.choices.insert(0, (0," "))
form.tasks_templates.choices = [(template.id, template.title) for template in Task_Template.query.all()]
form.tasks_templates.choices.insert(0, (0," "))
if form.validate_on_submit():
tag_list = request.form.getlist("tags_select")
cluster_list = request.form.getlist("clusters_select")
if CommonModel.check_tag(tag_list):
if CommonModel.check_cluster(cluster_list):
form_dict = form_to_dict(form)
form_dict["tags"] = tag_list
form_dict["clusters"] = cluster_list
case = CaseModel.create_case(form_dict, current_user)
flash("Case created", "success")
return redirect(f"/case/{case.id}")
return render_template("case/create_case.html", form=form)
return render_template("case/create_case.html", form=form)
return render_template("case/create_case.html", form=form)
@case_blueprint.route("/<cid>", methods=['GET', 'POST'])
@login_required
def view(cid):
"""View a case"""
case = CommonModel.get_case(cid)
if case:
present_in_case = CaseModel.get_present_in_case(cid, current_user)
return render_template("case/case_view.html", case=case.to_json(), present_in_case=present_in_case)
return render_template("404.html")
@case_blueprint.route("/edit/<cid>", methods=['GET','POST'])
@login_required
@editor_required
def edit_case(cid):
"""Edit the case"""
if CommonModel.get_case(cid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
form = CaseEditForm()
if form.validate_on_submit():
tag_list = request.form.getlist("tags_select")
cluster_list = request.form.getlist("clusters_select")
if CommonModel.check_tag(tag_list):
if CommonModel.check_cluster(cluster_list):
form_dict = form_to_dict(form)
form_dict["tags"] = tag_list
form_dict["clusters"] = cluster_list
CaseModel.edit_case(form_dict, cid, current_user)
flash("Case edited", "success")
return redirect(f"/case/{cid}")
return render_template("case/edit_case.html", form=form)
return render_template("case/edit_case.html", form=form)
else:
case_modif = CommonModel.get_case(cid)
form.description.data = case_modif.description
form.title.data = case_modif.title
form.deadline_date.data = case_modif.deadline
form.deadline_time.data = case_modif.deadline
return render_template("case/edit_case.html", form=form)
else:
flash("Access denied", "error")
else:
return render_template("404.html")
return redirect(f"/case/{id}")
@case_blueprint.route("/<cid>/add_orgs", methods=['GET', 'POST'])
@login_required
@editor_required
def add_orgs(cid):
"""Add orgs to the case"""
if CommonModel.get_case(cid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
form = AddOrgsCase()
case_org = CommonModel.get_org_in_case_by_case_id(cid)
org_list = list()
for org in CommonModel.get_org_order_by_name():
if case_org:
flag = False
for c_o in case_org:
if c_o.org_id == org.id:
flag = True
if not flag:
org_list.append((org.id, f"{org.name}"))
else:
org_list.append((org.id, f"{org.name}"))
form.org_id.choices = org_list
form.case_id.data = cid
if form.validate_on_submit():
form_dict = form_to_dict(form)
CaseModel.add_orgs_case(form_dict, cid, current_user)
flash("Orgs added", "success")
return redirect(f"/case/{cid}")
return render_template("case/add_orgs.html", form=form)
else:
flash("Access denied", "error")
else:
return render_template("404.html")
return redirect(f"/case/{cid}")
@case_blueprint.route("/<cid>/recurring", methods=['GET', 'POST'])
@login_required
@editor_required
def recurring(cid):
"""Recurring form"""
if CommonModel.get_case(cid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
form = RecurringForm()
form.case_id.data = cid
# List orgs and users in and verify if all users of an org are currently notify
orgs_in_case = CommonModel.get_orgs_in_case(cid)
orgs_to_return = list()
for org in orgs_in_case:
loc = org.to_json()
loc["users"] = list()
cp_checked_user = 0
cp_users = 0
for user in org.users:
cp_users += 1
loc_user = user.to_json()
if CommonModel.get_recu_notif_user(cid, user.id):
loc_user["checked"] = True
cp_checked_user += 1
else:
loc_user["checked"] = False
loc["users"].append(loc_user)
# if all users in an org are notify, then check the org checkbox
if cp_checked_user == cp_users:
loc["checked"] = True
else:
loc["checked"] = False
orgs_to_return.append(loc)
if form.validate_on_submit():
form_dict = form_to_dict(form)
if not CaseModel.change_recurring(form_dict, cid, current_user):
flash("Recurring empty", "error")
return redirect(f"/case/{cid}/recurring")
if not form_dict["remove"]:
CaseModel.notify_user_recurring(request.form.to_dict(), cid, orgs_in_case)
flash("Recurring set", "success")
return redirect(f"/case/{cid}")
return render_template("case/case_recurring.html", form=form, orgs=orgs_to_return)
flash("Action not allowed", "warning")
return redirect(f"/case/{cid}")
return render_template("404.html")
############
# Function #
# Route #
############
@case_blueprint.route("/get_cases_page", methods=['GET'])
@login_required
def get_cases():
"""Return all cases"""
page = request.args.get('page', 1, type=int)
tags = request.args.get('tags')
taxonomies = request.args.get('taxonomies')
or_and = request.args.get("or_and")
cases = CaseModel.sort_by_status(page, tags, taxonomies, or_and, completed=False)
role = CommonModel.get_role(current_user).to_json()
loc = CaseModel.regroup_case_info(cases, current_user)
return jsonify({"cases": loc["cases"], "role": role, "nb_pages": cases.pages}), 200
@case_blueprint.route("/search", methods=['GET'])
@login_required
def search():
"""Return cases matching search terms"""
text_search = request.args.get("text")
cases = CommonModel.search(text_search)
if cases:
return {"cases": [case.to_json() for case in cases]}, 200
return {"message": "No case", 'toast_class': "danger-subtle"}, 404
@case_blueprint.route("/<cid>/delete", methods=['GET'])
@login_required
@editor_required
def delete(cid):
"""Delete the case"""
if CommonModel.get_case(cid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if CaseModel.delete_case(cid, current_user):
return {"message": "Case deleted", "toast_class": "success-subtle"}, 200
else:
return {"message": "Error case deleted", 'toast_class': "danger-subtle"}, 400
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "Case no found", 'toast_class': "danger-subtle"}, 404
@case_blueprint.route("/<cid>/get_case_info", methods=['GET'])
@login_required
def get_case_info(cid):
"""Return all info of the case"""
case = CommonModel.get_case(cid)
if case:
tasks = TaskModel.sort_by_status_task_core(case, current_user, completed=False)
o_in_c = CommonModel.get_orgs_in_case(case.id)
orgs_in_case = [o_c.to_json() for o_c in o_in_c]
permission = CommonModel.get_role(current_user).to_json()
present_in_case = CaseModel.get_present_in_case(cid, current_user)
return jsonify({"case": case.to_json(), "tasks": tasks, "orgs_in_case": orgs_in_case, "permission": permission, "present_in_case": present_in_case, "current_user": current_user.to_json()}), 200
return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404
@case_blueprint.route("/<cid>/complete_case", methods=['GET'])
@login_required
@editor_required
def complete_case(cid):
"""Complete the case"""
if CommonModel.get_case(cid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if CaseModel.complete_case(cid, current_user):
flash("Case Completed")
if request.args.get('revived', 1) == "true":
return {"message": "Case Revived", "toast_class": "success-subtle"}, 200
return {"message": "Case completed", "toast_class": "success-subtle"}, 200
else:
if request.args.get('revived', 1) == "true":
return {"message": "Error case revived", 'toast_class': "danger-subtle"}, 400
return {"message": "Error case completed", 'toast_class': "danger-subtle"}, 400
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404
@case_blueprint.route("/<cid>/remove_org/<oid>", methods=['GET'])
@login_required
@editor_required
def remove_org_case(cid, oid):
"""Remove an org to the case"""
if CommonModel.get_case(cid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if CaseModel.remove_org_case(cid, oid, current_user):
return {"message": "Org removed from case", "toast_class": "success-subtle"}, 200
return {"message": "Error removing org from case", "toast_class": "danger-subtle"}, 400
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404
@case_blueprint.route("/<cid>/change_status", methods=['POST'])
@login_required
@editor_required
def change_status(cid):
"""Change the status of the case"""
status = request.json["status"]
case = CommonModel.get_case(cid)
if CommonModel.get_case(cid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
CaseModel.change_status_core(status, case, current_user)
return {"message": "Status changed", "toast_class": "success-subtle"}, 200
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404
@case_blueprint.route("/get_status", methods=['GET'])
@login_required
def get_status():
"""Get status"""
status = CommonModel.get_all_status()
status_list = list()
for s in status:
status_list.append(s.to_json())
return jsonify({"status": status_list}), 200
@case_blueprint.route("/sort_by_ongoing", methods=['GET'])
@login_required
def sort_by_ongoing():
"""Sort Case by living one"""
page = request.args.get('page', 1, type=int)
tags = request.args.get('tags')
taxonomies = request.args.get('taxonomies')
or_and_taxo = request.args.get("or_and_taxo")
galaxies = request.args.get('galaxies')
clusters = request.args.get('clusters')
or_and_galaxies = request.args.get("or_and_galaxies")
cases_list = CaseModel.sort_by_status(page, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=False)
return CaseModel.regroup_case_info(cases_list, current_user)
@case_blueprint.route("/sort_by_finished", methods=['GET'])
@login_required
def sort_by_finished():
"""Sort Case by finished one"""
page = request.args.get('page', 1, type=int)
tags = request.args.get('tags')
taxonomies = request.args.get('taxonomies')
or_and_taxo = request.args.get("or_and_taxo")
galaxies = request.args.get('galaxies')
clusters = request.args.get('clusters')
or_and_galaxies = request.args.get("or_and_galaxies")
cases_list = CaseModel.sort_by_status(page, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=True)
return CaseModel.regroup_case_info(cases_list, current_user)
@case_blueprint.route("/ongoing", methods=['GET'])
@login_required
def ongoing_sort_by_filter():
"""Sort by filter for living case"""
page = request.args.get('page', 1, type=int)
filter = request.args.get('filter')
tags = request.args.get('tags')
taxonomies = request.args.get('taxonomies')
or_and_taxo = request.args.get("or_and_taxo")
galaxies = request.args.get('galaxies')
clusters = request.args.get('clusters')
or_and_galaxies = request.args.get("or_and_galaxies")
if filter:
cases_list, nb_pages = CaseModel.sort_by_filter(filter, page, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=False)
return CaseModel.regroup_case_info(cases_list, current_user, nb_pages)
return {"message": "No filter pass"}
@case_blueprint.route("/finished", methods=['GET'])
@login_required
def finished_sort_by_filter():
"""Sort by filter for finished task"""
page = request.args.get('page', 1, type=int)
filter = request.args.get('filter')
tags = request.args.get('tags')
taxonomies = request.args.get('taxonomies')
or_and_taxo = request.args.get("or_and_taxo")
galaxies = request.args.get('galaxies')
clusters = request.args.get('clusters')
or_and_galaxies = request.args.get("or_and_galaxies")
if filter:
cases_list, nb_pages = CaseModel.sort_by_filter(filter, page, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=True)
return CaseModel.regroup_case_info(cases_list, current_user, nb_pages)
return {"message": "No filter pass"}
@case_blueprint.route("/<cid>/get_all_users", methods=['GET'])
@login_required
def get_all_users(cid):
"""Get all user in case"""
case = CommonModel.get_case(cid)
if case:
users_list = list()
orgs = CommonModel.get_all_users_core(case)
for org in orgs:
for user in org.users:
if not user == current_user:
users_list.append(user.to_json())
return {"users_list": users_list}
return {"message": "Case not found"}, 404
@case_blueprint.route("/<cid>/get_assigned_users/<tid>", methods=['GET'])
@login_required
def get_assigned_users(cid, tid):
"""Get assigned users to the task"""
if CommonModel.get_case(cid):
users, _ = TaskModel.get_users_assign_task(tid, current_user)
return users
return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404
@case_blueprint.route("/<cid>/download", methods=['GET'])
@login_required
def download_case(cid):
"""Download a case"""
case = CommonModel.get_case(cid)
if case:
task_list = list()
for task in case.tasks:
task_list.append(task.download())
return_dict = case.download()
return_dict["tasks"] = task_list
return jsonify(return_dict), 200, {'Content-Disposition': f'attachment; filename=case_{case.title}.json'}
return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404
@case_blueprint.route("/<cid>/fork", methods=['POST'])
@login_required
def fork_case(cid):
"""Assign current user to the task"""
if CommonModel.get_case(cid):
if "case_title_fork" in request.json:
case_title_fork = request.json["case_title_fork"]
new_case = CaseModel.fork_case_core(cid, case_title_fork, current_user)
if type(new_case) == dict:
return new_case
return {"new_case_id": new_case.id}, 201
return {"message": "'case_title_fork' is missing", 'toast_class': "danger-subtle"}, 400
return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404
@case_blueprint.route("/get_all_case_title", methods=['GET'])
@login_required
def get_all_case_title():
data_dict = dict(request.args)
if CommonModel.get_case_by_title(data_dict["title"]):
flag = True
else:
flag = False
return {"title_already_exist": flag}
@case_blueprint.route("/<cid>/create_template", methods=['POST'])
@login_required
@editor_required
def create_template(cid):
if CommonModel.get_case(cid):
if "case_title_template" in request.json:
case_title_template = request.json["case_title_template"]
new_template = CaseModel.create_template_from_case(cid, case_title_template)
if type(new_template) == dict:
return new_template
return {"template_id": new_template.id}, 201
return {"message": "'case_title_template' is missing", 'toast_class': "danger-subtle"}, 400
return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404
@case_blueprint.route("/get_all_case_template_title", methods=['GET'])
@login_required
def get_all_case_template_title():
data_dict = dict(request.args)
if CommonModel.get_case_template_by_title(data_dict["title"]):
flag = True
else:
flag = False
return {"title_already_exist": flag}
@case_blueprint.route("/history/<cid>", methods=['GET'])
@login_required
def history(cid):
case = CommonModel.get_case(cid)
if case:
history = CommonModel.get_history(case.uuid)
if history:
return {"history": history}
return {"history": None}
return {"message": "Case Not found", 'toast_class': "danger-subtle"}, 404
@case_blueprint.route("/get_taxonomies", methods=['GET'])
@login_required
def get_taxonomies():
return {"taxonomies": CommonModel.get_taxonomies()}, 200
@case_blueprint.route("/get_tags", methods=['GET'])
@login_required
def get_tags():
data_dict = dict(request.args)
if "taxonomies" in data_dict:
taxos = json.loads(data_dict["taxonomies"])
return {"tags": CommonModel.get_tags(taxos)}, 200
return {"message": "'taxonomies' is missing", 'toast_class': "warning-subtle"}, 400
@case_blueprint.route("/get_taxonomies_case/<cid>", methods=['GET'])
@login_required
def get_taxonomies_case(cid):
case = CommonModel.get_case(cid)
if case:
tags = CommonModel.get_case_tags(case.id)
taxonomies = []
if tags:
taxonomies = [tag.split(":")[0] for tag in tags]
return {"tags": tags, "taxonomies": taxonomies}
return {"message": "Case Not found", 'toast_class': "danger-subtle"}, 404
@case_blueprint.route("/get_galaxies", methods=['GET'])
@login_required
def get_galaxies():
return {"galaxies": CommonModel.get_galaxies()}, 200
@case_blueprint.route("/get_clusters", methods=['GET'])
@login_required
def get_clusters():
if "galaxies" in request.args:
galaxies = request.args.get("galaxies")
galaxies = json.loads(galaxies)
return {"clusters": CommonModel.get_clusters_galaxy(galaxies)}, 200
return {"message": "'galaxies' is missing", 'toast_class': "warning-subtle"}, 400
@case_blueprint.route("/get_galaxies_case/<cid>", methods=['GET'])
@login_required
def get_galaxies_case(cid):
case = CommonModel.get_case(cid)
if case:
clusters = CommonModel.get_case_clusters(case.id)
galaxies = []
if clusters:
for cluster in clusters:
loc_g = CommonModel.get_galaxy(cluster.galaxy_id)
if not loc_g.name in galaxies:
galaxies.append(loc_g.name)
index = clusters.index(cluster)
clusters[index] = cluster.tag
return {"clusters": clusters, "galaxies": galaxies}
return {"message": "Case Not found", 'toast_class': "danger-subtle"}, 404
@case_blueprint.route("/get_modules", methods=['GET'])
@login_required
def get_modules():
return {"modules": CaseModel.get_modules()}, 200
# return {"message": "'galaxies' is missing", 'toast_class': "warning-subtle"}, 400
@case_blueprint.route("/get_instance_module", methods=['GET'])
@login_required
def get_instance_module():
if "module" in request.args:
module = request.args.get("module")
return {"instances": CaseModel.get_instance_module_core(module, current_user.id)}, 200

View File

@ -1,602 +0,0 @@
from flask import Blueprint, request
from . import case_core as CaseModel
from . import common_core as CommonModel
from . import task_core as TaskModel
from . import case_core_api as CaseModelApi
from flask_restx import Api, Resource
from ..decorators import api_required, editor_required
api_case_blueprint = Blueprint('api_case', __name__)
api = Api(api_case_blueprint,
title='Flowintel-cm API',
description='API to manage a case management instance.',
version='0.1',
default='GenericAPI',
default_label='Generic Flowintel-cm API',
doc='/doc/'
)
@api.route('/all')
@api.doc(description='Get all cases')
class GetCases(Resource):
method_decorators = [api_required]
def get(self):
cases = CommonModel.get_all_cases()
return {"cases": [case.to_json() for case in cases]}, 200
@api.route('/not_completed')
@api.doc(description='Get all not completed cases')
class GetCases_not_completed(Resource):
method_decorators = [api_required]
def get(self):
cases = CommonModel.get_case_by_completed(False)
return {"cases": [case.to_json() for case in cases]}, 200
@api.route('/completed')
@api.doc(description='Get all completed cases')
class GetCases_not_completed(Resource):
method_decorators = [api_required]
def get(self):
cases = CommonModel.get_case_by_completed(True)
return {"cases": [case.to_json() for case in cases]}, 200
@api.route('/<cid>')
@api.doc(description='Get a case', params={'cid': 'id of a case'})
class GetCase(Resource):
method_decorators = [api_required]
def get(self, cid):
case = CommonModel.get_case(cid)
if case:
case_json = case.to_json()
orgs = CommonModel.get_orgs_in_case(cid)
case_json["orgs"] = list()
for org in orgs:
case_json["orgs"].append({"id": org.id, "uuid": org.uuid, "name": org.name})
return case_json, 200
return {"message": "Case not found"}, 404
@api.route('/title', methods=["POST"])
@api.doc(description='Get a case by title')
class GetCaseTitle(Resource):
method_decorators = [api_required]
@api.doc(params={"title": "Title of a case"})
def post(self):
if "title" in request.json:
case = CommonModel.get_case_by_title(request.json["title"])
if case:
case_json = case.to_json()
orgs = CommonModel.get_orgs_in_case(case.id)
case_json["orgs"] = [{"id": org.id, "uuid": org.uuid, "name": org.name} for org in orgs]
return case_json, 200
return {"message": "Case not found"}, 404
return {"message": "Need to pass a title"}, 404
@api.route('/<cid>/complete')
@api.doc(description='Complete a case', params={'cid': 'id of a case'})
class CompleteCase(Resource):
method_decorators = [editor_required, api_required]
def get(self, cid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
case = CommonModel.get_case(cid)
if case:
if CaseModel.complete_case(cid, current_user):
return {"message": f"Case {cid} completed"}, 200
return {"message": f"Error case {cid} completed"}, 400
return {"message": "Case not found"}, 404
return {"message": "Permission denied"}, 403
@api.route('/<cid>/create_template', methods=["POST"])
@api.doc(description='Create a template form case', params={'cid': 'id of a case'})
class CreateTemplate(Resource):
method_decorators = [editor_required, api_required]
@api.doc(params={"title_template": "Title for the template that will be create"})
def post(self, cid):
if "title_template" in request.json:
if CommonModel.get_case(cid):
new_template = CaseModel.create_template_from_case(cid, request.json["title_template"])
if type(new_template) == dict:
return new_template
return {"template_id": new_template.id}, 201
return {"message": "Case not found"}, 404
return {"message": "'title_template' is missing"}, 400
@api.route('/<cid>/recurring', methods=['POST'])
@api.doc(description='Set a case recurring')
class RecurringCase(Resource):
method_decorators = [editor_required, api_required]
@api.doc(params={
"once": "Date(%Y-%m-%d)",
"daily": "Boolean",
"weekly": "Date(%Y-%m-%d). Start date.",
"monthly": "Date(%Y-%m-%d). Start date.",
"remove": "Boolean"
})
def post(self, cid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if request.json:
verif_dict = CaseModelApi.verif_set_recurring(request.json)
if "message" not in verif_dict:
CaseModel.change_recurring(verif_dict, cid, current_user)
return {"message": "Recurring changed"}, 200
return verif_dict
return {"message": "Please give data"}, 400
return {"message": "Permission denied"}, 403
@api.route('/<cid>/tasks')
@api.doc(description='Get all tasks for a case', params={'cid': 'id of a case'})
class GetTasks(Resource):
method_decorators = [api_required]
def get(self, cid):
case = CommonModel.get_case(cid)
if case:
tasks = list()
for task in case.tasks:
tasks.append(task.to_json())
return tasks, 200
return {"message": "Case not found"}, 404
@api.route('/<cid>/task/<tid>')
@api.doc(description='Get a specific task for a case', params={"cid": "id of a case", "tid": "id of a task"})
class GetTask(Resource):
method_decorators = [api_required]
def get(self, cid, tid):
task = CommonModel.get_task(tid)
if task:
if int(cid) == task.case_id:
loc = dict()
loc["users_assign"], loc["is_current_user_assign"] = TaskModel.get_users_assign_task(task.id, CaseModelApi.get_user_api(request.headers["X-API-KEY"]))
loc["task"] = task.to_json()
return loc, 200
else:
return {"message": "Task not in this case"}, 404
return {"message": "Task not found"}, 404
@api.route('/<cid>/delete')
@api.doc(description='Delete a case', params={'cid': 'id of a case'})
class DeleteCase(Resource):
method_decorators = [editor_required, api_required]
def get(self, cid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if CaseModel.delete_case(cid, current_user):
return {"message": "Case deleted"}, 200
return {"message": "Error case deleted"}, 400
return {"message": "Permission denied"}, 403
@api.route('/<cid>/task/<tid>/delete')
@api.doc(description='Delete a specific task in a case', params={'cid': 'id of a case', "tid": "id of a task"})
class DeleteTask(Resource):
method_decorators = [editor_required, api_required]
def get(self, cid, tid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
task = CommonModel.get_task(tid)
if task:
if int(cid) == task.case_id:
if TaskModel.delete_task(tid, current_user):
return {"message": "Task deleted"}, 200
else:
return {"message": "Error task deleted"}, 400
else:
return {"message": "Task not in this case"}, 404
return {"message": "Task not found"}, 404
return {"message": "Permission denied"}, 403
@api.route('/create', methods=['POST'])
@api.doc(description='Create a case')
class CreateCase(Resource):
method_decorators = [editor_required, api_required]
@api.doc(params={
"title": "Required. Title for a case",
"description": "Description of a case",
"deadline_date": "Date(%Y-%m-%d)",
"deadline_time": "Time(%H-%M)"
})
def post(self):
user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if request.json:
verif_dict = CaseModelApi.verif_create_case_task(request.json, True)
if "message" not in verif_dict:
case = CaseModel.create_case(verif_dict, user)
return {"message": f"Case created, id: {case.id}"}, 201
return verif_dict, 400
return {"message": "Please give data"}, 400
@api.route('/<cid>/create_task', methods=['POST'])
@api.doc(description='Create a new task to a case', params={'cid': 'id of a case'})
class CreateTask(Resource):
method_decorators = [editor_required, api_required]
@api.doc(params={
"title": "Required. Title for a task",
"description": "Description of a task",
"url": "Link to a tool or a ressource",
"deadline_date": "Date(%Y-%m-%d)",
"deadline_time": "Time(%H-%M)"
})
def post(self, cid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if request.json:
verif_dict = CaseModelApi.verif_create_case_task(request.json, False)
if "message" not in verif_dict:
task = TaskModel.create_task(verif_dict, cid, current_user)
return {"message": f"Task created for case id: {cid}"}, 201
return verif_dict, 400
return {"message": "Please give data"}, 400
return {"message": "Permission denied"}, 403
@api.route('/<id>/edit', methods=['POST'])
@api.doc(description='Edit a case', params={'id': 'id of a case'})
class EditCase(Resource):
method_decorators = [editor_required, api_required]
@api.doc(params={"title": "Title for a case", "description": "Description of a case", "deadline_date": "Date(%Y-%m-%d)", "deadline_time": "Time(%H-%M)"})
def post(self, id):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(id, current_user) or current_user.is_admin():
if request.json:
verif_dict = CaseModelApi.verif_edit_case(request.json, id)
if "message" not in verif_dict:
CaseModel.edit_case(verif_dict, id, current_user)
return {"message": f"Case {id} edited"}, 200
return verif_dict, 400
return {"message": "Please give data"}, 400
return {"message": "Permission denied"}, 403
@api.route('/<cid>/task/<tid>/edit', methods=['POST'])
@api.doc(description='Edit a task in a case', params={'cid': 'id of a case', "tid": "id of a task"})
class EditTake(Resource):
method_decorators = [editor_required, api_required]
@api.doc(params={"title": "Title for a case", "description": "Description of a case", "deadline_date": "Date(%Y-%m-%d)", "deadline_time": "Time(%H-%M)"})
def post(self, cid, tid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if request.json:
task = CommonModel.get_task(tid)
if task:
if int(cid) == task.case_id:
verif_dict = CaseModelApi.verif_edit_task(request.json, tid)
if "message" not in verif_dict:
TaskModel.edit_task_core(verif_dict, tid, current_user)
return {"message": f"Task {tid} edited"}, 200
return verif_dict, 400
else:
return {"message": "Task not in this case"}, 404
else:
return {"message": "Task not found"}, 404
return {"message": "Please give data"}, 400
return {"message": "Permission denied"}, 403
@api.route('/<cid>/task/<tid>/complete')
@api.doc(description='Complete a task in a case', params={'cid': 'id of a case', "tid": "id of a task"})
class CompleteTake(Resource):
method_decorators = [editor_required, api_required]
def get(self, cid, tid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
task = CommonModel.get_task(tid)
if task:
if int(cid) == task.case_id:
if TaskModel.complete_task(tid, current_user):
return {"message": f"Task {tid} completed"}, 200
return {"message": f"Error task {tid} completed"}, 400
else:
return {"message": "Task not in this case"}, 404
return {"message": "Task not found"}, 404
return {"message": "Permission denied"}, 403
@api.route('/<cid>/task/<tid>/get_note')
@api.doc(description='Get note of a task in a case', params={'cid': 'id of a case', "tid": "id of a task"})
class GetNoteTask(Resource):
method_decorators = [api_required]
def get(self, cid, tid):
task = CommonModel.get_task(tid)
if task:
if int(cid) == task.case_id:
return {"note": task.notes}, 200
else:
return {"message": "Task not in this case"}, 404
return {"message": "Task not found"}, 404
@api.route('/<cid>/task/<tid>/modif_note', methods=['POST'])
@api.doc(description='Edit note of a task in a case', params={'cid': 'id of a case', "tid": "id of a task"})
class ModifNoteTask(Resource):
method_decorators = [editor_required, api_required]
@api.doc(params={"note": "note to create or modify"})
def post(self, cid, tid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if "note" in request.json:
task = CommonModel.get_task(tid)
if task:
if int(cid) == task.case_id:
if TaskModel.modif_note_core(tid, current_user, request.json["note"]):
return {"message": f"Note for task {tid} edited"}, 200
return {"message": f"Error Note for task {tid} edited"}, 400
else:
return {"message": "Task not in this case"}, 404
return {"message": "Task not found"}, 404
return {"message": "Key 'note' not found"}, 400
return {"message": "Permission denied"}, 403
@api.route('/<cid>/add_org', methods=['POST'])
@api.doc(description='Add an org to the case', params={'cid': 'id of a case'})
class AddOrgCase(Resource):
method_decorators = [editor_required, api_required]
@api.doc(params={"name": "Name of the organisation", "oid": "id of the organisation"})
def post(self, cid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if "name" in request.json:
org = CommonModel.get_org_by_name(request.json["name"])
elif "oid" in request.json:
org = CommonModel.get_org(request.json["oid"])
else:
return {"message": "Required an id or a name of an Org"}, 400
if org:
if not CommonModel.get_org_in_case(org.id, cid):
if CaseModel.add_orgs_case({"org_id": [org.id]}, cid, current_user):
return {"message": f"Org added to case {cid}"}, 200
return {"message": f"Error Org added to case {cid}"}, 400
return {"message": "Org already in case"}, 400
return {"message": "Org not found"}, 404
return {"message": "Permission denied"}, 403
@api.route('/<cid>/remove_org/<oid>', methods=['GET'])
@api.doc(description='Add an org to the case', params={'cid': 'id of a case', "oid": "id of an org"})
class RemoveOrgCase(Resource):
method_decorators = [editor_required, api_required]
def get(self, cid, oid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
org = CommonModel.get_org(oid)
if org:
if CommonModel.get_org_in_case(org.id, cid):
if CaseModel.remove_org_case(cid, org.id, current_user):
return {"message": f"Org deleted from case {cid}"}, 200
return {"message": f"Error Org deleted from case {cid}"}, 400
return {"message": "Org not in case"}, 404
return {"message": "Org not found"}, 404
return {"message": "Permission denied"}, 403
@api.route('/<cid>/take_task/<tid>', methods=['GET'])
@api.doc(description='Assign current user to the task', params={'cid': 'id of a case', "tid": "id of a task"})
class AssignTask(Resource):
method_decorators = [editor_required, api_required]
def get(self, cid, tid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
task = CommonModel.get_task(tid)
if task:
if int(cid) == task.case_id:
if TaskModel.assign_task(tid, user=current_user, current_user=current_user, flag_current_user=True):
return {"message": f"Task Take"}, 200
return {"message": f"Error Task Take"}, 400
return {"message": "Task not in this case"}, 404
return {"message": "Task not found"}, 404
return {"message": "Permission denied"}, 403
@api.route('/<cid>/remove_assignment/<tid>', methods=['GET'])
@api.doc(description='Remove assigment of current user to the task', params={'cid': 'id of a case', "tid": "id of a task"})
class RemoveOrgCase(Resource):
method_decorators = [editor_required, api_required]
def get(self, cid, tid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
task = CommonModel.get_task(tid)
if task:
if int(cid) == task.case_id:
if TaskModel.remove_assign_task(tid, user=current_user, current_user=current_user, flag_current_user=True):
return {"message": f"Removed from assignment"}, 200
return {"message": f"Error Removed from assignment"}, 400
return {"message": "Task not in this case"}, 404
return {"message": "Task not found"}, 404
return {"message": "Permission denied"}, 403
@api.route('/<cid>/get_all_users', methods=['GET'])
@api.doc(description='Get list of user that can be assign', params={'cid': 'id of a case'})
class GetAllUsers(Resource):
method_decorators = [api_required]
def get(self, cid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
case = CommonModel.get_case(cid)
if case:
users_list = list()
for org in CommonModel.get_all_users_core(case):
for user in org.users:
if not user == current_user:
users_list.append(user.to_json())
return {"users": users_list}, 200
return {"message": "Case not found"}, 404
@api.route('/<cid>/task/<tid>/assign_users', methods=['POST'])
@api.doc(description='Assign users to a task', params={'cid': 'id of a case', "tid": "id of a task"})
class AssignUser(Resource):
method_decorators = [editor_required, api_required]
@api.doc(params={"users_id": "List of user id"})
def post(self, cid, tid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
task = CommonModel.get_task(tid)
if task:
if int(cid) == task.case_id:
users_list = request.json["users_id"]
for user in users_list:
TaskModel.assign_task(tid, user=user, current_user=current_user, flag_current_user=False)
return {"message": "Users Assigned"}, 200
return {"message": "Task not in this case"}, 404
return {"message": "Task not found"}, 404
return {"message": "Permission denied"}, 403
@api.route('/<cid>/task/<tid>/remove_assign_user', methods=['POST'])
@api.doc(description='Remove an assign user to a task', params={'cid': 'id of a case', "tid": "id of a task"})
class AssignUser(Resource):
method_decorators = [editor_required, api_required]
@api.doc(params={"user_id": "Id of a user"})
def post(self, cid, tid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
task = CommonModel.get_task(tid)
if task:
if int(cid) == task.case_id:
user_id = request.json["user_id"]
if TaskModel.remove_assign_task(tid, user=user_id, current_user=current_user, flag_current_user=False):
return {"message": "User Removed from assignment"}, 200
return {"message": "Task not in this case"}, 404
return {"message": "Task not found"}, 404
return {"message": "Permission denied"}, 403
@api.route('/<cid>/task/<tid>/change_status', methods=['POST'])
@api.doc(description='Change status of a task', params={'cid': 'id of a case', "tid": "id of a task"})
class ChangeStatus(Resource):
method_decorators = [editor_required, api_required]
@api.doc(params={"status_id": "Id of the new status"})
def post(self, cid, tid):
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
task = CommonModel.get_task(tid)
if task:
if int(cid) == task.case_id:
if TaskModel.change_task_status(request.json["status_id"], task, current_user):
return {"message": "Status changed"}, 200
return {"message": "Task not in this case"}, 404
return {"message": "Task not found"}, 404
return {"message": "Permission denied"}, 403
@api.route('/list_status', methods=['GET'])
@api.doc(description='List all status')
class ChangeStatus(Resource):
method_decorators = [api_required]
def get(self):
return [status.to_json() for status in CommonModel.get_all_status()], 200
@api.route('/<cid>/history', methods=['GET'])
@api.doc(description='Get history of a case', params={'cid': 'id of a case'})
class ChangeStatus(Resource):
method_decorators = [api_required]
def get(self, cid):
case = CommonModel.get_case(cid)
if case:
history = CommonModel.get_history(case.uuid)
if history:
return {"history": history}
return {"history": None}
return {"message": "Case Not found"}, 404
@api.route('/<cid>/task/<tid>/files')
@api.doc(description='Get list of files', params={"cid": "id of a case", "tid": "id of a task"})
class DownloadFile(Resource):
method_decorators = [api_required]
def get(self, cid, tid):
case = CommonModel.get_case(cid)
if case:
task = CommonModel.get_task(tid)
if task:
file_list = [file.to_json() for file in task.files]
return {"files": file_list}, 200
return {"message": "Task Not found"}, 404
return {"message": "Case Not found"}, 404
@api.route('/<cid>/task/<tid>/upload_file')
@api.doc(description='Upload a file')
class UploadFile(Resource):
method_decorators = [api_required]
@api.doc(params={})
def post(self, cid, tid):
case = CommonModel.get_case(cid)
if case:
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
task = CommonModel.get_task(tid)
if task:
if TaskModel.add_file_core(task, request.files, current_user):
return {"message": "File added"}, 200
return {"message": "Task Not found"}, 404
return {"message": "Permission denied"}, 403
return {"message": "Case Not found"}, 404
@api.route('/<cid>/task/<tid>/download_file/<fid>')
@api.doc(description='Download a file', params={"cid": "id of a case", "tid": "id of a task", "fid": "id of a file"})
class DownloadFile(Resource):
method_decorators = [api_required]
def get(self, cid, tid, fid):
case = CommonModel.get_case(cid)
if case:
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
task = CommonModel.get_task(tid)
if task:
file = CommonModel.get_file(fid)
if file and file in task.files:
return TaskModel.download_file(file)
return {"message": "Task Not found"}, 404
return {"message": "Permission denied"}, 403
return {"message": "Case Not found"}, 404
@api.route('/<cid>/task/<tid>/delete_file/<fid>')
@api.doc(description='Delete a file', params={"cid": "id of a case", "tid": "id of a task", "fid": "id of a file"})
class DeleteFile(Resource):
method_decorators = [api_required]
@api.doc(params={
})
def get(self, cid, tid, fid):
case = CommonModel.get_case(cid)
if case:
current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"])
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
task = CommonModel.get_task(tid)
if task:
file = CommonModel.get_file(fid)
if file and file in task.files:
if TaskModel.delete_file(file, task, current_user):
return {"message": "File Deleted"}, 200
return {"message": "Task Not found"}, 404
return {"message": "Permission denied"}, 403
return {"message": "Case Not found"}, 404

View File

@ -1,610 +0,0 @@
import os
import ast
import uuid
import datetime
from app.utils.utils import get_modules_list, MODULES, MODULES_CONFIG
from .. import db
from ..db_class.db import *
from sqlalchemy import desc, and_
from ..notification import notification_core as NotifModel
from dateutil import relativedelta
from ..tools.tools_core import create_case_from_template
from . import common_core as CommonModel
from . import task_core as TaskModel
def delete_case(cid, current_user):
"""Delete a case by is id"""
case = CommonModel.get_case(cid)
if case:
# Delete all tasks in the case
for task in case.tasks:
TaskModel.delete_task(task.id, current_user)
history_path = os.path.join(CommonModel.HISTORY_FOLDER, str(case.uuid))
if os.path.isfile(history_path):
try:
os.remove(history_path)
except:
return False
NotifModel.create_notification_all_orgs(f"Case: '{case.id}-{case.title}' was deleted", cid, html_icon="fa-solid fa-trash", current_user=current_user)
Case_Tags.query.filter_by(case_id=case.id).delete()
Case_Galaxy_Tags.query.filter_by(case_id=case.id).delete()
Case_Org.query.filter_by(case_id=case.id).delete()
db.session.delete(case)
db.session.commit()
return True
return False
def complete_case(cid, current_user):
"""Complete case by is id"""
case = CommonModel.get_case(cid)
if case is not None:
case.completed = not case.completed
if case.completed:
case.status_id = Status.query.filter_by(name="Finished").first().id
for task in case.tasks:
TaskModel.complete_task(task.id, current_user)
NotifModel.create_notification_all_orgs(f"Case: '{case.id}-{case.title}' is now completed", cid, html_icon="fa-solid fa-square-check", current_user=current_user)
else:
case.status_id = Status.query.filter_by(name="Created").first().id
NotifModel.create_notification_all_orgs(f"Case: '{case.id}-{case.title}' is now revived", cid, html_icon="fa-solid fa-heart-circle-plus", current_user=current_user)
CommonModel.update_last_modif(cid)
db.session.commit()
CommonModel.save_history(case.uuid, current_user, "Case completed")
return True
return False
def create_case(form_dict, user):
"""Add a case to the DB"""
if "template_select" in form_dict and not 0 in form_dict["template_select"]:
for template in form_dict["template_select"]:
if Case_Template.query.get(template):
case = create_case_from_template(template, form_dict["title_template"], user)
else:
deadline = CommonModel.deadline_check(form_dict["deadline_date"], form_dict["deadline_time"])
case = Case(
title=form_dict["title"],
description=form_dict["description"],
uuid=str(uuid.uuid4()),
creation_date=datetime.datetime.now(tz=datetime.timezone.utc),
last_modif=datetime.datetime.now(tz=datetime.timezone.utc),
deadline=deadline,
status_id=1,
owner_org_id=user.org_id
)
db.session.add(case)
db.session.commit()
for tags in form_dict["tags"]:
tag = CommonModel.get_tag(tags)
case_tag = Case_Tags(
tag_id=tag.id,
case_id=case.id
)
db.session.add(case_tag)
db.session.commit()
for clusters in form_dict["clusters"]:
cluster = CommonModel.get_cluster_by_name(clusters)
case_galaxy = Case_Galaxy_Tags(
cluster_id=cluster.id,
case_id=case.id
)
db.session.add(case_galaxy)
db.session.commit()
if "tasks_templates" in form_dict and not 0 in form_dict["tasks_templates"]:
for tid in form_dict["tasks_templates"]:
task = Task_Template.query.get(tid)
t = Task(
uuid=str(uuid.uuid4()),
title=task.title,
description=task.description,
url=task.url,
creation_date=datetime.datetime.now(tz=datetime.timezone.utc),
last_modif=datetime.datetime.now(tz=datetime.timezone.utc),
case_id=case.id,
status_id=1
)
db.session.add(t)
db.session.commit()
for t_t in Task_Template_Tags.query.filter_by(task_id=task.id).all():
task_tag = Task_Tags(
task_id=t.id,
tag_id=t_t.tag_id
)
db.session.add(task_tag)
db.session.commit()
for t_t in Task_Template_Galaxy_Tags.query.filter_by(task_id=task.id).all():
task_galaxy = Task_Galaxy_Tags(
task_id=t.id,
cluster_id=t_t.cluster_id
)
db.session.add(task_galaxy)
db.session.commit()
# Add the current user's org to the case
case_org = Case_Org(
case_id=case.id,
org_id=user.org_id
)
db.session.add(case_org)
db.session.commit()
CommonModel.save_history(case.uuid, user, "Case Created")
return case
def edit_case(form_dict, cid, current_user):
"""Edit a case to the DB"""
case = CommonModel.get_case(cid)
deadline = CommonModel.deadline_check(form_dict["deadline_date"], form_dict["deadline_time"])
case.title = form_dict["title"]
case.description=form_dict["description"]
case.deadline=deadline
## Tags
case_tag_db = Case_Tags.query.filter_by(case_id=case.id).all()
for tags in form_dict["tags"]:
tag = CommonModel.get_tag(tags)
if not tags in case_tag_db:
case_tag = Case_Tags(
tag_id=tag.id,
case_id=case.id
)
db.session.add(case_tag)
db.session.commit()
for c_t_db in case_tag_db:
if not c_t_db in form_dict["tags"]:
Case_Tags.query.filter_by(id=c_t_db.id).delete()
db.session.commit()
## Clusters
case_cluster_db = Case_Galaxy_Tags.query.filter_by(case_id=case.id).all()
for clusters in form_dict["clusters"]:
cluster = CommonModel.get_cluster_by_name(clusters)
if not clusters in case_cluster_db:
case_galaxy_tag = Case_Galaxy_Tags(
cluster_id=cluster.id,
case_id=case.id
)
db.session.add(case_galaxy_tag)
db.session.commit()
for c_t_db in case_cluster_db:
if not c_t_db in form_dict["clusters"]:
Case_Galaxy_Tags.query.filter_by(id=c_t_db.id).delete()
db.session.commit()
CommonModel.update_last_modif(cid)
db.session.commit()
CommonModel.save_history(case.uuid, current_user, f"Case edited")
def add_orgs_case(form_dict, cid, current_user):
"""Add orgs to case in th DB"""
for org_id in form_dict["org_id"]:
case_org = Case_Org(
case_id=cid,
org_id=org_id
)
db.session.add(case_org)
case = CommonModel.get_case(cid)
NotifModel.create_notification_org(f"{CommonModel.get_org(org_id).name} add to case: '{case.id}-{case.title}'", cid, org_id, html_icon="fa-solid fa-sitemap", current_user=current_user)
CommonModel.update_last_modif(cid)
db.session.commit()
case = CommonModel.get_case(cid)
CommonModel.save_history(case.uuid, current_user, f"Org {org_id} added")
return True
def remove_org_case(case_id, org_id, current_user):
"""Remove an org from a case"""
case_org = Case_Org.query.filter_by(case_id=case_id, org_id=org_id).first()
if case_org:
db.session.delete(case_org)
case = CommonModel.get_case(case_id)
NotifModel.create_notification_org(f"{CommonModel.get_org(org_id).name} removed from case: '{case.id}-{case.title}'", case_id, org_id, html_icon="fa-solid fa-door-open", current_user=current_user)
CommonModel.update_last_modif(case_id)
db.session.commit()
case = CommonModel.get_case(case_id)
CommonModel.save_history(case.uuid, current_user, f"Org {org_id} removed")
return True
return False
def get_present_in_case(case_id, user):
"""Return if current user is present in a case"""
orgs_in_case = CommonModel.get_orgs_in_case(case_id)
present_in_case = False
for org in orgs_in_case:
if org.id == user.org_id:
present_in_case = True
return present_in_case
def change_status_core(status, case, current_user):
case.status_id = status
CommonModel.update_last_modif(case.id)
db.session.commit()
CommonModel.save_history(case.uuid, current_user, "Case Status changed")
return True
def regroup_case_info(cases, user, nb_pages=None):
loc = dict()
loc["cases"] = list()
for case in cases:
present_in_case = get_present_in_case(case.id, user)
case_loc = case.to_json()
case_loc["present_in_case"] = present_in_case
case_loc["current_user_permission"] = CommonModel.get_role(user).to_json()
loc["cases"].append(case_loc)
if nb_pages:
loc["nb_pages"] = nb_pages
else:
try:
loc["nb_pages"] = cases.pages
except:
pass
return loc
def build_case_query(page, completed, tags=None, taxonomies=None, galaxies=None, clusters=None, filter=None):
query = Case.query
conditions = [Case.completed == completed]
if tags or taxonomies:
query = query.join(Case_Tags, Case_Tags.case_id == Case.id)
query = query.join(Tags, Case_Tags.tag_id == Tags.id)
if tags:
tags = ast.literal_eval(tags)
conditions.append(Tags.name.in_(list(tags)))
if taxonomies:
taxonomies = ast.literal_eval(taxonomies)
query = query.join(Taxonomy, Taxonomy.id == Tags.taxonomy_id)
conditions.append(Taxonomy.name.in_(list(taxonomies)))
if clusters or galaxies:
query = query.join(Case_Galaxy_Tags, Case_Galaxy_Tags.case_id == Case.id)
query = query.join(Cluster, Case_Galaxy_Tags.cluster_id == Cluster.id)
if clusters:
clusters = ast.literal_eval(clusters)
conditions.append(Cluster.name.in_(list(clusters)))
if galaxies:
galaxies = ast.literal_eval(galaxies)
query = query.join(Galaxy, Galaxy.id == Cluster.galaxy_id)
conditions.append(Galaxy.name.in_(list(galaxies)))
if filter:
query.order_by(desc(filter))
return query.filter(and_(*conditions)).paginate(page=page, per_page=25, max_per_page=50)
def sort_by_status(page, taxonomies=[], galaxies=[], tags=[], clusters=[], or_and_taxo="true", or_and_galaxies="true", completed=False):
cases = build_case_query(page, completed, tags, taxonomies, galaxies, clusters)
if tags:
tags = ast.literal_eval(tags)
if taxonomies:
taxonomies = ast.literal_eval(taxonomies)
if galaxies:
galaxies = ast.literal_eval(galaxies)
if clusters:
clusters = ast.literal_eval(clusters)
if tags or taxonomies or galaxies or clusters:
if or_and_taxo == "false":
glob_list = []
for case in cases:
tags_db = case.to_json()["tags"]
loc_tag = [tag["name"] for tag in tags_db]
taxo_list = [Taxonomy.query.get(tag["taxonomy_id"]).name for tag in tags_db]
if (not tags or all(item in loc_tag for item in tags)) and \
(not taxonomies or all(item in taxo_list for item in taxonomies)):
glob_list.append(case)
cases = glob_list
if or_and_galaxies == "false":
glob_list = []
for case in cases:
clusters_db = case.to_json()["clusters"]
loc_cluster = [cluster["name"] for cluster in clusters_db]
galaxies_list = [Galaxy.query.get(cluster["galaxy_id"]).name for cluster in clusters_db]
if (not clusters or all(item in loc_cluster for item in clusters)) and \
(not galaxies or all(item in galaxies_list for item in galaxies)):
glob_list.append(case)
cases = glob_list
else:
cases = Case.query.filter_by(completed=completed).paginate(page=page, per_page=25, max_per_page=50)
return cases
def sort_by_filter(filter, page, taxonomies=[], galaxies=[], tags=[], clusters=[], or_and_taxo="true", or_and_galaxies="true", completed=False):
cases = build_case_query(page, completed, tags, taxonomies, galaxies, clusters, filter)
nb_pages = cases.pages
if tags:
tags = ast.literal_eval(tags)
if taxonomies:
taxonomies = ast.literal_eval(taxonomies)
if galaxies:
galaxies = ast.literal_eval(galaxies)
if clusters:
clusters = ast.literal_eval(clusters)
if tags or taxonomies or galaxies or clusters:
if or_and_taxo == "false":
glob_list = []
for case in cases:
tags_db = case.to_json()["tags"]
loc_tag = [tag["name"] for tag in tags_db]
taxo_list = [Taxonomy.query.get(tag["taxonomy_id"]).name for tag in tags_db]
if (not tags or all(item in loc_tag for item in tags)) and \
(not taxonomies or all(item in taxo_list for item in taxonomies)):
glob_list.append(case)
cases = glob_list
if or_and_galaxies == "false":
glob_list = []
for case in cases:
clusters_db = case.to_json()["clusters"]
loc_cluster = [cluster["name"] for cluster in clusters_db]
galaxies_list = [Galaxy.query.get(cluster["galaxy_id"]).name for cluster in clusters_db]
if (not clusters or all(item in loc_cluster for item in clusters)) and \
(not galaxies or all(item in galaxies_list for item in galaxies)):
glob_list.append(case)
cases = glob_list
else:
cases = Case.query.filter_by(completed=completed).order_by(desc(filter)).paginate(page=page, per_page=25, max_per_page=50)
nb_pages = cases.pages
# for deadline filter, only case with a deadline defined is required
loc = list()
for case in cases:
if getattr(case, filter):
loc.append(case)
return loc, nb_pages
def fork_case_core(cid, case_title_fork, user):
case_title_stored = CommonModel.get_case_by_title(case_title_fork)
if case_title_stored:
return {"message": "Error, title already exist"}
case = CommonModel.get_case(cid)
case_json = case.to_json()
case_json["title"] = case_title_fork
if case.deadline:
case_json["deadline_date"] = datetime.datetime.strptime(case_json["deadline"].split(" ")[0], "%Y-%m-%d").date()
case_json["deadline_time"] = datetime.datetime.strptime(case_json["deadline"].split(" ")[1], "%H:%M").time()
else:
case_json["deadline_date"] = None
case_json["deadline_time"] = None
new_case = create_case(case_json, user)
for task in case.tasks:
task_json = task.to_json()
if task.deadline:
task_json["deadline_date"] = datetime.datetime.strptime(task_json["deadline"].split(" ")[0], "%Y-%m-%d").date()
task_json["deadline_time"] = datetime.datetime.strptime(task_json["deadline"].split(" ")[1], "%H:%M").time()
else:
task_json["deadline_date"] = None
task_json["deadline_time"] = None
TaskModel.create_task(task_json, new_case.id, user)
return new_case
def create_template_from_case(cid, case_title_template):
if Case_Template.query.filter_by(title=case_title_template).first():
return {"message": "Error, title already exist"}
case = CommonModel.get_case(cid)
new_template = Case_Template(
uuid=str(uuid.uuid4()),
title=case_title_template,
description=case.description
)
db.session.add(new_template)
db.session.commit()
for c_t in Case_Tags.query.filter_by(case_id=case.id).all():
case_tag = Case_Template_Tags(
case_id=new_template.id,
tag_id=c_t.tag_id
)
db.session.add(case_tag)
db.session.commit()
for c_t in Case_Galaxy_Tags.query.filter_by(case_id=case.id).all():
case_cluster = Case_Template_Galaxy_Tags(
template_id=new_template.id,
cluster_id=c_t.cluster_id
)
db.session.add(case_cluster)
db.session.commit()
for task in case.tasks:
task_exist = Task_Template.query.filter_by(title=task.title).first()
if not task_exist:
task_template = Task_Template(
uuid=str(uuid.uuid4()),
title=task.title,
description=task.description,
url=task.url
)
db.session.add(task_template)
db.session.commit()
## Tags
for t_t in Task_Tags.query.filter_by(task_id=task.id).all():
task_tag = Task_Template_Tags(
task_id=task_template.id,
tag_id=t_t.tag_id
)
db.session.add(task_tag)
db.session.commit()
## Clusters
for t_t in Task_Galaxy_Tags.query.filter_by(task_id=task.id).all():
task_cluster = Task_Template_Galaxy_Tags(
template_id=task_template.id,
cluster_id=t_t.cluster_id
)
db.session.add(task_cluster)
db.session.commit()
## Task Connectors
for t_c in Task_Connector_Instance.query.filter_by(task_id=task.id).all():
task_connector = Task_Template_Connector_Instance(
template_id=task_template.id,
instance_id=t_c.instance_id
)
db.session.add(task_connector)
db.session.commit()
case_task_template = Case_Task_Template(
case_id=new_template.id,
task_id=task_template.id
)
db.session.add(case_task_template)
db.session.commit()
else:
case_task_template = Case_Task_Template(
case_id=new_template.id,
task_id=task_exist.id
)
db.session.add(case_task_template)
db.session.commit()
return new_template
def change_recurring(form_dict, cid, current_user):
case = CommonModel.get_case(cid)
recurring_status = Status.query.filter_by(name="Recurring").first()
created_status = Status.query.filter_by(name="Created").first()
if "once" in form_dict and form_dict["once"]:
case.recurring_type = "once"
case.recurring_date = form_dict["once"]
case.status_id = recurring_status.id
elif "daily" in form_dict and form_dict["daily"]:
case.recurring_type = "daily"
case.recurring_date = datetime.datetime.today() + datetime.timedelta(days=1)
case.status_id = recurring_status.id
elif "weekly" in form_dict and form_dict["weekly"]:
case.recurring_type = "weekly"
if form_dict["weekly"]<datetime.datetime.today().date():
case.recurring_date = datetime.datetime.today() + datetime.timedelta(
days=(form_dict["weekly"].weekday() - datetime.datetime.today().weekday() + 7)
)
else:
case.recurring_date = form_dict["weekly"]
case.status_id = recurring_status.id
elif "monthly" in form_dict and form_dict["monthly"]:
case.recurring_type = "monthly"
if form_dict["monthly"]<datetime.datetime.today().date():
case.recurring_date = form_dict["monthly"] + relativedelta.relativedelta(months=1)
else:
case.recurring_date = form_dict["monthly"]
case.status_id = recurring_status.id
elif "remove" in form_dict and form_dict["remove"]:
case.recurring_type = None
case.recurring_date = None
case.status_id = created_status.id
for notif in Recurring_Notification.query.filter_by(case_id=cid).all():
db.session.delete(notif)
db.session.commit()
else:
return False
db.session.commit()
CommonModel.save_history(case.uuid, current_user, "Recurring changed")
return True
def notify_user(task, user_id):
case = CommonModel.get_case(task.case_id)
message = f"Notify for task '{task.id}-{task.title}' of case '{case.id}-{case.title}'"
NotifModel.create_notification_user(message, task.case_id, user_id=user_id, html_icon="fa-solid fa-bell")
return True
def notify_user_recurring(form_dict, case_id, orgs):
for org in orgs:
if f"check_{org.id}" in form_dict:
for user in org.users:
if not Recurring_Notification.query.filter_by(case_id=case_id, user_id=user.id).first():
rec_notif = Recurring_Notification(case_id=case_id, user_id=user.id)
db.session.add(rec_notif)
db.session.commit()
else:
for user in org.users:
if f"check_{org.id}_user_{user.id}" in form_dict:
if not Recurring_Notification.query.filter_by(case_id=case_id, user_id=user.id).first():
rec_notif = Recurring_Notification(case_id=case_id, user_id=user.id)
db.session.add(rec_notif)
db.session.commit()
else:
notif = Recurring_Notification.query.filter_by(case_id=case_id, user_id=user.id).first()
if notif:
db.session.delete(notif)
db.session.commit()
def get_modules():
return MODULES_CONFIG
def get_instance_module_core(module, user_id):
connector = CommonModel.get_connector_by_name(MODULES_CONFIG[module]["config"]["connector"])
instance_list = list()
for instance in connector.instances:
if CommonModel.get_user_instance_both(user_id=user_id, instance_id=instance.id):
instance_list.append(instance.to_json())
return instance_list

View File

@ -1,160 +0,0 @@
from ..db_class.db import Case, User
from datetime import datetime
from . import common_core as CommonModel
from ..utils.utils import check_tag
def get_user_api(api_key):
return User.query.filter_by(api_key=api_key).first()
def verif_set_recurring(data_dict):
if "once" in data_dict:
try:
data_dict["once"] = datetime.strptime(data_dict["once"], '%Y-%m-%d')
except:
return {"message": "once date bad format, YYYY-mm-dd"}
if "weekly" in data_dict:
try:
data_dict["weekly"] = datetime.strptime(data_dict["weekly"], '%Y-%m-%d').date()
except:
return {"message": "weekly date bad format, YYYY-mm-dd"}
if "monthly" in data_dict:
try:
data_dict["monthly"] = datetime.strptime(data_dict["monthly"], '%Y-%m-%d').date()
except:
return {"message": "monthly date bad format, YYYY-mm-dd"}
return data_dict
def verif_create_case_task(data_dict, isCase):
if "title" not in data_dict or not data_dict["title"]:
return {"message": "Please give a title"}
elif Case.query.filter_by(title=data_dict["title"]).first():
return {"message": "Title already exist"}
if "description" not in data_dict or not data_dict["description"]:
data_dict["description"] = ""
if "deadline_date" in data_dict:
try:
data_dict["deadline_date"] = datetime.strptime(data_dict["deadline_date"], '%Y-%m-%d')
except:
return {"message": "deadline_date bad format"}
else:
data_dict["deadline_date"] = ""
if "deadline_time" in data_dict:
try:
data_dict["deadline_time"] = datetime.strptime(data_dict["deadline_time"], '%H-%M')
except:
return {"message": "deadline_time bad format"}
else:
data_dict["deadline_time"] = ""
if "tags" in data_dict:
for tag in data_dict["tags"]:
if not check_tag(tag):
return {"message": f"Tag '{tag}' doesn't exist"}
else:
data_dict["tags"] = []
if "clusters" in data_dict:
for cluster in data_dict["clusters"]:
if not CommonModel.check_cluster(cluster):
return {"message": f"Cluster '{cluster}' doesn't exist"}
else:
data_dict["clusters"] = []
if not isCase:
if "url" not in data_dict or not data_dict["url"]:
data_dict["url"] = ""
if "connectors" in data_dict:
for connector in data_dict["connectors"]:
if not CommonModel.check_connector(connector):
return {"message": f"Connector '{connector}' doesn't exist"}
else:
data_dict["connectors"] = []
return data_dict
def common_verif(data_dict, case_task):
if "description" not in data_dict or not data_dict["description"]:
data_dict["description"] = case_task.description
if "deadline_date" in data_dict:
try:
data_dict["deadline_date"] = datetime.strptime(data_dict["deadline_date"], '%Y-%m-%d')
except:
return {"message": "date bad format"}
elif case_task.deadline:
data_dict["deadline_date"] = case_task.deadline.strftime('%Y-%m-%d')
else:
data_dict["deadline_date"] = ""
if "deadline_time" in data_dict:
try:
data_dict["deadline_time"] = datetime.strptime(data_dict["deadline_time"], '%H-%M')
except:
return {"message": "time bad format"}
elif case_task.deadline:
data_dict["deadline_time"] = case_task.deadline.strftime('%H-%M')
else:
data_dict["deadline_time"] = ""
if "tags" in data_dict:
for tag in data_dict["tags"]:
if not check_tag(tag):
return {"message": f"Tag '{tag}' doesn't exist"}
elif case_task.to_json()["tags"]:
data_dict["tags"] = case_task.to_json()["tags"]
else:
data_dict["tags"] = []
if "clusters" in data_dict:
for cluster in data_dict["clusters"]:
if not CommonModel.check_cluster(cluster):
return {"message": f"Cluster '{cluster}' doesn't exist"}
elif case_task.to_json()["clusters"]:
data_dict["clusters"] = case_task.to_json()["clusters"]
else:
data_dict["clusters"] = []
return data_dict
def verif_edit_case(data_dict, case_id):
case = CommonModel.get_case(case_id)
if "title" not in data_dict or data_dict["title"] == case.title or not data_dict["title"]:
data_dict["title"] = case.title
elif Case.query.filter_by(title=data_dict["title"]).first():
return {"message": "Title already exist"}
data_dict = common_verif(data_dict, case)
return data_dict
def verif_edit_task(data_dict, task_id):
task = CommonModel.get_task(task_id)
if "title" not in data_dict or data_dict["title"] == task.title or not data_dict["title"]:
data_dict["title"] = task.title
data_dict = common_verif(data_dict, task)
if "url" not in data_dict or not data_dict["url"]:
data_dict["url"] = task.url
if "connectors" in data_dict:
for connector in data_dict["connectors"]:
if not CommonModel.check_connector(connector):
return {"message": f"connector '{connector}' doesn't exist"}
elif task.to_json()["connectors"]:
data_dict["connectors"] = task.to_json()["connectors"]
else:
data_dict["connectors"] = []
return data_dict

View File

@ -1,251 +0,0 @@
import os
import shutil
import datetime
from flask import flash
from .. import db
from ..db_class.db import *
from ..utils.utils import isUUID, create_specific_dir
from sqlalchemy import desc, func
from ..utils import utils
UPLOAD_FOLDER = os.path.join(os.getcwd(), "uploads")
TEMP_FOLDER = os.path.join(os.getcwd(), "temp")
HISTORY_FOLDER = os.path.join(os.getcwd(), "history")
def get_case(cid):
"""Return a case by is id"""
if isUUID(cid):
case = Case.query.filter_by(uuid=cid).first()
elif str(cid).isdigit():
case = Case.query.get(cid)
else:
case = None
return case
def get_task(tid):
"""Return a task by is id"""
if isUUID(tid):
case = Task.query.filter_by(uuid=tid).first()
elif str(tid).isdigit():
case = Task.query.get(tid)
else:
case = None
return case
def get_all_cases():
"""Return all cases"""
return Case.query.filter_by(completed=False).order_by(desc(Case.last_modif))
def get_case_by_completed(completed):
return Case.query.filter_by(completed=completed)
def get_case_by_title(title):
return Case.query.where(func.lower(Case.title)==func.lower(title)).first()
def get_case_template_by_title(title):
return Case_Template.query.filter_by(title=title).first()
def get_task_templates():
return Task_Template.query.all()
def search(text):
"""Return cases containing text"""
return Case.query.where(Case.title.contains(text), Case.completed==False).paginate(page=1, per_page=30, max_per_page=50)
def get_all_users_core(case):
return Org.query.join(Case_Org, Case_Org.case_id==case.id).where(Case_Org.org_id==Org.id).all()
def get_role(user):
"""Return role for the current user"""
return Role.query.get(user.role_id)
def get_org(oid):
"""Return an org by is id"""
return Org.query.get(oid)
def get_org_by_name(name):
"""Return an org by is name"""
return Org.query.filter_by(name=name).first()
def get_org_order_by_name():
"""Return all orgs order by name"""
return Org.query.order_by("name")
def get_org_in_case(org_id, case_id):
return Case_Org.query.filter_by(case_id=case_id, org_id=org_id).first()
def get_org_in_case_by_case_id(case_id):
return Case_Org.query.filter_by(case_id=case_id).all()
def get_orgs_in_case(case_id):
"""Return orgs present in a case"""
case_org = Case_Org.query.filter_by(case_id=case_id).all()
return [Org.query.get(c_o.org_id) for c_o in case_org ]
def get_file(fid):
return File.query.get(fid)
def get_all_status():
return Status.query.all()
def get_status(sid):
return Status.query.get(sid).first()
def get_recu_notif_user(case_id, user_id):
return Recurring_Notification.query.filter_by(case_id=case_id, user_id=user_id).first()
def get_taxonomies():
return [taxo.to_json() for taxo in Taxonomy.query.filter_by(exclude=False).all()]
def get_galaxy(galaxy_id):
return Galaxy.query.get(galaxy_id)
def get_galaxies():
return [galax.to_json() for galax in Galaxy.query.filter_by(exclude=False).order_by('name').all()]
def get_clusters():
return [cluster.to_json() for cluster in Cluster.query.all()]
def get_cluster_by_name(cluster):
return Cluster.query.filter_by(name=cluster).first()
def get_clusters_galaxy(galaxies):
out = dict()
for galaxy in galaxies:
out[galaxy] = [cluster.to_json() for cluster in Cluster.query.join(Galaxy, Galaxy.id==Cluster.galaxy_id).where(Galaxy.name==galaxy).all() if not cluster.exclude]
return out
def get_tags(taxos):
out = dict()
for taxo in taxos:
out[taxo] = [tag.to_json() for tag in Taxonomy.query.filter_by(name=taxo).first().tags if not tag.exclude]
return out
def get_tag(tag):
return Tags.query.filter_by(name=tag).first()
def get_case_tags(cid):
return [tag.name for tag in Tags.query.join(Case_Tags, Case_Tags.tag_id==Tags.id).filter_by(case_id=cid).all()]
def get_case_clusters(cid):
return [cluster for cluster in Cluster.query.join(Case_Galaxy_Tags, Case_Galaxy_Tags.cluster_id==Cluster.id).filter_by(case_id=cid).all()]
def get_task_tags(tid):
return [tag.name for tag in Tags.query.join(Task_Tags, Task_Tags.tag_id==Tags.id).filter_by(task_id=tid).all()]
def get_task_clusters(tid):
return [cluster for cluster in Cluster.query.join(Task_Galaxy_Tags, Task_Galaxy_Tags.cluster_id==Cluster.id).filter_by(task_id=tid).all()]
def get_connectors():
return Connector.query.all()
def get_connector(cid):
return Connector.query.get(cid)
def get_connector_by_name(name):
return Connector.query.where(Connector.name.like(name)).first()
def get_instance(iid):
return Connector_Instance.query.get(iid)
def get_instance_by_name(name):
return Connector_Instance.query.filter_by(name=name).first()
def get_task_connectors(tid):
return Task_Connector_Instance.query.filter_by(task_id=tid).all()
def get_user_instance_both(user_id, instance_id):
return User_Connector_Instance.query.filter_by(user_id=user_id, instance_id=instance_id).all()
def get_user_instance_by_instance(instance_id):
"""Return a user instance by instance id"""
return User_Connector_Instance.query.filter_by(instance_id=instance_id).first()
def get_history(case_uuid):
try:
path_history = os.path.join(HISTORY_FOLDER, str(case_uuid))
with open(path_history, "r") as read_file:
loc_file = read_file.read().splitlines()
return loc_file
except:
return False
def save_history(case_uuid, current_user, message):
create_specific_dir(HISTORY_FOLDER)
path_history = os.path.join(HISTORY_FOLDER, str(case_uuid))
with open(path_history, "a") as write_history:
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
write_history.write(f"[{now}]({current_user.first_name} {current_user.last_name}): {message}\n")
def update_last_modif(case_id):
"""Update 'last_modif' of a case"""
case = Case.query.get(case_id)
case.last_modif = datetime.datetime.now(tz=datetime.timezone.utc)
db.session.commit()
def update_last_modif_task(task_id):
"""Update 'last_modif' of a task"""
if task_id:
task = Task.query.get(task_id)
task.last_modif = datetime.datetime.now(tz=datetime.timezone.utc)
db.session.commit()
def deadline_check(date, time):
"""Combine the date and the time if time exist"""
deadline = None
if date and time:
deadline = datetime.datetime.combine(date, time)
elif date:
deadline = date
return deadline
def delete_temp_folder():
shutil.rmtree(TEMP_FOLDER)
def check_cluster_db(cluster):
return Cluster.query.filter_by(name=cluster).first()
def check_tag(tag_list):
flag = True
for tag in tag_list:
if not utils.check_tag(tag):
flag = False
if not flag:
flash("tag doesn't exist")
return flag
def check_cluster(cluster_list):
flag = True
for cluster in cluster_list:
if not check_cluster_db(cluster):
flag = False
if not flag:
flash("cluster doesn't exist")
return flag
def check_connector(connector_list):
flag = True
for connector in connector_list:
if not get_instance_by_name(connector):
flag = False
if not flag:
flash("Connector doesn't exist")
return flag

View File

@ -1,116 +0,0 @@
from flask_wtf import FlaskForm
from wtforms import ValidationError
from wtforms.fields import (
StringField,
SubmitField,
SelectMultipleField,
TextAreaField,
DateField,
TimeField,
HiddenField,
BooleanField
)
from wtforms.validators import InputRequired, Length, Optional
from ..db_class.db import Case, Case_Org
class CaseForm(FlaskForm):
title = StringField('Title', validators=[Optional(), Length(1, 64)])
description = TextAreaField('Description', validators=[Optional()])
deadline_date = DateField('deadline_date', validators=[Optional()])
deadline_time = TimeField("deadline_time", validators=[Optional()])
template_select = SelectMultipleField(u'Templates', coerce=int)
title_template = StringField('Title Template', validators=[Optional(), Length(1, 64)])
tasks_templates = SelectMultipleField(u'Tasks Templates', coerce=int)
submit = SubmitField('Create')
def validate_title(self, field):
if not field.data and 0 in self.template_select.data:
raise ValidationError("Need to select a title or a template")
if Case.query.filter_by(title=field.data).first():
raise ValidationError("The title already exist")
def validate_template_select(self, field):
if 0 in field.data and not self.title.data:
raise ValidationError("Need to select a template or a title")
if not 0 in field.data and not self.title_template.data:
raise ValidationError("Need a title for the case")
def validate_deadline_time(self, field):
if field.data and not self.deadline_date.data:
raise ValidationError("Choose a date")
def validate_title_template(self, field):
if field.data and not self.template_select.data or 0 in self.template_select.data:
raise ValidationError("A template need to be selected")
if Case.query.filter_by(title=field.data).first():
raise ValidationError("The title already exist")
class CaseEditForm(FlaskForm):
title = StringField('Title', validators=[InputRequired(), Length(1, 64)])
description = TextAreaField('Description', validators=[Optional()])
deadline_date = DateField('deadline_date', validators=[Optional()])
deadline_time = TimeField("deadline_time", validators=[Optional()])
submit = SubmitField('Save')
def validate_deadline_time(self, field):
if field.data and not self.deadline_date.data:
raise ValidationError("Choose a date")
class TaskForm(FlaskForm):
title = StringField('Title', validators=[Optional(), Length(1, 64)])
description = TextAreaField('Description', validators=[Optional()])
url = StringField('Tool/Link', validators=[Optional(), Length(0, 64)])
deadline_date = DateField('deadline_date', validators=[Optional()])
deadline_time = TimeField("deadline_time", validators=[Optional()])
template_select = SelectMultipleField(u'Templates', coerce=int)
submit = SubmitField('Create')
def validate_title(self, field):
if not field.data and 0 in self.template_select.data:
raise ValidationError("Need to select a title or a template")
def validate_template_select(self, field):
if 0 in field.data and not self.title.data:
raise ValidationError("Need to select a template or a title")
def validate_deadline_time(self, field):
if field.data and not self.deadline_date.data:
raise ValidationError("Choose a date")
class TaskEditForm(FlaskForm):
title = StringField('Title', validators=[InputRequired(), Length(1, 64)])
description = TextAreaField('Description', validators=[Optional()])
url = StringField('Tool/Link', validators=[Optional(), Length(0, 64)])
deadline_date = DateField('deadline_date', validators=[Optional()])
deadline_time = TimeField("deadline_time", validators=[Optional()])
submit = SubmitField('Save')
def validate_deadline_time(self, field):
if field.data and not self.deadline_date.data:
raise ValidationError("Choose a date")
class AddOrgsCase(FlaskForm):
org_id = SelectMultipleField(u'Orgs', coerce=int)
case_id = HiddenField("")
submit = SubmitField('Add')
def validate_org_id(self, field):
for org in field.data:
if Case_Org.query.filter_by(case_id = self.case_id.data, org_id=org).first():
raise ValidationError(f"Org {org} already in case")
class RecurringForm(FlaskForm):
case_id = HiddenField("")
once = DateField('One day', validators=[Optional()])
daily = BooleanField('Daily', validators=[Optional()])
weekly = DateField("Start date", validators=[Optional()])
monthly = DateField("Start date", validators=[Optional()])
remove = BooleanField('Remove', validators=[Optional()])
submit = SubmitField('Save')

View File

@ -1,464 +0,0 @@
from flask import Blueprint, render_template, redirect, jsonify, request, flash
from .form import TaskEditForm, TaskForm
from flask_login import login_required, current_user
from . import case_core as CaseModel
from . import common_core as CommonModel
from . import task_core as TaskModel
from ..decorators import editor_required
from ..utils.utils import form_to_dict
task_blueprint = Blueprint(
'task',
__name__,
template_folder='templates',
static_folder='static'
)
@task_blueprint.route("/<cid>/create_task", methods=['GET', 'POST'])
@login_required
def create_task(cid):
"""View of a case"""
if CommonModel.get_case(cid):
present_in_case = CaseModel.get_present_in_case(cid, current_user)
if present_in_case or current_user.is_admin():
form = TaskForm()
form.template_select.choices = [(template.id, template.title) for template in CommonModel.get_task_templates()]
form.template_select.choices.insert(0, (0," "))
if form.validate_on_submit():
tag_list = request.form.getlist("tags_select")
cluster_list = request.form.getlist("clusters_select")
connector_list = request.form.getlist("connectors_select")
if CommonModel.check_tag(tag_list):
if CommonModel.check_cluster(cluster_list):
form_dict = form_to_dict(form)
form_dict["tags"] = tag_list
form_dict["clusters"] = cluster_list
form_dict["connectors"] = connector_list
if TaskModel.create_task(form_dict, cid, current_user):
flash("Task created", "success")
else:
flash("Error Task Created", "error")
return redirect(f"/case/{cid}")
return render_template("case/create_task.html", form=form)
return render_template("case/create_task.html", form=form)
return render_template("case/create_task.html", form=form)
return redirect(f"/case/{cid}")
return render_template("404.html")
@task_blueprint.route("/<cid>/edit_task/<tid>", methods=['GET','POST'])
@login_required
@editor_required
def edit_task(cid, tid):
"""Edit the task"""
if CommonModel.get_case(cid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
form = TaskEditForm()
if form.validate_on_submit():
tag_list = request.form.getlist("tags_select")
cluster_list = request.form.getlist("clusters_select")
connector_list = request.form.getlist("connectors_select")
if CommonModel.check_tag(tag_list):
if CommonModel.check_cluster(cluster_list):
form_dict = form_to_dict(form)
form_dict["tags"] = tag_list
form_dict["clusters"] = cluster_list
form_dict["connectors"] = connector_list
TaskModel.edit_task_core(form_dict, tid, current_user)
flash("Task edited", "success")
return redirect(f"/case/{cid}")
return render_template("case/create_task.html", form=form)
return render_template("case/create_task.html", form=form)
else:
task_modif = CommonModel.get_task(tid)
form.description.data = task_modif.description
form.title.data = task_modif.title
form.url.data = task_modif.url
form.deadline_date.data = task_modif.deadline
form.deadline_time.data = task_modif.deadline
return render_template("case/edit_task.html", form=form)
else:
flash("Access denied", "error")
return redirect(f"/case/{cid}")
return render_template("404.html")
@task_blueprint.route("/complete_task/<tid>", methods=['GET'])
@login_required
@editor_required
def complete_task(tid):
"""Complete the task"""
task = CommonModel.get_task(str(tid))
if task:
if CaseModel.get_present_in_case(task.case_id, current_user) or current_user.is_admin():
if TaskModel.complete_task(tid, current_user):
return {"message": "Task completed", "toast_class": "success-subtle"}, 200
return {"message": "Error task completed", "toast_class": "danger-subtle"}, 400
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "Task not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/<cid>/delete_task/<tid>", methods=['GET'])
@login_required
@editor_required
def delete_task(cid, tid):
"""Delete the task"""
if CommonModel.get_case(cid):
if CommonModel.get_task(tid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if TaskModel.delete_task(tid, current_user):
return {"message": "Task deleted", "toast_class": "success-subtle"}, 200
return {"message": "Error task deleted", "toast_class": "danger-subtle"}, 400
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "Task not found", "toast_class": "danger-subtle"}, 404
return {"message": "Case not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/<cid>/modif_note/<tid>", methods=['POST'])
@login_required
@editor_required
def modif_note(cid, tid):
"""Modify note of the task"""
if CommonModel.get_case(cid):
if CommonModel.get_task(tid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
notes = request.json["notes"]
if TaskModel.modif_note_core(tid, current_user, notes):
return {"message": "Note added", "toast_class": "success-subtle"}, 200
return {"message": "Error add/modify note", "toast_class": "danger-subtle"}, 400
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "Task not found", "toast_class": "danger-subtle"}, 404
return {"message": "Case not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/<cid>/get_note/<tid>", methods=['GET'])
@editor_required
def get_note(cid, tid):
"""Get not of a task in text format"""
if CommonModel.get_case(cid):
task = CommonModel.get_task(tid)
if task:
return {"note": task.notes}, 201
return {"message": "Task not found", "toast_class": "danger-subtle"}, 404
return {"message": "Case not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/<cid>/take_task/<tid>", methods=['GET'])
@login_required
@editor_required
def take_task(cid, tid):
"""Assign current user to the task"""
if CommonModel.get_case(cid):
if CommonModel.get_task(tid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if TaskModel.assign_task(tid, user=current_user, current_user=current_user, flag_current_user=True):
return {"message": "User Assigned", "toast_class": "success-subtle"}, 200
return {"message": "Error assignment", "toast_class": "danger-subtle"}, 400
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "Task not found", "toast_class": "danger-subtle"}, 404
return {"message": "Case not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/<cid>/assign_users/<tid>", methods=['POST'])
@login_required
@editor_required
def assign_user(cid, tid):
"""Assign a list of users to the task"""
if CommonModel.get_case(cid):
if "users_id" in request.json:
users_list = request.json["users_id"]
if CommonModel.get_task(tid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
for user in users_list:
TaskModel.assign_task(tid, user=user, current_user=current_user, flag_current_user=False)
return {"message": "Users Assigned", "toast_class": "success-subtle"}, 200
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "Task not found", "toast_class": "danger-subtle"}, 404
return {"message": "'users_id' is missing", "toast_class": "danger-subtle"}, 400
return {"message": "Case not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/<cid>/remove_assignment/<tid>", methods=['GET'])
@login_required
@editor_required
def remove_assign_task(cid, tid):
"""Remove current user assignment to the task"""
if CommonModel.get_case(cid):
if CommonModel.get_task(tid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if TaskModel.remove_assign_task(tid, user=current_user, current_user=current_user, flag_current_user=True):
return {"message": "User Removed from assignment", "toast_class": "success-subtle"}, 200
return {"message": "Error removed assignment", "toast_class": "danger-subtle"}, 400
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "Task not found", "toast_class": "danger-subtle"}, 404
return {"message": "Case not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/<cid>/remove_assigned_user/<tid>", methods=['POST'])
@login_required
@editor_required
def remove_assigned_user(cid, tid):
"""Assign current user to the task"""
if CommonModel.get_case(cid):
if "user_id" in request.json:
user_id = request.json["user_id"]
if CommonModel.get_task(tid):
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if TaskModel.remove_assign_task(tid, user=user_id, current_user=current_user, flag_current_user=False):
return {"message": "User Removed from assignment", "toast_class": "success-subtle"}, 200
return {"message": "Error removed assignment", "toast_class": "danger-subtle"}, 400
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "Task not found", "toast_class": "danger-subtle"}, 404
return {"message": "'user_id' is missing", "toast_class": "danger-subtle"}, 400
return {"message": "Case not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/<cid>/change_task_status/<tid>", methods=['POST'])
@login_required
@editor_required
def change_task_status(cid, tid):
"""Change the status of the task"""
if CommonModel.get_case(cid):
if "status" in request.json:
status = request.json["status"]
task = CommonModel.get_task(tid)
if task:
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if TaskModel.change_task_status(status, task, current_user):
return {"message": "Status changed", "toast_class": "success-subtle"}, 200
return {"message": "Error changed status", "toast_class": "danger-subtle"}, 400
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "Task not found", "toast_class": "danger-subtle"}, 404
return {"message": "'status' is missing", "toast_class": "danger-subtle"}, 400
return {"message": "Case not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/task/<tid>/download_file/<fid>", methods=['GET'])
@login_required
@editor_required
def download_file(tid, fid):
"""Download the file"""
task = CommonModel.get_task(tid)
file = CommonModel.get_file(fid)
if file and file in task.files:
if CaseModel.get_present_in_case(task.case_id, current_user) or current_user.is_admin():
return TaskModel.download_file(file)
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "File not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/task/<tid>/delete_file/<fid>", methods=['GET'])
@login_required
@editor_required
def delete_file(tid, fid):
"""Delete the file"""
task = CommonModel.get_task(tid)
file = CommonModel.get_file(fid)
if file and file in task.files:
if CaseModel.get_present_in_case(task.case_id, current_user) or current_user.is_admin():
if TaskModel.delete_file(file, task, current_user):
return {"message": "File Deleted", "toast_class": "success-subtle"}, 200
return {"message": "Error deleting file"}, 400
return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401
return {"message": "File not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/<cid>/add_files/<tid>", methods=['POST'])
@login_required
@editor_required
def add_files(cid, tid):
"""Add files to a task"""
if CommonModel.get_case(cid):
task = CommonModel.get_task(tid)
if task:
if len(request.files) > 0:
if TaskModel.add_file_core(task=task, files_list=request.files, current_user=current_user):
return {"message":"Files added", "toast_class": "success-subtle"}, 200
return {"message":"Something goes wrong adding files", "toast_class": "danger-subtle"}, 400
return {"message":"No Files given", "toast_class": "warning-subtle"}, 400
return {"message":"Task not found", "toast_class": "danger-subtle"}, 404
return {"message":"Case not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/<cid>/get_files/<tid>", methods=['GET'])
@login_required
@editor_required
def get_files(cid, tid):
"""Get files of a task"""
if CommonModel.get_case(cid):
task = CommonModel.get_task(tid)
if task:
file_list = [file.to_json() for file in task.files]
return {"files": file_list}, 200
return {"message":"Task not found", "toast_class": "danger-subtle"}, 404
return {"message":"Case not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/<cid>/sort_by_ongoing_task", methods=['GET'])
@login_required
def sort_by_ongoing_task(cid):
"""Sort Task by living one"""
case = CommonModel.get_case(cid)
tags = request.args.get('tags')
or_and_taxo = request.args.get("or_and_taxo")
taxonomies = request.args.get('taxonomies')
galaxies = request.args.get('galaxies')
clusters = request.args.get('clusters')
or_and_galaxies = request.args.get("or_and_galaxies")
return TaskModel.sort_by_status_task_core(case, current_user, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=False)
@task_blueprint.route("/<cid>/sort_by_finished_task", methods=['GET'])
@login_required
def sort_by_finished_task(cid):
"""Sort task by finished one"""
case = CommonModel.get_case(cid)
tags = request.args.get('tags')
or_and_taxo = request.args.get("or_and_taxo")
taxonomies = request.args.get('taxonomies')
galaxies = request.args.get('galaxies')
clusters = request.args.get('clusters')
or_and_galaxies = request.args.get("or_and_galaxies")
return TaskModel.sort_by_status_task_core(case, current_user, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=True)
@task_blueprint.route("/<cid>/tasks/ongoing", methods=['GET'])
@login_required
def ongoing_tasks_sort_by_filter(cid):
"""Sort by filter for living task"""
tags = request.args.get('tags')
or_and_taxo = request.args.get("or_and_taxo")
taxonomies = request.args.get('taxonomies')
galaxies = request.args.get('galaxies')
clusters = request.args.get('clusters')
or_and_galaxies = request.args.get("or_and_galaxies")
filter = request.args.get('filter')
if filter:
case = CommonModel.get_case(cid)
return TaskModel.sort_tasks_by_filter(case, current_user, filter, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=False)
return {"message": "No filter pass"}, 400
@task_blueprint.route("/<cid>/tasks/finished", methods=['GET'])
@login_required
def finished_tasks_sort_by_filter(cid):
"""Sort by filter for finished task"""
tags = request.args.get('tags')
or_and_taxo = request.args.get("or_and_taxo")
taxonomies = request.args.get('taxonomies')
filter = request.args.get('filter')
galaxies = request.args.get('galaxies')
clusters = request.args.get('clusters')
or_and_galaxies = request.args.get("or_and_galaxies")
if filter:
case = CommonModel.get_case(cid)
return TaskModel.sort_tasks_by_filter(case, current_user, filter, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=True)
return {"message": "No filter pass"}, 400
@task_blueprint.route("/<cid>/task/<tid>/notify_user", methods=['POST'])
@login_required
@editor_required
def notify_user(cid, tid):
"""Notify a user about a task"""
if CommonModel.get_case(cid):
if "user_id" in request.json:
user = request.json["user_id"]
task = CommonModel.get_task(tid)
if task:
if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin():
if CaseModel.notify_user(task, user):
return {"message":"User notified", "toast_class": "success-subtle"}, 200
return {"message":"Something goes wrong", "toast_class": "danger-subtle"}, 400
return {"message":"Action not Allowed", "toast_class": "warning-subtle"}, 400
return {"message":"Task not found", "toast_class": "danger-subtle"}, 404
return {"message": "'user_id' is missing", "toast_class": "danger-subtle"}, 404
return {"message": "Case not found", "toast_class": "danger-subtle"}, 404
@task_blueprint.route("/<cid>/task/<tid>/export_notes", methods=['GET'])
@login_required
def export_notes(cid, tid):
"""Export note of a task as pdf"""
case = CommonModel.get_case(cid)
if case:
task = CommonModel.get_task(tid)
if task:
data_dict = dict(request.args)
if "type" in data_dict:
type_req = data_dict["type"]
res = TaskModel.export_notes(task, type_req)
CommonModel.delete_temp_folder()
return res
return {"message": "'type' is missing", 'toast_class': "warning-subtle"}, 400
return {"message": "Task not found", 'toast_class': "danger-subtle"}, 404
return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404
@task_blueprint.route("/get_taxonomies_task/<tid>", methods=['GET'])
@login_required
def get_taxonomies_case(tid):
task = CommonModel.get_task(tid)
if task:
tags = CommonModel.get_task_tags(task.id)
taxonomies = []
if tags:
taxonomies = [tag.split(":")[0] for tag in tags]
return {"tags": tags, "taxonomies": taxonomies}
return {"message": "task Not found", 'toast_class': "danger-subtle"}, 404
@task_blueprint.route("/get_galaxies_task/<tid>", methods=['GET'])
@login_required
def get_galaxies_task(tid):
task = CommonModel.get_task(tid)
if task:
clusters = CommonModel.get_task_clusters(task.id)
galaxies = []
if clusters:
for cluster in clusters:
loc_g = CommonModel.get_galaxy(cluster.galaxy_id)
if not loc_g.name in galaxies:
galaxies.append(loc_g.name)
index = clusters.index(cluster)
clusters[index] = cluster.tag
return {"clusters": clusters, "galaxies": galaxies}
return {"message": "task Not found", 'toast_class': "danger-subtle"}, 404
@task_blueprint.route("/get_connectors", methods=['GET'])
@login_required
def get_connectors():
connectors_list = CommonModel.get_connectors()
connectors_dict = dict()
for connector in connectors_list:
loc = list()
for instance in connector.instances:
if CommonModel.get_user_instance_both(user_id=current_user.id, instance_id=instance.id):
loc.append(instance.to_json())
if loc:
connectors_dict[connector.name] = loc
return jsonify({"connectors": connectors_dict}), 200
@task_blueprint.route("/get_connectors_task/<tid>", methods=['GET'])
@login_required
def get_connectors_task(tid):
task = CommonModel.get_task(tid)
if task:
return {"connectors": [CommonModel.get_instance(task_instance.instance_id).name for task_instance in CommonModel.get_task_connectors(task.id) ]}
return {"message": "task Not found", 'toast_class': "danger-subtle"}, 404

View File

@ -1,555 +0,0 @@
import os
import ast
import uuid
import shutil
import datetime
import subprocess
from .. import db
from ..db_class.db import *
from ..utils.utils import create_specific_dir
from sqlalchemy import desc, and_
from flask import request, send_file
from werkzeug.utils import secure_filename
from ..notification import notification_core as NotifModel
from . import common_core as CommonModel
UPLOAD_FOLDER = os.path.join(os.getcwd(), "uploads")
FILE_FOLDER = os.path.join(UPLOAD_FOLDER, "files")
TEMP_FOLDER = os.path.join(os.getcwd(), "temp")
def delete_task(tid, current_user):
"""Delete a task by is id"""
task = CommonModel.get_task(tid)
if task is not None:
for file in task.files:
try:
os.remove(os.path.join(FILE_FOLDER, file.uuid))
except:
return False
db.session.delete(file)
db.session.commit()
case = CommonModel.get_case(task.case_id)
task_users = Task_User.query.where(Task_User.task_id==task.id).all()
for task_user in task_users:
user = User.query.get(task_user.user_id)
NotifModel.create_notification_user(f"Task '{task.id}-{task.title}' of case '{case.id}-{case.title}' was deleted", task.case_id, user_id=user.id, html_icon="fa-solid fa-trash")
Task_Tags.query.filter_by(task_id=task.id).delete()
Task_Galaxy_Tags.query.filter_by(task_id=task.id).delete()
Task_User.query.filter_by(task_id=task.id).delete()
Task_Connector_Instance.query.filter_by(task_id=task.id).delete()
db.session.delete(task)
CommonModel.update_last_modif(task.case_id)
db.session.commit()
CommonModel.save_history(case.uuid, current_user, f"Task '{task.title}' deleted")
return True
return False
def complete_task(tid, current_user):
"""Complete task by is id"""
task = CommonModel.get_task(tid)
if task is not None:
task.completed = not task.completed
case = CommonModel.get_case(task.case_id)
task_users = Task_User.query.where(Task_User.task_id==task.id).all()
if task.completed:
task.status_id = Status.query.filter_by(name="Finished").first().id
message = f"Task '{task.id}-{task.title}' of case '{case.id}-{case.title}' completed"
else:
task.status_id = Status.query.filter_by(name="Created").first().id
message = f"Task '{task.id}-{task.title}' of case '{case.id}-{case.title}' revived"
for task_user in task_users:
user = User.query.get(task_user.user_id)
if task.completed:
message = f"Task '{task.id}-{task.title}' of case '{case.id}-{case.title}' completed"
NotifModel.create_notification_user(message, task.case_id, user_id=user.id, html_icon="fa-solid fa-check")
else:
message = f"Task '{task.id}-{task.title}' of case '{case.id}-{case.title}' revived"
NotifModel.create_notification_user(message, task.case_id, user_id=user.id, html_icon="fa-solid fa-heart-circle-bolt")
CommonModel.update_last_modif(task.case_id)
CommonModel.update_last_modif_task(task.id)
db.session.commit()
CommonModel.save_history(case.uuid, current_user, f"Task '{task.title}' completed")
return True
return False
def create_task(form_dict, cid, current_user):
"""Add a task to the DB"""
if "template_select" in form_dict and not 0 in form_dict["template_select"]:
template = Task_Template.query.get(form_dict["template_select"])
task = Task(
uuid=str(uuid.uuid4()),
title=template.title,
description=template.description,
url=template.url,
creation_date=datetime.datetime.now(tz=datetime.timezone.utc),
last_modif=datetime.datetime.now(tz=datetime.timezone.utc),
case_id=cid,
status_id=1
)
db.session.add(task)
db.session.commit()
for t_t in Task_Template_Tags.query.filter_by(task_id=task.id).all():
task_tag = Task_Tags(
task_id=task.id,
tag_id=t_t.tag_id
)
db.session.add(task_tag)
db.session.commit()
for t_t in Task_Template_Galaxy_Tags.query.filter_by(task_id=task.id).all():
task_tag = Task_Galaxy_Tags(
task_id=task.id,
cluster_id=t_t.cluster_id
)
db.session.add(task_tag)
db.session.commit()
for t_t in Task_Template_Connector_Instance.query.filter_by(template_id=task.id).all():
task_instance = Task_Connector_Instance(
task_id=task.id,
instance_id=t_t.instance_id
)
db.session.add(task_instance)
db.session.commit()
else:
deadline = CommonModel.deadline_check(form_dict["deadline_date"], form_dict["deadline_time"])
## Check if instance is from current user
for instance in form_dict["connectors"]:
instance = CommonModel.get_instance_by_name(instance)
if not CommonModel.get_user_instance_by_instance(instance.id):
return False
task = Task(
uuid=str(uuid.uuid4()),
title=form_dict["title"],
description=form_dict["description"],
url=form_dict["url"],
creation_date=datetime.datetime.now(tz=datetime.timezone.utc),
last_modif=datetime.datetime.now(tz=datetime.timezone.utc),
deadline=deadline,
case_id=cid,
status_id=1
)
db.session.add(task)
db.session.commit()
for tags in form_dict["tags"]:
tag = CommonModel.get_tag(tags)
task_tag = Task_Tags(
tag_id=tag.id,
task_id=task.id
)
db.session.add(task_tag)
db.session.commit()
for clusters in form_dict["clusters"]:
cluster = CommonModel.get_cluster_by_name(clusters)
task_galaxy_tag = Task_Galaxy_Tags(
cluster_id=cluster.id,
task_id=task.id
)
db.session.add(task_galaxy_tag)
db.session.commit()
for instance in form_dict["connectors"]:
instance = CommonModel.get_instance_by_name(instance)
task_instance = Task_Connector_Instance(
task_id=task.id,
instance_id=instance.id
)
db.session.add(task_instance)
db.session.commit()
CommonModel.update_last_modif(cid)
case = CommonModel.get_case(cid)
CommonModel.save_history(case.uuid, current_user, f"Task '{task.title}' Created")
return task
def edit_task_core(form_dict, tid, current_user):
"""Edit a task to the DB"""
task = CommonModel.get_task(tid)
deadline = CommonModel.deadline_check(form_dict["deadline_date"], form_dict["deadline_time"])
## Check if instance is from current user
for instance in form_dict["connectors"]:
instance = CommonModel.get_instance_by_name(instance)
if not CommonModel.get_user_instance_by_instance(instance.id):
return False
task.title = form_dict["title"]
task.description=form_dict["description"]
task.url=form_dict["url"]
task.deadline=deadline
## Tags
task_tag_db = Task_Tags.query.filter_by(task_id=task.id).all()
for tags in form_dict["tags"]:
tag = CommonModel.get_tag(tags)
if not tags in task_tag_db:
task_tag = Task_Tags(
tag_id=tag.id,
task_id=task.id
)
db.session.add(task_tag)
db.session.commit()
for c_t_db in task_tag_db:
if not c_t_db in form_dict["tags"]:
Task_Tags.query.filter_by(id=c_t_db.id).delete()
db.session.commit()
## Clusters
task_tag_db = Task_Galaxy_Tags.query.filter_by(task_id=task.id).all()
for clusters in form_dict["clusters"]:
cluster = CommonModel.get_cluster_by_name(clusters)
if not clusters in task_tag_db:
task_tag = Task_Galaxy_Tags(
cluster_id=cluster.id,
task_id=task.id
)
db.session.add(task_tag)
db.session.commit()
for c_t_db in task_tag_db:
if not c_t_db in form_dict["clusters"]:
Task_Galaxy_Tags.query.filter_by(id=c_t_db.id).delete()
db.session.commit()
## Connectors
task_connector_db = Task_Connector_Instance.query.filter_by(task_id=task.id).all()
for connectors in form_dict["connectors"]:
instance = CommonModel.get_instance_by_name(connectors)
if not connectors in task_connector_db:
task_tag = Task_Connector_Instance(
instance_id=instance.id,
task_id=task.id
)
db.session.add(task_tag)
db.session.commit()
for c_t_db in task_connector_db:
if not c_t_db in form_dict["connectors"]:
Task_Connector_Instance.query.filter_by(id=c_t_db.id).delete()
db.session.commit()
CommonModel.update_last_modif(task.case_id)
CommonModel.update_last_modif_task(task.id)
db.session.commit()
case = CommonModel.get_case(task.case_id)
CommonModel.save_history(case.uuid, current_user, f"Task '{task.title}' edited")
def add_file_core(task, files_list, current_user):
create_specific_dir(UPLOAD_FOLDER)
create_specific_dir(FILE_FOLDER)
for file in files_list:
if files_list[file].filename:
uuid_loc = str(uuid.uuid4())
filename = secure_filename(files_list[file].filename)
try:
file_data = request.files[file].read()
with open(os.path.join(FILE_FOLDER, uuid_loc), "wb") as write_file:
write_file.write(file_data)
except Exception as e:
print(e)
return False
f = File(
name=filename,
task_id=task.id,
uuid = uuid_loc
)
db.session.add(f)
CommonModel.update_last_modif(task.case_id)
CommonModel.update_last_modif_task(task.id)
db.session.commit()
case = CommonModel.get_case(task.case_id)
CommonModel.save_history(case.uuid, current_user, f"File added for task '{task.title}'")
return True
def modif_note_core(tid, current_user, notes):
"""Modify a noe of a task to the DB"""
task = CommonModel.get_task(tid)
if task:
task.notes = notes
CommonModel.update_last_modif(task.case_id)
CommonModel.update_last_modif_task(task.id)
db.session.commit()
case = CommonModel.get_case(task.case_id)
CommonModel.save_history(case.uuid, current_user, f"Notes for '{task.title}' modified")
return True
return False
def assign_task(tid, user, current_user, flag_current_user):
"""Assign current user to a task"""
task = CommonModel.get_task(tid)
case = CommonModel.get_case(task.case_id)
if task:
if type(user) == str or type(user) == int:
user = User.query.get(user)
task_user = Task_User(task_id=task.id, user_id=user.id)
if not flag_current_user:
NotifModel.create_notification_user(f"You have been assign to: '{task.id}-{task.title}' of case '{case.id}-{case.title}'", task.case_id, user_id=user.id, html_icon="fa-solid fa-hand")
if not Task_User.query.filter_by(task_id=task.id, user_id=user.id).first():
db.session.add(task_user)
CommonModel.update_last_modif(task.case_id)
CommonModel.update_last_modif_task(task.id)
db.session.commit()
CommonModel.save_history(case.uuid, current_user, f"Task '{task.id}-{task.title}' assigned to {user.first_name} {user.last_name}")
return True
return False
return False
def get_users_assign_task(task_id, user):
"""Return users assigned to a task"""
task_users = Task_User.query.filter_by(task_id=task_id).all()
users = list()
flag = False
for task_user in task_users:
u = User.query.get(task_user.user_id)
# if current user is assigned to the task
if u.id == user.id:
flag = True
users.append(u.to_json())
return users, flag
def remove_assign_task(tid, user, current_user, flag_current_user):
"""Remove current user to the assignement to a task"""
task = CommonModel.get_task(tid)
case = CommonModel.get_case(task.case_id)
if task:
if type(user) == int or type(user) == str:
user = User.query.get(user)
task_users = Task_User.query.filter_by(task_id=task.id, user_id=user.id).all()
if not flag_current_user:
NotifModel.create_notification_user(f"Your assignment have been removed: '{task.id}-{task.title}' of case '{case.id}-{case.title}'", task.case_id, user_id=user.id, html_icon="fa-solid fa-handshake-slash")
for task_user in task_users:
db.session.delete(task_user)
CommonModel.update_last_modif(task.case_id)
CommonModel.update_last_modif_task(task.id)
db.session.commit()
CommonModel.save_history(case.uuid, current_user, f"Assignment '{task.title}' removed to {user.first_name} {user.last_name}")
return True
return False
def change_task_status(status, task, current_user):
task.status_id = status
CommonModel.update_last_modif(task.case_id)
CommonModel.update_last_modif_task(task.id)
db.session.commit()
case = CommonModel.get_case(task.case_id)
CommonModel.save_history(case.uuid, current_user, f"Status changed for task '{task.title}'")
return True
def download_file(file):
return send_file(os.path.join(FILE_FOLDER, file.uuid), as_attachment=True, download_name=file.name)
def delete_file(file, task, current_user):
try:
os.remove(os.path.join(FILE_FOLDER, file.uuid))
except:
return False
db.session.delete(file)
db.session.commit()
case = CommonModel.get_case(task.case_id)
CommonModel.save_history(case.uuid, current_user, f"File deleted for task '{task.title}'")
return True
def get_task_info(tasks_list, user):
tasks = list()
for task in tasks_list:
case = CommonModel.get_case(task.case_id)
users, is_current_user_assigned = get_users_assign_task(task.id, user)
file_list = list()
for file in task.files:
file_list.append(file.to_json())
finalTask = task.to_json()
finalTask["users"] = users
finalTask["is_current_user_assigned"] = is_current_user_assigned
finalTask["files"] = file_list
finalTask["case_title"] = case.title
finalTask["instances"] = list()
for task_connector in CommonModel.get_task_connectors(task.id):
loc_instance = CommonModel.get_instance(task_connector.instance_id).to_json()
loc_instance["icon"] = Icon_File.query.join(Connector_Icon, Connector_Icon.file_icon_id==Icon_File.id)\
.join(Connector, Connector.icon_id==Connector_Icon.id)\
.join(Connector_Instance, Connector_Instance.connector_id==Connector.id)\
.where(Connector_Instance.id==task_connector.instance_id)\
.first().uuid
finalTask["instances"].append(loc_instance)
tasks.append(finalTask)
return tasks
def build_task_query(completed, tags=None, taxonomies=None, galaxies=None, clusters=None, filter=None):
query = Task.query
conditions = [Task.completed == completed]
if tags or taxonomies:
query = query.join(Task_Tags, Task_Tags.task_id == Task.id)
query = query.join(Tags, Task_Tags.tag_id == Tags.id)
if tags:
tags = ast.literal_eval(tags)
conditions.append(Tags.name.in_(list(tags)))
if taxonomies:
taxonomies = ast.literal_eval(taxonomies)
query = query.join(Taxonomy, Taxonomy.id == Tags.taxonomy_id)
conditions.append(Taxonomy.name.in_(list(taxonomies)))
if clusters or galaxies:
query = query.join(Task_Galaxy_Tags, Task_Galaxy_Tags.task_id == Task.id)
query = query.join(Cluster, Task_Galaxy_Tags.cluster_id == Cluster.id)
if clusters:
clusters = ast.literal_eval(clusters)
conditions.append(Cluster.name.in_(list(clusters)))
if galaxies:
galaxies = ast.literal_eval(galaxies)
query = query.join(Galaxy, Galaxy.id == Cluster.galaxy_id)
conditions.append(Galaxy.name.in_(list(galaxies)))
if filter:
query.order_by(desc(filter))
return query.filter(and_(*conditions)).all()
def sort_by_status_task_core(case, user, taxonomies=[], galaxies=[], tags=[], clusters=[], or_and_taxo="true", or_and_galaxies="true", completed=False, no_info=False, filter=False):
tasks = build_task_query(completed, tags, taxonomies, galaxies, clusters, filter)
if tags:
tags = ast.literal_eval(tags)
if taxonomies:
taxonomies = ast.literal_eval(taxonomies)
if galaxies:
galaxies = ast.literal_eval(galaxies)
if clusters:
clusters = ast.literal_eval(clusters)
if tags or taxonomies or galaxies or clusters:
if or_and_taxo == "false":
glob_list = []
for task in tasks:
tags_db = task.to_json()["tags"]
loc_tag = [tag["name"] for tag in tags_db]
taxo_list = [Taxonomy.query.get(tag["taxonomy_id"]).name for tag in tags_db]
if (not tags or all(item in loc_tag for item in tags)) and \
(not taxonomies or all(item in taxo_list for item in taxonomies)):
glob_list.append(task)
tasks = glob_list
if or_and_galaxies == "false":
glob_list = []
for task in tasks:
clusters_db = task.to_json()["clusters"]
loc_cluster = [cluster["name"] for cluster in clusters_db]
galaxies_list = [Galaxy.query.get(cluster["galaxy_id"]).name for cluster in clusters_db]
if (not clusters or all(item in loc_cluster for item in clusters)) and \
(not galaxies or all(item in galaxies_list for item in galaxies)):
glob_list.append(task)
tasks = glob_list
else:
tasks = Task.query.filter_by(case_id=case.id, completed=completed).all()
if no_info:
return tasks
return get_task_info(tasks, user)
def sort_tasks_by_filter(case, user, filter, taxonomies=[], galaxies=[], tags=[], clusters=[], or_and_taxo="true", or_and_galaxies="true", completed=False):
tasks_list = sort_by_status_task_core(case, user, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed, no_info=True, filter=filter)
loc_list = list()
if filter == "assigned_tasks":
for task in tasks_list:
if Task_User.query.filter_by(task_id=task.id).first():
loc_list.append(task)
tasks_list = loc_list
elif filter == "my_assignment":
for task in tasks_list:
if Task_User.query.filter_by(task_id=task.id, user_id=user.id).first():
task.is_current_user_assigned = True
loc_list.append(task)
tasks_list = loc_list
elif filter == "deadline":
# for deadline filter, only task with a deadline defined is required
loc = list()
for task in tasks_list:
if getattr(task, filter):
loc.append(task)
tasks_list = loc
else:
# status, last_modif, title
tasks_list.sort(key=lambda x: getattr(x, filter))
return get_task_info(tasks_list, user)
def export_notes(task, type_req):
if not os.path.isdir(TEMP_FOLDER):
os.mkdir(TEMP_FOLDER)
download_filename = f"export_note_task_{task.id}.{type_req}"
temp_md = os.path.join(TEMP_FOLDER, "index.md")
temp_export = os.path.join(TEMP_FOLDER, f"output.{type_req}")
with open(temp_md, "w")as write_file:
write_file.write(task.notes)
if type_req == "pdf":
process = subprocess.Popen(["pandoc", temp_md, "--pdf-engine=xelatex", "-V", "colorlinks=true", "-V", "linkcolor=blue", "-V", "urlcolor=red", "-V", "tocolor=gray"\
"--number-sections", "--toc", "--template", "eisvogel", "-o", temp_export, "--filter=pandoc-mermaid"], stdout=subprocess.PIPE)
elif type_req == "docx":
process = subprocess.Popen(["pandoc", temp_md, "-o", temp_export, "--filter=mermaid-filter"], stdout=subprocess.PIPE)
process.wait()
try:
shutil.rmtree(os.path.join(os.getcwd(), "mermaid-images"))
except:
pass
return send_file(temp_export, as_attachment=True, download_name=download_filename)

View File

@ -153,9 +153,13 @@ class Session_class:
db.session.commit()
histories = History.query.all()
while len(histories) > 100:
history = History.query.filter_by(func.min(History.id))
history.delete()
while len(histories) > 3:
history = History.query.order_by(History.id).all()
Session_db.query.filter_by(id=history[0].session_id).delete()
History.query.filter_by(id=history[0].id).delete()
histories = History.query.all()
db.session.commit()
return

View File

@ -11,9 +11,24 @@
<br>
<div v-if="history">
<div v-for="h in history">
[[h]]
</div>
<template v-for="h, key in history">
<div class="list-group" style="margin-bottom: 20px;">
<!-- <a :href="'/query/'+h.uuid" class="list-group-item list-group-item-action"> -->
<a href="#" class="list-group-item list-group-item-action">
<div class="d-flex w-100 justify-content-between">
<h5 class="mb-1">[[key+1]]- [[h.query]]</h5>
<small><i>[[h.uuid]]</i></small>
</div>
<p class="mb-1" style="color: green;"><u>Input</u>:</p>
<div>[[h.input]]</div>
<br>
<p class="mb-1" style="color: #2000ff;"><u>Modules</u>:</p>
<div>
<template v-for="module in h.modules">[[module]],</template>
</div>
</a>
</div>
</template>
</div>
<span id="goTop">[<a href="#top">Go Back Top</a>]</span>