mirror of https://github.com/MISP/misp-modules
enriches with c25 MISP objects
parent
ce7d1175e7
commit
a4bcc15db0
|
@ -1,9 +1,8 @@
|
|||
import json
|
||||
import requests
|
||||
import time
|
||||
from typing import List
|
||||
from . import standard_error_message
|
||||
from pymisp import MISPAttribute, MISPEvent
|
||||
import uuid
|
||||
from . import check_input_attribute, standard_error_message
|
||||
from pymisp import MISPAttribute, MISPEvent, MISPObject
|
||||
|
||||
moduleinfo = {'version': '0.1',
|
||||
'author': 'Milo Volpicelli',
|
||||
|
@ -11,17 +10,19 @@ moduleinfo = {'version': '0.1',
|
|||
'module-type': ['expansion', 'hover']}
|
||||
moduleconfig = ['api_id', 'apikey', 'base_url']
|
||||
misperrors = {'error': 'Error'}
|
||||
misp_type_in = ['domain', 'email', 'filename', 'md5', 'sha1', 'sha256', 'ip', 'url', 'vulnerability', 'btc',
|
||||
'xmr', 'ja3-fingerprint-md5']
|
||||
misp_type_in = ['domain', 'email-src', 'email-dst', 'filename', 'md5', 'sha1', 'sha256', 'ip-src', 'ip-dst', 'url',
|
||||
'vulnerability', 'btc', 'xmr', 'ja3-fingerprint-md5']
|
||||
|
||||
mapping_out = { # mapping between the MISP attributes type and the compatible Cluster25 indicator types.
|
||||
'domain': {'type': 'domain', 'to_ids': True},
|
||||
'email': {'type': 'email', 'to_ids': True},
|
||||
'email-src': {'type': 'email-src', 'to_ids': True},
|
||||
'email-dst': {'type': 'email-dst', 'to_ids': True},
|
||||
'filename': {'type': 'filename', 'to_ids': True},
|
||||
'md5': {'type': 'md5', 'to_ids': True},
|
||||
'sha1': {'type': 'sha1', 'to_ids': True},
|
||||
'sha256': {'type': 'sha256', 'to_ids': True},
|
||||
'ipv4': {'type': 'ip', 'to_ids': True},
|
||||
'ipv6': {'type': 'ip', 'to_ids': True},
|
||||
'ip-src': {'type': 'ip-src', 'to_ids': True},
|
||||
'ip-dst': {'type': 'ip-dst', 'to_ids': True},
|
||||
'url': {'type': 'url', 'to_ids': True},
|
||||
'cve': {'type': 'vulnerability', 'to_ids': True},
|
||||
'btcaddress': {'type': 'btc', 'to_ids': True},
|
||||
|
@ -48,32 +49,124 @@ def handler(q=False):
|
|||
misperrors['error'] = 'Cluster25 base_url is missing'
|
||||
return misperrors
|
||||
|
||||
# validate params
|
||||
if not request.get('params') or not request.get('params', {}).get('value'):
|
||||
return {'error': f'{standard_error_message}, which should contain a value.'}
|
||||
# validate attribute
|
||||
if not request.get('attribute') or not check_input_attribute(request['attribute']):
|
||||
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
|
||||
attribute = request.get('attribute')
|
||||
if not any(input_type == attribute.get('type') for input_type in misp_type_in):
|
||||
return {'error': 'Unsupported attribute type.'}
|
||||
|
||||
client = Cluster25CTI(request['config']['api_id'], request['config']['apikey'], request['config']['base_url'])
|
||||
|
||||
return {'results': lookup_indicator(client, request.get('params'))}
|
||||
return lookup_indicator(client, request.get('attribute'))
|
||||
|
||||
|
||||
def lookup_indicator(client, indicator):
|
||||
def format_content(content):
|
||||
if isinstance(content, str) or isinstance(content, bool) or isinstance(content, int):
|
||||
return content
|
||||
ret = ""
|
||||
tmp_ret = []
|
||||
if content is None:
|
||||
return ret
|
||||
is_dict = isinstance(content, dict)
|
||||
is_list = isinstance(content, list)
|
||||
for index, key in enumerate(content):
|
||||
if is_dict:
|
||||
if isinstance(content[key], dict):
|
||||
ret = format_content(content[key])
|
||||
|
||||
result = client.investigate(indicator)
|
||||
elif isinstance(content[key], list):
|
||||
for list_item in content[key]:
|
||||
tmp_ret.append(format_content(list_item))
|
||||
else:
|
||||
tmp_ret.append(f"{key}: {content[key]}")
|
||||
elif is_list:
|
||||
if isinstance(content[index], str):
|
||||
ret = ", ".join(content)
|
||||
else:
|
||||
ret = format_content(content)
|
||||
if tmp_ret:
|
||||
ret = " ".join(tmp_ret)
|
||||
return ret
|
||||
|
||||
misp_event = MISPEvent()
|
||||
|
||||
def lookup_indicator(client, attr):
|
||||
|
||||
result = client.investigate(attr)
|
||||
if result.get('error'):
|
||||
return result
|
||||
misp_event = MISPEvent()
|
||||
attribute = MISPAttribute()
|
||||
attribute.from_dict(**attr)
|
||||
misp_event.add_attribute(**attribute)
|
||||
|
||||
if mapping_out.get(result.get('indicator_type')):
|
||||
r = mapping_out[result.get('indicator_type')].copy()
|
||||
r['value'] = result
|
||||
attribute = MISPAttribute()
|
||||
attribute.from_dict(**r)
|
||||
misp_event.add_attribute(**attribute)
|
||||
misp_object_g = MISPObject('c25_generic_info')
|
||||
misp_object_g.template_uuid = uuid.uuid4()
|
||||
misp_object_g.description = 'c25_generic_info'
|
||||
setattr(misp_object_g, 'meta-category', 'network')
|
||||
|
||||
misp_objects = []
|
||||
for ind, entry in enumerate(result):
|
||||
if isinstance(result[entry], dict):
|
||||
tmp_obj = MISPObject(f"c25_{entry}")
|
||||
tmp_obj.template_uuid = uuid.uuid4()
|
||||
tmp_obj.description = f"c25_{entry}"
|
||||
setattr(tmp_obj, 'meta-category', 'network')
|
||||
tmp_obj.add_reference(attribute['uuid'], 'related-to')
|
||||
for key in result[entry]:
|
||||
if isinstance(result[entry][key], dict):
|
||||
for index, key2 in enumerate(result[entry][key]):
|
||||
if result[entry][key][key2]:
|
||||
tmp_obj.add_attribute(f"{entry}_{key}_{key2}", **{'type': 'text', 'value': format_content(
|
||||
result[entry][key][key2])})
|
||||
|
||||
elif isinstance(result[entry][key], list):
|
||||
for index, key2 in enumerate(result[entry][key]):
|
||||
if isinstance(key2, dict):
|
||||
tmp_obj_2 = MISPObject(f"c25_{entry}_{key}_{index+1}")
|
||||
tmp_obj_2.template_uuid = uuid.uuid4()
|
||||
tmp_obj_2.description = f"c25_{entry}_{key}"
|
||||
setattr(tmp_obj_2, 'meta-category', 'network')
|
||||
tmp_obj_2.add_reference(attribute['uuid'], 'related-to')
|
||||
for k in key2:
|
||||
if key2[k]:
|
||||
tmp_obj_2.add_attribute(k, **{'type': 'text', 'value': format_content(key2[k])})
|
||||
misp_objects.append(tmp_obj_2)
|
||||
elif key2 is not None:
|
||||
tmp_obj.add_attribute(f"{entry}_{key}", **{'type': 'text', 'value': format_content(key2)})
|
||||
elif result[entry][key] is not None:
|
||||
tmp_obj.add_attribute(key, **{'type': 'text', 'value': result[entry][key]})
|
||||
|
||||
if tmp_obj.attributes:
|
||||
misp_objects.append(tmp_obj)
|
||||
|
||||
elif isinstance(result[entry], list):
|
||||
for index, key in enumerate(result[entry]):
|
||||
if isinstance(key, dict):
|
||||
tmp_obj = MISPObject(f"c25_{entry}_{index+1}")
|
||||
tmp_obj.template_uuid = uuid.uuid4()
|
||||
tmp_obj.description = f"c25_{entry}_{index+1}"
|
||||
setattr(tmp_obj, 'meta-category', 'network')
|
||||
tmp_obj.add_reference(attribute['uuid'], 'related-to')
|
||||
for key2 in key:
|
||||
if key[key2]:
|
||||
tmp_obj.add_attribute(key2, **{'type': 'text', 'value': format_content(key[key2])})
|
||||
tmp_obj.add_reference(attribute['uuid'], 'related-to')
|
||||
misp_objects.append(tmp_obj)
|
||||
elif key is not None:
|
||||
misp_object_g.add_attribute(entry, **{'type': 'text', 'value': format_content(key)})
|
||||
else:
|
||||
if result[entry]:
|
||||
misp_object_g.add_attribute(entry, **{'type': 'text', 'value': result[entry]})
|
||||
|
||||
misp_object_g.add_reference(attribute['uuid'], 'related-to')
|
||||
misp_event.add_object(misp_object_g)
|
||||
for misp_object in misp_objects:
|
||||
misp_event.add_object(misp_object)
|
||||
|
||||
event = json.loads(misp_event.to_json())
|
||||
return {'Object': event.get('Object', []), 'Attribute': event.get('Attribute', [])}
|
||||
results = {key: event[key] for key in ('Attribute', 'Object')}
|
||||
return {'results': results}
|
||||
|
||||
|
||||
def introspection():
|
||||
|
@ -107,4 +200,3 @@ class Cluster25CTI:
|
|||
return{'error': f"Unable to retrieve investigate result for indicator '{indicator.get('value')}' "
|
||||
f"from C25 platform, status {r.status_code}"}
|
||||
return r.json()["data"]
|
||||
|
||||
|
|
Loading…
Reference in New Issue