mirror of https://github.com/CIRCL/AIL-framework
				
				
				
			
		
			
				
	
	
		
			240 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			240 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
#!/usr/bin/env python3
 | 
						|
# -*-coding:UTF-8 -*
 | 
						|
 | 
						|
"""
 | 
						|
    Blueprint Flask: MISP format import export
 | 
						|
"""
 | 
						|
import io
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import json
 | 
						|
 | 
						|
from flask import render_template, jsonify, request, Blueprint, redirect, url_for, Response, send_file, abort
 | 
						|
from flask_login import login_required, current_user
 | 
						|
 | 
						|
sys.path.append('modules')
 | 
						|
 | 
						|
# Import Role_Manager
 | 
						|
from Role_Manager import login_admin, login_analyst, login_read_only
 | 
						|
 | 
						|
sys.path.append(os.environ['AIL_BIN'])
 | 
						|
##################################
 | 
						|
# Import Project packages
 | 
						|
##################################
 | 
						|
from exporter import MISPExporter
 | 
						|
from exporter import TheHiveExporter
 | 
						|
from lib.objects import ail_objects
 | 
						|
from lib.Investigations import Investigation
 | 
						|
 | 
						|
# TODO REMOVE ME
 | 
						|
from export import MispImport  # TODO REMOVE ME
 | 
						|
 | 
						|
# TODO REMOVE ME
 | 
						|
 | 
						|
# ============ BLUEPRINT ============
 | 
						|
# Flask blueprint for the import/export views. Its templates are looked up in
# the 'templates/import_export' directory below the path held in $AIL_FLASK.
import_export = Blueprint(
    'import_export',
    __name__,
    template_folder=os.path.join(os.environ['AIL_FLASK'], 'templates/import_export'),
)

# ============ VARIABLES ============
# Exporter instances, created once when this module is imported.
misp_exporter_objects = MISPExporter.MISPExporterAILObjects()
misp_exporter_investigation = MISPExporter.MISPExporterInvestigation()

thehive_exporter_item = TheHiveExporter.TheHiveExporterItem()
 | 
						|
 | 
						|
 | 
						|
# ============ FUNCTIONS ============
 | 
						|
 | 
						|
 | 
						|
# ============= ROUTES ==============
 | 
						|
@import_export.route('/import_export/import')
@login_required
@login_analyst
def import_object():
    """Render the ``import_object.html`` template without any context."""
    template_name = "import_object.html"
    return render_template(template_name)
 | 
						|
 | 
						|
 | 
						|
# TODO
 | 
						|
@import_export.route("/import_export/import_file", methods=['POST'])
@login_required
@login_analyst
def import_object_file():
    """Import objects from an uploaded file and render the import page.

    The uploaded ``file`` form part is saved to the path returned by
    ``MispImport.sanitize_import_file_path``, passed to
    ``MispImport.import_objs_from_file`` and then deleted. Each imported
    object is described by a dict carrying its ``uuid``, ``url`` and ``node``
    (SVG icon) in addition to the keys returned by
    ``MispImport.get_global_id_from_id``.

    Returns:
        The rendered ``import_object.html`` template with ``all_imported_obj``
        and ``error``. ``error`` is only set when a file was uploaded but
        nothing was imported from it.
    """
    error = None
    all_imported_obj = []

    # A usable upload needs both the form part and a non-empty filename.
    upload = request.files.get('file')
    if upload and upload.filename:
        filename = MispImport.sanitize_import_file_path(upload.filename)
        upload.save(filename)
        try:
            map_uuid_global_id = MispImport.import_objs_from_file(filename)
        finally:
            # Always delete the saved upload, even when the import raises,
            # so failed imports do not leave files behind on disk.
            os.remove(filename)

        for obj_uuid in map_uuid_global_id:
            dict_obj = MispImport.get_global_id_from_id(map_uuid_global_id[obj_uuid])
            obj = ail_objects.get_object(dict_obj['type'], dict_obj['subtype'], dict_obj['id'])
            dict_obj['uuid'] = obj_uuid
            dict_obj['url'] = obj.get_link(flask_context=True)
            dict_obj['node'] = obj.get_svg_icon()
            all_imported_obj.append(dict_obj)

        if not all_imported_obj:
            error = "error: Empty or invalid JSON file"

    return render_template("import_object.html", all_imported_obj=all_imported_obj, error=error)
 | 
						|
 | 
						|
 | 
						|
@import_export.route("/misp/objects/export", methods=['GET'])
@login_required
@login_analyst
def objects_misp_export():
    """Render the export page with the current user's objects to export.

    The template receives every object type/subtype tuple known to
    ``ail_objects`` and the list returned by
    ``MISPExporter.get_user_misp_objects_to_export`` for the logged-in user.
    """
    current_user_id = current_user.get_id()
    template_context = {
        'object_types': ail_objects.get_all_objects_with_subtypes_tuple(),
        'to_export': MISPExporter.get_user_misp_objects_to_export(current_user_id),
    }
    return render_template("export_object.html", **template_context)
 | 
						|
 | 
						|
 | 
						|
@import_export.route("/misp/objects/export/post", methods=['POST'])
@login_required
@login_analyst
def objects_misp_export_post():
    """Build a MISP event from the submitted export form.

    The form is read in two ways:

    * Fields whose *name* starts with ``{`` are treated as a quoted dict
      describing an object already selected for export; their first value is
      the new export level for that object.
    * Fields carrying exactly three values are treated as a newly added
      object: ``"<type>:<subtype>"``, the object id and the level.

    If any referenced object does not exist, the export page is rendered
    again with the invalid objects listed. Otherwise the event is created,
    the user's stored export selection is cleared, and the result is either
    sent back as a JSON attachment (no ``export_to_misp`` field) or shown as
    a link on the export page.

    Malformed object descriptors or non-integer levels abort with HTTP 400.
    """
    user_id = current_user.get_id()

    # Collect newly added objects and updated levels from the form.
    new_export = []
    user_lvl_export = {}
    for field in request.form:
        values = request.form.getlist(field)

        # Updated level for an already selected object.
        if field.startswith('{'):
            try:
                # The field name is a single-quoted dict; convert it to JSON.
                obj_dict = json.loads(field.replace("'", "\""))  # TODO sanitize
                lvl = int(values[0])
                str_id = f"{obj_dict['type']}:{obj_dict['subtype']}:{obj_dict['id']}"
            except (ValueError, KeyError, TypeError):
                # Invalid JSON, missing key, non-dict payload or bad level.
                abort(400)
            user_lvl_export[str_id] = lvl

        # Newly added object: [type:subtype, id, lvl].
        if len(values) == 3 and values[0] != 'Object type...':
            # partition() tolerates a missing ':' (subtype becomes '').
            new_type, _, new_subtype = values[0].partition(':')
            new_export.append({'type': new_type, 'subtype': new_subtype, 'id': values[1], 'lvl': values[2]})

    # Split everything into existing and unknown objects.
    objects = []
    invalid_obj = []
    for obj in new_export:
        if ail_objects.exists_obj(obj['type'], obj['subtype'], obj['id']):
            objects.append(obj)
        else:
            invalid_obj.append(obj)
    for obj in MISPExporter.get_user_misp_objects_to_export(user_id):
        if not ail_objects.exists_obj(obj['type'], obj['subtype'], obj['id']):
            invalid_obj.append(obj)
            continue
        str_id = f"{obj['type']}:{obj['subtype']}:{obj['id']}"
        if str_id in user_lvl_export:
            obj['lvl'] = user_lvl_export[str_id]
        objects.append(obj)

    if invalid_obj:
        object_types = ail_objects.get_all_objects_with_subtypes_tuple()
        return render_template("export_object.html", object_types=object_types,
                               to_export=objects, l_obj_invalid=invalid_obj)

    export = request.form.get('export_to_misp', False)
    distribution = request.form.get('misp_event_distribution')
    threat_level = request.form.get('threat_level_id')
    analysis = request.form.get('misp_event_analysis')
    info = request.form.get('misp_event_info')
    # NOTE(review): this reads the same field as `info`, so `publish` is truthy
    # whenever an event info text is supplied. Looks like a copy-paste slip;
    # confirm the intended form field name against the template before changing.
    publish = request.form.get('misp_event_info', False)

    objs = ail_objects.get_objects(objects)
    event = misp_exporter_objects.create_event(objs, distribution=distribution, threat_level=threat_level,
                                               analysis=analysis, info=info, export=export, publish=publish)

    MISPExporter.delete_user_misp_objects_to_export(user_id)

    if not export:
        # No push to MISP requested: hand the event back as a download.
        return send_file(io.BytesIO(event['event'].encode()), as_attachment=True,
                         download_name=f'ail_export_{event["uuid"]}.json')

    object_types = ail_objects.get_all_objects_with_subtypes_tuple()
    return render_template("export_object.html", object_types=object_types,
                           misp_url=event['url'])
 | 
						|
 | 
						|
 | 
						|
@import_export.route("/misp/objects/export/add", methods=['GET'])
@login_required
@login_analyst
def add_object_id_to_export():
    """Add an object to the current user's MISP export selection.

    Query parameters: ``type``, ``subtype``, ``id`` and an optional ``lvl``.
    A missing or non-integer ``lvl`` falls back to ``0``. Responds with 404
    when ``ail_objects.exists_obj`` reports that the object does not exist;
    otherwise redirects to the export page.
    """
    user_id = current_user.get_id()
    obj_type = request.args.get('type')
    obj_id = request.args.get('id')
    obj_subtype = request.args.get('subtype')
    obj_lvl = request.args.get('lvl')

    try:
        obj_lvl = int(obj_lvl)
    except (TypeError, ValueError):
        # TypeError: parameter absent (None); ValueError: not an integer.
        obj_lvl = 0

    if not ail_objects.exists_obj(obj_type, obj_subtype, obj_id):
        abort(404)
    MISPExporter.add_user_misp_object_to_export(user_id, obj_type, obj_subtype, obj_id, lvl=obj_lvl)
    # redirect
    return redirect(url_for('import_export.objects_misp_export'))
 | 
						|
 | 
						|
 | 
						|
@import_export.route("/misp/objects/export/delete", methods=['GET'])
@login_required
@login_analyst
def delete_object_id_to_export():
    """Remove one object from the current user's MISP export selection.

    The object is identified by the ``type``, ``subtype`` and ``id`` query
    parameters. Always answers with the JSON body ``{"success": true}``.
    """
    params = request.args
    user_id = current_user.get_id()
    MISPExporter.delete_user_misp_object_to_export(
        user_id,
        params.get('type'),
        params.get('subtype'),
        params.get('id'),
    )
    return jsonify(success=True)
 | 
						|
 | 
						|
 | 
						|
@import_export.route("/investigation/misp/export", methods=['GET'])
@login_required
@login_analyst
def export_investigation():
    """Export the investigation given by the ``uuid`` query parameter to MISP.

    Responds with 404 when the investigation does not exist and with a JSON
    error (status 400) when ``ping_misp()`` fails. On success the exported
    event is printed and the client is redirected to the investigation page.
    """
    investigation_uuid = request.args.get("uuid")
    investigation = Investigation(investigation_uuid)
    if not investigation.exists():
        abort(404)

    # Bail out early when the MISP instance does not answer.
    if not misp_exporter_objects.ping_misp():
        error_body = json.dumps({"error": "Can't reach MISP Instance"}, indent=2, sort_keys=True)
        return Response(error_body, mimetype='application/json'), 400

    event = misp_exporter_investigation.export(investigation)
    print(event)
    return redirect(url_for('investigations_b.show_investigation', uuid=investigation_uuid))
 | 
						|
 | 
						|
 | 
						|
@import_export.route("/thehive/objects/case/export", methods=['POST'])
@login_required
@login_analyst
def create_thehive_case():
    """Create a TheHive case for an item from the submitted form.

    Required form fields: ``hive_description``, ``hive_case_title``,
    ``threat_level_hive``, ``hive_tlp`` and ``obj_id``. Responds with 404 when
    the item does not exist. Redirects to the case URL when the exporter
    returns a case id, otherwise returns the plain string ``'error'``.
    """
    form = request.form
    description = form['hive_description']
    title = form['hive_case_title']
    threat_level = form['threat_level_hive']
    tlp = form['hive_tlp']
    item_id = form['obj_id']

    item = ail_objects.get_object('item', '', item_id)
    if not item.exists():
        abort(404)

    case_id = thehive_exporter_item.export(
        item.get_id(),
        description=description,
        title=title,
        threat_level=threat_level,
        tlp=tlp,
    )
    if not case_id:
        return 'error'
    return redirect(thehive_exporter_item.get_case_url(case_id))
 |