AIL-framework/var/www/blueprints/import_export.py

211 lines
7.9 KiB
Python
Raw Normal View History

2020-02-17 10:52:25 +01:00
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
'''
Blueprint Flask: MISP format import export
'''
import os
import sys
2020-02-18 11:27:00 +01:00
import uuid
2020-02-17 10:52:25 +01:00
import json
import random
2020-02-17 17:01:16 +01:00
from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response, send_file
2020-02-17 10:52:25 +01:00
from flask_login import login_required, current_user, login_user, logout_user
sys.path.append('modules')
import Flask_config
# Import Role_Manager
from Role_Manager import create_user_db, check_password_strength, check_user_role_integrity
from Role_Manager import login_admin, login_analyst, login_read_only
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'export'))
import MispImport
import MispExport
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib'))
import Correlate_object
import AILObjects
2020-02-17 10:52:25 +01:00
# ============ BLUEPRINT ============
import_export = Blueprint('import_export', __name__, template_folder=os.path.join(os.environ['AIL_FLASK'], 'templates/import_export'))
# ============ VARIABLES ============
# ============ FUNCTIONS ============
# ============= ROUTES ==============
@import_export.route('/import_export/import')
@login_required
@login_analyst
def import_object():
return render_template("import_object.html")
2020-02-17 10:52:25 +01:00
@import_export.route("/import_export/import_file", methods=['POST'])
@login_required
@login_analyst
def import_object_file():
error = None
2020-02-17 10:52:25 +01:00
is_file = False
if 'file' in request.files:
file = request.files['file']
if file:
if file.filename:
is_file = True
all_imported_obj = []
2020-02-17 10:52:25 +01:00
if is_file:
filename = MispImport.sanitize_import_file_path(file.filename)
file.save(filename)
map_uuid_global_id = MispImport.import_objs_from_file(filename)
os.remove(filename)
for obj_uuid in map_uuid_global_id:
dict_obj = Correlate_object.get_global_id_from_id(map_uuid_global_id[obj_uuid])
dict_obj['uuid'] = obj_uuid
dict_obj['url'] = Correlate_object.get_item_url(dict_obj['type'], dict_obj['id'], correlation_type=dict_obj['subtype'])
dict_obj['node'] = Correlate_object.get_correlation_node_icon(dict_obj['type'], correlation_type=dict_obj['subtype'], value=dict_obj['id'])
all_imported_obj.append(dict_obj)
2020-02-17 10:52:25 +01:00
if not all_imported_obj:
error = "error: Empty or invalid JSON file"
return render_template("import_object.html", all_imported_obj=all_imported_obj, error=error)
2020-02-17 10:52:25 +01:00
@import_export.route('/import_export/export')
@login_required
@login_analyst
def export_object():
user_id = current_user.get_id()
# get user saved obj to export
l_obj_to_export = AILObjects.get_user_list_of_obj_to_export(user_id)
return render_template("export_object.html", l_obj_to_export=l_obj_to_export)
2020-02-17 17:01:16 +01:00
@import_export.route("/import_export/export_file", methods=['POST'])
@login_required
@login_analyst
def export_object_file():
user_id = current_user.get_id()
2020-02-17 17:01:16 +01:00
l_obj_to_export = []
l_obj_invalid = []
export_to_misp = False
dict_misp_event_export = {}
# Get new added Object
2020-02-17 17:01:16 +01:00
for obj_tuple in list(request.form):
l_input = request.form.getlist(obj_tuple)
if len(l_input) == 3:
obj_type = l_input[0]
obj_id = l_input[1]
lvl = l_input[2]
lvl = MispExport.sanitize_obj_export_lvl(lvl)
obj_subtype = obj_type.split(';')
if len(obj_subtype) == 2:
obj_type = obj_subtype[0]
obj_subtype = obj_subtype[1]
else:
obj_subtype = None
obj_dict = {'id': obj_id, 'type': obj_type, 'lvl': lvl}
if obj_subtype:
obj_dict['subtype'] = obj_subtype
if MispExport.is_valid_obj_to_export(obj_type, obj_subtype, obj_id):
l_obj_to_export.append(obj_dict)
AILObjects.add_user_object_to_export(user_id, obj_dict['type'], obj_dict['id'], obj_dict['lvl'], obj_subtype=obj_dict.get('subtype', None))
2020-02-17 17:01:16 +01:00
else:
2020-02-18 11:27:00 +01:00
if obj_id:
l_obj_invalid.append(obj_dict)
else:
dict_misp_event_export[str(obj_tuple)] = request.form.get(obj_tuple)
if dict_misp_event_export.get('export_to_misp', None):
export_to_misp = True
else:
dict_misp_event_export = None
2020-02-18 11:27:00 +01:00
if l_obj_invalid:
# get user saved obj to export # # TODO: # performance
l_obj_to_export = AILObjects.get_user_list_of_obj_to_export(user_id)
for obj_dict in l_obj_invalid: # set uuid input
2020-02-18 11:27:00 +01:00
obj_dict['uuid'] = str(uuid.uuid4())
obj_dict['type'] = Correlate_object.get_obj_str_type_subtype(obj_dict['type'], obj_dict.get('subtype', None))
return render_template("export_object.html", l_obj_to_export=l_obj_to_export,
l_obj_invalid=l_obj_invalid, dict_misp_event_export=dict_misp_event_export)
2020-02-18 11:27:00 +01:00
else:
2020-02-21 11:03:28 +01:00
if export_to_misp and MispExport.ping_misp():
event = MispExport.create_list_of_objs_to_export(l_obj_to_export, r_type='event')
2020-02-21 15:47:05 +01:00
event_metadata = MispExport.create_misp_event(event, distribution=dict_misp_event_export.get('export_to_misp', None),
threat_level_id=dict_misp_event_export.get('misp_threat_level_id', None),
publish=dict_misp_event_export.get('misp_publish', None),
analysis=dict_misp_event_export.get('misp_event_analysis', None),
event_info=dict_misp_event_export.get('misp_event_info', None))
AILObjects.delete_all_user_object_to_export(user_id)
return render_template("export_object.html", l_obj_to_export=l_obj_to_export,
event_metadata=event_metadata,
l_obj_invalid=[], dict_misp_event_export=[])
else:
# get user saved obj to export # # TODO: # performance
json_export = MispExport.create_list_of_objs_to_export(l_obj_to_export)
export_filename = MispExport.get_export_filename(json_export)
2020-02-21 14:35:34 +01:00
json_export = MispExport.create_in_memory_file(json_export.to_json())
AILObjects.delete_all_user_object_to_export(user_id)
return send_file(json_export, as_attachment=True, attachment_filename=export_filename)
@import_export.route("/import_export/add_object_id_to_export", methods=['GET'])
@login_required
@login_analyst
def add_object_id_to_export():
user_id = current_user.get_id()
user_id = current_user.get_id()
obj_type = request.args.get('obj_type')
obj_id = request.args.get('obj_id')
obj_subtype = request.args.get('obj_subtype')
obj_lvl = request.args.get('obj_lvl')
AILObjects.add_user_object_to_export(user_id, obj_type, obj_id, obj_lvl, obj_subtype=obj_subtype)
# redirect
return redirect(url_for('import_export.export_object'))
@import_export.route("/import_export/investigation", methods=['GET'])
@login_required
@login_analyst
def export_investigation():
investigation_uuid = request.args.get("uuid")
if MispExport.ping_misp():
event_metadata = MispExport.create_investigation_event(investigation_uuid)
else:
return Response(json.dumps({"error": "Can't reach MISP Instance"}, indent=2, sort_keys=True), mimetype='application/json'), 400
return redirect(url_for('investigations_b.show_investigation', uuid=investigation_uuid))
# @import_export.route("/import_export/delete_object_id_to_export", methods=['GET'])
# @login_required
# @login_analyst
# def delete_object_id_to_export():
# user_id = current_user.get_id()
# obj_type = request.args.get('obj_type')
# obj_id = request.args.get('obj_id')
# obj_subtype = request.args.get('obj_subtype')
# obj_lvl = request.args.get('obj_lvl')
# AILObjects.delete_user_object_to_export(user_id, object_type, object_id, obj_lvl, obj_subtype=obj_subtype)
# # redirect
# return 'ok'