mirror of https://github.com/MISP/misp-modules
cluster25_expand: handles related items and more
parent
a4bcc15db0
commit
52f53f81d0
|
@ -10,7 +10,7 @@ moduleinfo = {'version': '0.1',
|
|||
'module-type': ['expansion', 'hover']}
|
||||
moduleconfig = ['api_id', 'apikey', 'base_url']
|
||||
misperrors = {'error': 'Error'}
|
||||
misp_type_in = ['domain', 'email-src', 'email-dst', 'filename', 'md5', 'sha1', 'sha256', 'ip-src', 'ip-dst', 'url',
|
||||
misp_type_in = ['domain', 'email-src', 'email-dst', 'filename', 'md5', 'sha1', 'sha256', 'ip-src', 'ip-dst', 'url',
|
||||
'vulnerability', 'btc', 'xmr', 'ja3-fingerprint-md5']
|
||||
|
||||
mapping_out = { # mapping between the MISP attributes type and the compatible Cluster25 indicator types.
|
||||
|
@ -62,7 +62,7 @@ def handler(q=False):
|
|||
|
||||
|
||||
def format_content(content):
|
||||
if isinstance(content, str) or isinstance(content, bool) or isinstance(content, int):
|
||||
if isinstance(content, str) or isinstance(content, bool) or isinstance(content, int) or isinstance(content, float):
|
||||
return content
|
||||
ret = ""
|
||||
tmp_ret = []
|
||||
|
@ -74,7 +74,6 @@ def format_content(content):
|
|||
if is_dict:
|
||||
if isinstance(content[key], dict):
|
||||
ret = format_content(content[key])
|
||||
|
||||
elif isinstance(content[key], list):
|
||||
for list_item in content[key]:
|
||||
tmp_ret.append(format_content(list_item))
|
||||
|
@ -91,7 +90,6 @@ def format_content(content):
|
|||
|
||||
|
||||
def lookup_indicator(client, attr):
|
||||
|
||||
result = client.investigate(attr)
|
||||
if result.get('error'):
|
||||
return result
|
||||
|
@ -115,25 +113,53 @@ def lookup_indicator(client, attr):
|
|||
tmp_obj.add_reference(attribute['uuid'], 'related-to')
|
||||
for key in result[entry]:
|
||||
if isinstance(result[entry][key], dict):
|
||||
for index, key2 in enumerate(result[entry][key]):
|
||||
if result[entry][key][key2]:
|
||||
tmp_obj.add_attribute(f"{entry}_{key}_{key2}", **{'type': 'text', 'value': format_content(
|
||||
result[entry][key][key2])})
|
||||
for index, item in enumerate(result[entry][key]):
|
||||
if result[entry][key][item]:
|
||||
tmp_obj.add_attribute(f"{entry}_{key}_{item}", **{'type': 'text', 'value': format_content(
|
||||
result[entry][key][item])})
|
||||
|
||||
elif isinstance(result[entry][key], list):
|
||||
for index, key2 in enumerate(result[entry][key]):
|
||||
if isinstance(key2, dict):
|
||||
tmp_obj_2 = MISPObject(f"c25_{entry}_{key}_{index+1}")
|
||||
for index, item in enumerate(result[entry][key]):
|
||||
if isinstance(item, dict):
|
||||
tmp_obj_2 = MISPObject(f"c25_{entry}_{key}_{index + 1}")
|
||||
tmp_obj_2.template_uuid = uuid.uuid4()
|
||||
tmp_obj_2.description = f"c25_{entry}_{key}"
|
||||
setattr(tmp_obj_2, 'meta-category', 'network')
|
||||
tmp_obj_2.add_reference(attribute['uuid'], 'related-to')
|
||||
for k in key2:
|
||||
if key2[k]:
|
||||
tmp_obj_2.add_attribute(k, **{'type': 'text', 'value': format_content(key2[k])})
|
||||
for sub_key in item:
|
||||
if isinstance(item[sub_key], list):
|
||||
for sub_item in item[sub_key]:
|
||||
if isinstance(sub_item, dict):
|
||||
tmp_obj_3 = MISPObject(f"c25_{entry}_{sub_key}_{index + 1}")
|
||||
tmp_obj_3.template_uuid = uuid.uuid4()
|
||||
tmp_obj_3.description = f"c25_{entry}_{sub_key}"
|
||||
setattr(tmp_obj_3, 'meta-category', 'network')
|
||||
tmp_obj_3.add_reference(attribute['uuid'], 'related-to')
|
||||
for sub_sub_key in sub_item:
|
||||
if isinstance(sub_item[sub_sub_key], list):
|
||||
for idx, sub_sub_item in enumerate(sub_item[sub_sub_key]):
|
||||
if sub_sub_item.get("name"):
|
||||
sub_sub_item = sub_sub_item.get("name")
|
||||
tmp_obj_3.add_attribute(f"{sub_sub_key}_{idx + 1}",
|
||||
**{'type': 'text',
|
||||
'value': format_content(
|
||||
sub_sub_item)})
|
||||
else:
|
||||
tmp_obj_3.add_attribute(sub_sub_key,
|
||||
**{'type': 'text',
|
||||
'value': format_content(
|
||||
sub_item[sub_sub_key])})
|
||||
misp_objects.append(tmp_obj_3)
|
||||
else:
|
||||
tmp_obj_2.add_attribute(sub_key, **{'type': 'text',
|
||||
'value': format_content(sub_item)})
|
||||
|
||||
elif item[sub_key]:
|
||||
tmp_obj_2.add_attribute(sub_key,
|
||||
**{'type': 'text', 'value': format_content(item[sub_key])})
|
||||
misp_objects.append(tmp_obj_2)
|
||||
elif key2 is not None:
|
||||
tmp_obj.add_attribute(f"{entry}_{key}", **{'type': 'text', 'value': format_content(key2)})
|
||||
elif item is not None:
|
||||
tmp_obj.add_attribute(f"{entry}_{key}", **{'type': 'text', 'value': format_content(item)})
|
||||
elif result[entry][key] is not None:
|
||||
tmp_obj.add_attribute(key, **{'type': 'text', 'value': result[entry][key]})
|
||||
|
||||
|
@ -143,18 +169,19 @@ def lookup_indicator(client, attr):
|
|||
elif isinstance(result[entry], list):
|
||||
for index, key in enumerate(result[entry]):
|
||||
if isinstance(key, dict):
|
||||
tmp_obj = MISPObject(f"c25_{entry}_{index+1}")
|
||||
tmp_obj = MISPObject(f"c25_{entry}_{index + 1}")
|
||||
tmp_obj.template_uuid = uuid.uuid4()
|
||||
tmp_obj.description = f"c25_{entry}_{index+1}"
|
||||
tmp_obj.description = f"c25_{entry}_{index + 1}"
|
||||
setattr(tmp_obj, 'meta-category', 'network')
|
||||
tmp_obj.add_reference(attribute['uuid'], 'related-to')
|
||||
for key2 in key:
|
||||
if key[key2]:
|
||||
tmp_obj.add_attribute(key2, **{'type': 'text', 'value': format_content(key[key2])})
|
||||
for item in key:
|
||||
if key[item]:
|
||||
tmp_obj.add_attribute(item, **{'type': 'text', 'value': format_content(key[item])})
|
||||
tmp_obj.add_reference(attribute['uuid'], 'related-to')
|
||||
misp_objects.append(tmp_obj)
|
||||
elif key is not None:
|
||||
misp_object_g.add_attribute(entry, **{'type': 'text', 'value': format_content(key)})
|
||||
misp_object_g.add_attribute(f"{entry}_{index + 1}",
|
||||
**{'type': 'text', 'value': format_content(key)})
|
||||
else:
|
||||
if result[entry]:
|
||||
misp_object_g.add_attribute(entry, **{'type': 'text', 'value': result[entry]})
|
||||
|
@ -197,6 +224,7 @@ class Cluster25CTI:
|
|||
params = {'indicator': indicator.get('value')}
|
||||
r = requests.get(url=f"{self.base_url}/investigate", params=params, headers=self.headers)
|
||||
if r.status_code != 200:
|
||||
return{'error': f"Unable to retrieve investigate result for indicator '{indicator.get('value')}' "
|
||||
f"from C25 platform, status {r.status_code}"}
|
||||
return {'error': f"Unable to retrieve investigate result for indicator '{indicator.get('value')}' "
|
||||
f"from C25 platform, status {r.status_code}"}
|
||||
return r.json()["data"]
|
||||
|
||||
|
|
Loading…
Reference in New Issue