mirror of https://github.com/MISP/misp-modules
chg: [website] History blueprint
parent
0364deccaa
commit
00a7ab6d61
|
@ -28,7 +28,9 @@ def create_app():
|
|||
sess.init_app(app)
|
||||
|
||||
from .home import home_blueprint
|
||||
from .history.history import history_blueprint
|
||||
app.register_blueprint(home_blueprint, url_prefix="/")
|
||||
app.register_blueprint(history_blueprint, url_prefix="/")
|
||||
|
||||
return app
|
||||
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
import json
|
||||
from flask import Flask, Blueprint, render_template, request, jsonify
|
||||
from . import history_core as HistoryModel
|
||||
|
||||
history_blueprint = Blueprint(
|
||||
'history',
|
||||
__name__,
|
||||
template_folder='templates',
|
||||
static_folder='static'
|
||||
)
|
||||
|
||||
|
||||
@history_blueprint.route("/history", methods=["GET"])
|
||||
def history():
|
||||
"""View all history"""
|
||||
return render_template("history.html")
|
||||
|
||||
@history_blueprint.route("/get_history", methods=["GET"])
|
||||
def get_history():
|
||||
"""Get all history"""
|
||||
histories = HistoryModel.get_history()
|
||||
return histories
|
||||
|
||||
@history_blueprint.route("/history_session", methods=["GET"])
|
||||
def history_session():
|
||||
"""View all history"""
|
||||
return render_template("history_session.html", tree_view=False)
|
||||
|
||||
@history_blueprint.route("/get_history_session", methods=["GET"])
|
||||
def get_history_session():
|
||||
"""Get all history"""
|
||||
histories = HistoryModel.get_history_session()
|
||||
if histories:
|
||||
return histories
|
||||
return {}
|
||||
|
||||
@history_blueprint.route("/save_history/<sid>", methods=["GET"])
|
||||
def save_history(sid):
|
||||
return HistoryModel.save_history_core(sid)
|
||||
|
||||
|
||||
@history_blueprint.route("/history_tree", methods=["GET"])
|
||||
def history_tree():
|
||||
"""View all history"""
|
||||
return render_template("history_session.html", tree_view=True)
|
||||
|
||||
@history_blueprint.route("/get_history_tree", methods=["GET"])
|
||||
def get_history_tree():
|
||||
"""Get all history"""
|
||||
histories = HistoryModel.get_history_tree()
|
||||
if histories:
|
||||
return histories
|
||||
return {}
|
|
@ -0,0 +1,158 @@
|
|||
import json
|
||||
from ..utils.utils import isUUID
|
||||
from .. import db
|
||||
from ..db_class.db import History, Session_db, History_Tree
|
||||
from .. import sess
|
||||
from flask import session as sess
|
||||
from sqlalchemy import desc
|
||||
|
||||
|
||||
|
||||
def get_session(sid):
|
||||
"""Return a session by uuid"""
|
||||
return Session_db.query.filter_by(uuid=sid).first()
|
||||
|
||||
|
||||
|
||||
def get_history():
|
||||
"""Return history"""
|
||||
histories_list = list()
|
||||
histories = History.query.order_by(desc(History.id))
|
||||
for history in histories:
|
||||
session = Session_db.query.get(history.session_id)
|
||||
histories_list.append(session.history_json())
|
||||
return histories_list
|
||||
|
||||
|
||||
def util_set_flask_session(parent_id, loc_session, current_session):
|
||||
if parent_id == loc_session["uuid"]:
|
||||
loc_json = {
|
||||
"uuid": current_session.uuid,
|
||||
"modules": current_session.modules_list,
|
||||
"query": current_session.query,
|
||||
"input": current_session.input_query,
|
||||
"query_date": current_session.query_date.strftime('%Y-%m-%d')
|
||||
}
|
||||
loc_session["children"].append(loc_json)
|
||||
return True
|
||||
elif "children" in loc_session:
|
||||
return deep_explore(loc_session["children"], parent_id, current_session)
|
||||
|
||||
def deep_explore(session_dict, parent_id, current_session):
|
||||
for loc_session in session_dict:
|
||||
if not "children" in loc_session:
|
||||
loc_session["children"] = list()
|
||||
if util_set_flask_session(parent_id, loc_session, current_session):
|
||||
return True
|
||||
return False
|
||||
|
||||
def set_flask_session(current_session, parent_id):
|
||||
current_query = sess.get("current_query")
|
||||
if not current_query or current_query not in sess:
|
||||
loc_json = {
|
||||
"uuid": current_session.uuid,
|
||||
"modules": current_session.modules_list,
|
||||
"query": current_session.query,
|
||||
"input": current_session.input_query,
|
||||
"query_date": current_session.query_date.strftime('%Y-%m-%d')
|
||||
}
|
||||
|
||||
sess["current_query"] = current_session.uuid
|
||||
sess[sess.get("current_query")] = loc_json
|
||||
sess[sess.get("current_query")]["children"] = list()
|
||||
else:
|
||||
# sess["uuid"]
|
||||
loc_session = sess.get(sess.get("current_query"))
|
||||
if not "children" in loc_session:
|
||||
loc_session["children"] = list()
|
||||
if not util_set_flask_session(parent_id, loc_session, current_session):
|
||||
sess["current_query"] = current_session.uuid
|
||||
|
||||
def get_history_session():
|
||||
current_query = sess.get("current_query")
|
||||
loc_list = list()
|
||||
if current_query:
|
||||
# If current query have no children then don't display it
|
||||
# It's already save in history
|
||||
# Only parent-child tree structure is in flask session
|
||||
current_query_value = sess.get(sess.get("current_query"))
|
||||
if current_query_value and current_query_value["children"]:
|
||||
loc_list.append(current_query_value)
|
||||
for q in sess:
|
||||
if isUUID(q):
|
||||
# If query have no children then don't display it
|
||||
q_value = sess.get(q)
|
||||
if q_value["children"]:
|
||||
if not q == current_query:
|
||||
loc_list.append(q_value)
|
||||
|
||||
return loc_list
|
||||
|
||||
|
||||
|
||||
|
||||
def util_save_history(session):
|
||||
loc_dict = dict()
|
||||
loc_dict[session["uuid"]] = []
|
||||
|
||||
if "children" in session and session["children"]:
|
||||
for child in session["children"]:
|
||||
loc_dict[session["uuid"]].append(util_save_history(child))
|
||||
return loc_dict
|
||||
|
||||
|
||||
def save_history_core(sid):
|
||||
"""Save history from session to db"""
|
||||
if sid in sess:
|
||||
session = sess.get(sid)
|
||||
# Doesn't already exist
|
||||
history_tree_db = History_Tree.query.filter_by(session_uuid=session["uuid"]).first()
|
||||
if not history_tree_db:
|
||||
if "children" in session and session["children"]:
|
||||
# Get all children before add to db
|
||||
loc_dict = util_save_history(session)
|
||||
h = History_Tree(
|
||||
session_uuid = session["uuid"],
|
||||
tree=json.dumps(loc_dict)
|
||||
)
|
||||
db.session.add(h)
|
||||
db.session.commit()
|
||||
return {"message": "History Save", 'toast_class': "success-subtle"}
|
||||
return {"message": "No children", 'toast_class': "warning-subtle"}
|
||||
# Save same session but with new value
|
||||
elif not json.loads(history_tree_db.tree) == session:
|
||||
if "children" in session and session["children"]:
|
||||
# Get all children before add to db
|
||||
loc_dict = util_save_history(session)
|
||||
history_tree_db.tree = json.dumps(loc_dict)
|
||||
db.session.commit()
|
||||
return {"message": "History updated", 'toast_class': "success-subtle"}
|
||||
return {"message": "History already saved", 'toast_class': "warning-subtle"}
|
||||
return {"message": "Session not found", 'toast_class': "danger-subtle"}
|
||||
|
||||
|
||||
|
||||
def util_get_history_tree(child):
|
||||
loc_child = list(child.keys())[0]
|
||||
loc_session = get_session(loc_child)
|
||||
loc_json = loc_session.history_json()
|
||||
loc_json["children"] = list()
|
||||
if child[loc_child]:
|
||||
for s_child in child[loc_child]:
|
||||
loc_json["children"].append(util_get_history_tree(s_child))
|
||||
return loc_json
|
||||
|
||||
def get_history_tree():
|
||||
"""Return all histories saved as tree"""
|
||||
histories_tree = History_Tree.query.order_by(desc(History_Tree.id))
|
||||
loc_dict = list()
|
||||
for history_tree in histories_tree:
|
||||
tree = json.loads(history_tree.tree)
|
||||
print(history_tree.session_uuid)
|
||||
loc_session = get_session(history_tree.session_uuid)
|
||||
loc_json = loc_session.history_json()
|
||||
loc_json["children"] = list()
|
||||
for child in tree[history_tree.session_uuid]:
|
||||
loc_json["children"].append(util_get_history_tree(child))
|
||||
loc_dict.append(loc_json)
|
||||
return loc_dict
|
|
@ -142,46 +142,3 @@ def change_status():
|
|||
return {'message': 'Something went wrong', 'toast_class': "danger-subtle"}, 400
|
||||
return {'message': 'Need to pass "module_id"', 'toast_class': "warning-subtle"}, 400
|
||||
|
||||
|
||||
|
||||
@home_blueprint.route("/history", methods=["GET"])
|
||||
def history():
|
||||
"""View all history"""
|
||||
return render_template("history.html")
|
||||
|
||||
@home_blueprint.route("/get_history", methods=["GET"])
|
||||
def get_history():
|
||||
"""Get all history"""
|
||||
histories = HomeModel.get_history()
|
||||
return histories
|
||||
|
||||
@home_blueprint.route("/history_session", methods=["GET"])
|
||||
def history_session():
|
||||
"""View all history"""
|
||||
return render_template("history_session.html", tree_view=False)
|
||||
|
||||
@home_blueprint.route("/get_history_session", methods=["GET"])
|
||||
def get_history_session():
|
||||
"""Get all history"""
|
||||
histories = HomeModel.get_history_session()
|
||||
if histories:
|
||||
return histories
|
||||
return {}
|
||||
|
||||
@home_blueprint.route("/save_history/<sid>", methods=["GET"])
|
||||
def save_history(sid):
|
||||
return HomeModel.save_history_core(sid)
|
||||
|
||||
|
||||
@home_blueprint.route("/history_tree", methods=["GET"])
|
||||
def history_tree():
|
||||
"""View all history"""
|
||||
return render_template("history_session.html", tree_view=True)
|
||||
|
||||
@home_blueprint.route("/get_history_tree", methods=["GET"])
|
||||
def get_history_tree():
|
||||
"""Get all history"""
|
||||
histories = HomeModel.get_history_tree()
|
||||
if histories:
|
||||
return histories
|
||||
return {}
|
|
@ -147,136 +147,3 @@ def get_history():
|
|||
histories_list.append(session.history_json())
|
||||
return histories_list
|
||||
|
||||
|
||||
def util_set_flask_session(parent_id, loc_session, current_session):
|
||||
if parent_id == loc_session["uuid"]:
|
||||
loc_json = {
|
||||
"uuid": current_session.uuid,
|
||||
"modules": current_session.modules_list,
|
||||
"query": current_session.query,
|
||||
"input": current_session.input_query,
|
||||
"query_date": current_session.query_date.strftime('%Y-%m-%d')
|
||||
}
|
||||
loc_session["children"].append(loc_json)
|
||||
return True
|
||||
elif "children" in loc_session:
|
||||
return deep_explore(loc_session["children"], parent_id, current_session)
|
||||
|
||||
def deep_explore(session_dict, parent_id, current_session):
|
||||
for loc_session in session_dict:
|
||||
if not "children" in loc_session:
|
||||
loc_session["children"] = list()
|
||||
if util_set_flask_session(parent_id, loc_session, current_session):
|
||||
return True
|
||||
return False
|
||||
|
||||
def set_flask_session(current_session, parent_id):
|
||||
current_query = sess.get("current_query")
|
||||
if not current_query or current_query not in sess:
|
||||
loc_json = {
|
||||
"uuid": current_session.uuid,
|
||||
"modules": current_session.modules_list,
|
||||
"query": current_session.query,
|
||||
"input": current_session.input_query,
|
||||
"query_date": current_session.query_date.strftime('%Y-%m-%d')
|
||||
}
|
||||
|
||||
sess["current_query"] = current_session.uuid
|
||||
sess[sess.get("current_query")] = loc_json
|
||||
sess[sess.get("current_query")]["children"] = list()
|
||||
else:
|
||||
# sess["uuid"]
|
||||
loc_session = sess.get(sess.get("current_query"))
|
||||
if not "children" in loc_session:
|
||||
loc_session["children"] = list()
|
||||
if not util_set_flask_session(parent_id, loc_session, current_session):
|
||||
sess["current_query"] = current_session.uuid
|
||||
|
||||
def get_history_session():
|
||||
current_query = sess.get("current_query")
|
||||
loc_list = list()
|
||||
if current_query:
|
||||
# If current query have no children then don't display it
|
||||
# It's already save in history
|
||||
# Only parent-child tree structure is in flask session
|
||||
current_query_value = sess.get(sess.get("current_query"))
|
||||
if current_query_value and current_query_value["children"]:
|
||||
loc_list.append(current_query_value)
|
||||
for q in sess:
|
||||
if isUUID(q):
|
||||
# If query have no children then don't display it
|
||||
q_value = sess.get(q)
|
||||
if q_value["children"]:
|
||||
if not q == current_query:
|
||||
loc_list.append(q_value)
|
||||
|
||||
return loc_list
|
||||
|
||||
|
||||
|
||||
|
||||
def util_save_history(session):
|
||||
loc_dict = dict()
|
||||
loc_dict[session["uuid"]] = []
|
||||
|
||||
if "children" in session and session["children"]:
|
||||
for child in session["children"]:
|
||||
loc_dict[session["uuid"]].append(util_save_history(child))
|
||||
return loc_dict
|
||||
|
||||
|
||||
def save_history_core(sid):
|
||||
"""Save history from session to db"""
|
||||
if sid in sess:
|
||||
session = sess.get(sid)
|
||||
# Doesn't already exist
|
||||
history_tree_db = History_Tree.query.filter_by(session_uuid=session["uuid"]).first()
|
||||
if not history_tree_db:
|
||||
if "children" in session and session["children"]:
|
||||
# Get all children before add to db
|
||||
loc_dict = util_save_history(session)
|
||||
h = History_Tree(
|
||||
session_uuid = session["uuid"],
|
||||
tree=json.dumps(loc_dict)
|
||||
)
|
||||
db.session.add(h)
|
||||
db.session.commit()
|
||||
return {"message": "History Save", 'toast_class': "success-subtle"}
|
||||
return {"message": "No children", 'toast_class': "warning-subtle"}
|
||||
# Save same session but with new value
|
||||
elif not json.loads(history_tree_db.tree) == session:
|
||||
if "children" in session and session["children"]:
|
||||
# Get all children before add to db
|
||||
loc_dict = util_save_history(session)
|
||||
history_tree_db.tree = json.dumps(loc_dict)
|
||||
db.session.commit()
|
||||
return {"message": "History updated", 'toast_class': "success-subtle"}
|
||||
return {"message": "History already saved", 'toast_class': "warning-subtle"}
|
||||
return {"message": "Session not found", 'toast_class': "danger-subtle"}
|
||||
|
||||
|
||||
|
||||
def util_get_history_tree(child):
|
||||
loc_child = list(child.keys())[0]
|
||||
loc_session = get_session(loc_child)
|
||||
loc_json = loc_session.history_json()
|
||||
loc_json["children"] = list()
|
||||
if child[loc_child]:
|
||||
for s_child in child[loc_child]:
|
||||
loc_json["children"].append(util_get_history_tree(s_child))
|
||||
return loc_json
|
||||
|
||||
def get_history_tree():
|
||||
"""Return all histories saved as tree"""
|
||||
histories_tree = History_Tree.query.order_by(desc(History_Tree.id))
|
||||
loc_dict = list()
|
||||
for history_tree in histories_tree:
|
||||
tree = json.loads(history_tree.tree)
|
||||
|
||||
loc_session = get_session(history_tree.session_uuid)
|
||||
loc_json = loc_session.history_json()
|
||||
loc_json["children"] = list()
|
||||
for child in tree[history_tree.session_uuid]:
|
||||
loc_json["children"].append(util_get_history_tree(child))
|
||||
loc_dict.append(loc_json)
|
||||
return loc_dict
|
||||
|
|
|
@ -7,7 +7,8 @@ from .utils.utils import query_post_query, query_get_module
|
|||
from . import home_core as HomeModel
|
||||
import uuid
|
||||
from . import db
|
||||
from .db_class.db import History, Session_db
|
||||
from .db_class.db import History, History_Tree, Session_db
|
||||
from . import sess
|
||||
|
||||
|
||||
sessions = list()
|
||||
|
@ -144,8 +145,10 @@ class Session_class:
|
|||
|
||||
histories = History.query.all()
|
||||
|
||||
while len(histories) > 10:
|
||||
while len(histories) > 200:
|
||||
history = History.query.order_by(History.id).all()
|
||||
session = Session_db.query.filter_by(id=history[0].session_id)
|
||||
if not History_Tree.query.filter_by(session_uuid=session.uuid):
|
||||
Session_db.query.filter_by(id=history[0].session_id).delete()
|
||||
History.query.filter_by(id=history[0].id).delete()
|
||||
|
||||
|
|
Loading…
Reference in New Issue