From a4bcc15db0251dcf5e7933b71376139bb6a41b09 Mon Sep 17 00:00:00 2001 From: Milo Volpicelli Date: Thu, 26 Oct 2023 15:47:22 +0000 Subject: [PATCH] enriches with c25 MISP objects --- .../modules/expansion/cluster25_expand.py | 140 +++++++++++++++--- 1 file changed, 116 insertions(+), 24 deletions(-) diff --git a/misp_modules/modules/expansion/cluster25_expand.py b/misp_modules/modules/expansion/cluster25_expand.py index ace8d8b..f777e3b 100755 --- a/misp_modules/modules/expansion/cluster25_expand.py +++ b/misp_modules/modules/expansion/cluster25_expand.py @@ -1,9 +1,8 @@ import json import requests -import time -from typing import List -from . import standard_error_message -from pymisp import MISPAttribute, MISPEvent +import uuid +from . import check_input_attribute, standard_error_message +from pymisp import MISPAttribute, MISPEvent, MISPObject moduleinfo = {'version': '0.1', 'author': 'Milo Volpicelli', @@ -11,17 +10,19 @@ moduleinfo = {'version': '0.1', 'module-type': ['expansion', 'hover']} moduleconfig = ['api_id', 'apikey', 'base_url'] misperrors = {'error': 'Error'} -misp_type_in = ['domain', 'email', 'filename', 'md5', 'sha1', 'sha256', 'ip', 'url', 'vulnerability', 'btc', - 'xmr', 'ja3-fingerprint-md5'] +misp_type_in = ['domain', 'email-src', 'email-dst', 'filename', 'md5', 'sha1', 'sha256', 'ip-src', 'ip-dst', 'url', + 'vulnerability', 'btc', 'xmr', 'ja3-fingerprint-md5'] + mapping_out = { # mapping between the MISP attributes type and the compatible Cluster25 indicator types. 'domain': {'type': 'domain', 'to_ids': True}, - 'email': {'type': 'email', 'to_ids': True}, + 'email-src': {'type': 'email-src', 'to_ids': True}, + 'email-dst': {'type': 'email-dst', 'to_ids': True}, 'filename': {'type': 'filename', 'to_ids': True}, 'md5': {'type': 'md5', 'to_ids': True}, 'sha1': {'type': 'sha1', 'to_ids': True}, 'sha256': {'type': 'sha256', 'to_ids': True}, - 'ipv4': {'type': 'ip', 'to_ids': True}, - 'ipv6': {'type': 'ip', 'to_ids': True}, + 'ip-src': {'type': 'ip-src', 'to_ids': True}, + 'ip-dst': {'type': 'ip-dst', 'to_ids': True}, 'url': {'type': 'url', 'to_ids': True}, 'cve': {'type': 'vulnerability', 'to_ids': True}, 'btcaddress': {'type': 'btc', 'to_ids': True}, @@ -48,32 +49,124 @@ def handler(q=False): misperrors['error'] = 'Cluster25 base_url is missing' return misperrors - # validate params - if not request.get('params') or not request.get('params', {}).get('value'): - return {'error': f'{standard_error_message}, which should contain a value.'} + # validate attribute + if not request.get('attribute') or not check_input_attribute(request['attribute']): + return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'} + attribute = request.get('attribute') + if not any(input_type == attribute.get('type') for input_type in misp_type_in): + return {'error': 'Unsupported attribute type.'} client = Cluster25CTI(request['config']['api_id'], request['config']['apikey'], request['config']['base_url']) - return {'results': lookup_indicator(client, request.get('params'))} + return lookup_indicator(client, request.get('attribute')) -def lookup_indicator(client, indicator): +def format_content(content): + if isinstance(content, str) or isinstance(content, bool) or isinstance(content, int): + return content + ret = "" + tmp_ret = [] + if content is None: + return ret + is_dict = isinstance(content, dict) + is_list = isinstance(content, list) + for index, key in enumerate(content): + if is_dict: + if isinstance(content[key], dict): + ret = format_content(content[key]) - result = client.investigate(indicator) + elif isinstance(content[key], list): + for list_item in content[key]: + tmp_ret.append(format_content(list_item)) + else: + tmp_ret.append(f"{key}: {content[key]}") + elif is_list: + if isinstance(content[index], str): + ret = ", ".join(content) + else: + ret = format_content(content) + if tmp_ret: + ret = " ".join(tmp_ret) + return ret - misp_event = MISPEvent() + +def lookup_indicator(client, attr): + + result = client.investigate(attr) if result.get('error'): return result + misp_event = MISPEvent() + attribute = MISPAttribute() + attribute.from_dict(**attr) + misp_event.add_attribute(**attribute) - if mapping_out.get(result.get('indicator_type')): - r = mapping_out[result.get('indicator_type')].copy() - r['value'] = result - attribute = MISPAttribute() - attribute.from_dict(**r) - misp_event.add_attribute(**attribute) + misp_object_g = MISPObject('c25_generic_info') + misp_object_g.template_uuid = uuid.uuid4() + misp_object_g.description = 'c25_generic_info' + setattr(misp_object_g, 'meta-category', 'network') + + misp_objects = [] + for ind, entry in enumerate(result): + if isinstance(result[entry], dict): + tmp_obj = MISPObject(f"c25_{entry}") + tmp_obj.template_uuid = uuid.uuid4() + tmp_obj.description = f"c25_{entry}" + setattr(tmp_obj, 'meta-category', 'network') + tmp_obj.add_reference(attribute['uuid'], 'related-to') + for key in result[entry]: + if isinstance(result[entry][key], dict): + for index, key2 in enumerate(result[entry][key]): + if result[entry][key][key2]: + tmp_obj.add_attribute(f"{entry}_{key}_{key2}", **{'type': 'text', 'value': format_content( + result[entry][key][key2])}) + + elif isinstance(result[entry][key], list): + for index, key2 in enumerate(result[entry][key]): + if isinstance(key2, dict): + tmp_obj_2 = MISPObject(f"c25_{entry}_{key}_{index+1}") + tmp_obj_2.template_uuid = uuid.uuid4() + tmp_obj_2.description = f"c25_{entry}_{key}" + setattr(tmp_obj_2, 'meta-category', 'network') + tmp_obj_2.add_reference(attribute['uuid'], 'related-to') + for k in key2: + if key2[k]: + tmp_obj_2.add_attribute(k, **{'type': 'text', 'value': format_content(key2[k])}) + misp_objects.append(tmp_obj_2) + elif key2 is not None: + tmp_obj.add_attribute(f"{entry}_{key}", **{'type': 'text', 'value': format_content(key2)}) + elif result[entry][key] is not None: + tmp_obj.add_attribute(key, **{'type': 'text', 'value': result[entry][key]}) + + if tmp_obj.attributes: + misp_objects.append(tmp_obj) + + elif isinstance(result[entry], list): + for index, key in enumerate(result[entry]): + if isinstance(key, dict): + tmp_obj = MISPObject(f"c25_{entry}_{index+1}") + tmp_obj.template_uuid = uuid.uuid4() + tmp_obj.description = f"c25_{entry}_{index+1}" + setattr(tmp_obj, 'meta-category', 'network') + tmp_obj.add_reference(attribute['uuid'], 'related-to') + for key2 in key: + if key[key2]: + tmp_obj.add_attribute(key2, **{'type': 'text', 'value': format_content(key[key2])}) + tmp_obj.add_reference(attribute['uuid'], 'related-to') + misp_objects.append(tmp_obj) + elif key is not None: + misp_object_g.add_attribute(entry, **{'type': 'text', 'value': format_content(key)}) + else: + if result[entry]: + misp_object_g.add_attribute(entry, **{'type': 'text', 'value': result[entry]}) + + misp_object_g.add_reference(attribute['uuid'], 'related-to') + misp_event.add_object(misp_object_g) + for misp_object in misp_objects: + misp_event.add_object(misp_object) event = json.loads(misp_event.to_json()) - return {'Object': event.get('Object', []), 'Attribute': event.get('Attribute', [])} + results = {key: event[key] for key in ('Attribute', 'Object')} + return {'results': results} def introspection(): @@ -107,4 +200,3 @@ class Cluster25CTI: return{'error': f"Unable to retrieve investigate result for indicator '{indicator.get('value')}' " f"from C25 platform, status {r.status_code}"} return r.json()["data"] -