mirror of https://github.com/MISP/MISP-maltego
289 lines
11 KiB
Python
289 lines
11 KiB
Python
from canari.maltego.entities import Unknown, Hash, Domain, IPv4Address, URL, DNSName, AS, Website, NSRecord, PhoneNumber, EmailAddress, File, Person, Hashtag
|
|
from MISP_maltego.transforms.common.entities import MISPEvent, MISPObject, MISPGalaxy
|
|
from canari.maltego.message import UIMessageType, UIMessage, Label
|
|
from pymisp import PyMISP
|
|
import json
|
|
import tempfile
|
|
import os
|
|
|
|
|
|
# mapping_maltego_to_misp = {
|
|
# 'maltego.Hash': ['md5', 'sha1', 'sha256', 'sha224', 'sha384', 'sha512', 'sha512/224', 'sha512/256'],
|
|
# # 'maltego.Banner': [''],
|
|
# # 'maltego.WebTitle': [''],
|
|
# 'maltego.Domain': ['domain', 'hostname'],
|
|
# # 'maltego.Netblock': [''],
|
|
# # 'maltego.MXRecord': [''],
|
|
# 'maltego.IPv4Address': ['ip-src', 'ip-dst', 'ip'],
|
|
# 'maltego.URL': ['url', 'uri'],
|
|
# 'maltego.DNSName': ['domain', 'hostname'],
|
|
# 'maltego.AS': ['AS'],
|
|
# # 'maltego.UniqueIdentifier': [''],
|
|
# 'maltego.Website': ['domain', 'hostname'],
|
|
# 'maltego.NSRecord': ['domain', 'hostname'],
|
|
# # 'maltego.Document': [''],
|
|
# 'maltego.PhoneNumber': ['phone-number'],
|
|
# 'maltego.EmailAddress': ['email-src', 'email-dst'],
|
|
# # 'maltego.Image': [''], # LATER file image
|
|
# # 'maltego.Phrase': [''],
|
|
# 'maltego.File': ['filename'],
|
|
# # 'maltego.Person': [''],
|
|
# # 'maltego.Sentiment': [''],
|
|
# # 'maltego.Alias': [''],
|
|
# # 'maltego.GPS': [''],
|
|
# # 'maltego.CircularArea': [''],
|
|
# # 'maltego.NominatimLocation': [''],
|
|
# # 'maltego.Location': [''],
|
|
# # 'maltego.Device': [''],
|
|
# # 'maltego.affiliation.Flickr': [''],
|
|
# # 'maltego.FacebookObject': [''],
|
|
# # 'maltego.hashtag': [''],
|
|
# # 'maltego.affiliation.Twitter': [''],
|
|
# # 'maltego.affiliation.Facebook': [''],
|
|
# # 'maltego.Twit': [''],
|
|
# # 'maltego.Port': [''],
|
|
# # 'maltego.Service': [''],
|
|
# # 'maltego.BuiltWithTechnology': [''],
|
|
# }
|
|
|
|
# mapping_misp_to_maltego = {}
|
|
# for key, vals in mapping_maltego_to_misp.items():
|
|
# for val in vals:
|
|
# if val not in mapping_misp_to_maltego:
|
|
# mapping_misp_to_maltego[val] = []
|
|
# mapping_misp_to_maltego[val].append(key)
|
|
|
|
mapping_misp_to_maltego = {
|
|
'AS': [AS],
|
|
'domain': [Domain, NSRecord, Website, DNSName],
|
|
'email-dst': [EmailAddress],
|
|
'email-src': [EmailAddress],
|
|
'filename': [File],
|
|
'hostname': [Website, NSRecord, Domain, DNSName],
|
|
'ip': [IPv4Address],
|
|
'ip-dst': [IPv4Address],
|
|
'ip-src': [IPv4Address],
|
|
'md5': [Hash],
|
|
'phone-number': [PhoneNumber],
|
|
'sha1': [Hash],
|
|
'sha224': [Hash],
|
|
'sha256': [Hash],
|
|
'sha384': [Hash],
|
|
'sha512': [Hash],
|
|
'sha512/224': [Hash],
|
|
'sha512/256': [Hash],
|
|
'uri': [URL],
|
|
'url': [URL],
|
|
|
|
'whois-registrant-email': [EmailAddress],
|
|
|
|
# object mappings
|
|
'nameserver': [NSRecord],
|
|
# FIXME add more object mappings
|
|
# custom types created internally for technical reasons
|
|
# 'rekey_value': [Unknown]
|
|
}
|
|
|
|
|
|
def get_misp_connection(config):
|
|
if config['MISP_maltego.local.misp_verify'] in ['True', 'true', 1, 'yes', 'Yes']:
|
|
misp_verify = True
|
|
else:
|
|
misp_verify = False
|
|
if config['MISP_maltego.local.misp_debug'] in ['True', 'true', 1, 'yes', 'Yes']:
|
|
misp_debug = True
|
|
else:
|
|
misp_debug = False
|
|
return PyMISP(config['MISP_maltego.local.misp_url'], config['MISP_maltego.local.misp_key'], misp_verify, 'json', misp_debug)
|
|
|
|
|
|
def entity_obj_to_entity(entity_obj, v, t, **kwargs):
|
|
if entity_obj == Hash:
|
|
return entity_obj(v, _type=t, **kwargs) # FIXME type is conflicting with type of Entity, Report this as bug see line 326 /usr/local/lib/python3.5/dist-packages/canari/maltego/entities.py
|
|
|
|
return entity_obj(v, **kwargs)
|
|
|
|
|
|
def attribute_to_entity(a):
|
|
# prepare some attributes to a better form
|
|
a['data'] = None # empty the file content as we really don't need this here # FIXME feature request for misp.get_event() to not get attachment content
|
|
if a['type'] == 'malware-sample':
|
|
a['type'] = 'filename|md5'
|
|
if a['type'] == 'regkey|value':
|
|
a['type'] = 'regkey'
|
|
# FIXME regkey|value => needs to be a special non-combined object
|
|
|
|
# attribute is from an object, and a relation gives better understanding of the type of attribute
|
|
if a.get('object_relation') and mapping_misp_to_maltego.get(a['object_relation']):
|
|
entity_obj = mapping_misp_to_maltego[a['object_relation']][0]
|
|
yield entity_obj(a['value'], labels=[Label('comment', a['comment'])])
|
|
|
|
# combined attributes
|
|
elif '|' in a['type']:
|
|
t_1, t_2 = a['type'].split('|')
|
|
v_1, v_2 = a['value'].split('|')
|
|
if t_1 in mapping_misp_to_maltego:
|
|
entity_obj = mapping_misp_to_maltego[t_1][0]
|
|
labels = [Label('comment', a['comment'])]
|
|
if entity_obj == File:
|
|
labels.append(Label('hash', v_2))
|
|
yield entity_obj_to_entity(entity_obj, v_1, t_1, labels=labels) # TODO change the comment to include the second part of the regkey
|
|
else:
|
|
yield UIMessage("Type {} of combined type {} not supported for attribute: {}".format(t_1, a['type'], a), type=UIMessageType.Inform)
|
|
if t_2 in mapping_misp_to_maltego:
|
|
entity_obj = mapping_misp_to_maltego[t_2][0]
|
|
labels = [Label('comment', a['comment'])]
|
|
if entity_obj == Hash:
|
|
labels.append(Label('filename', v_1))
|
|
yield entity_obj_to_entity(entity_obj, v_2, t_2, labels=labels) # TODO change the comment to include the first part of the regkey
|
|
else:
|
|
yield UIMessage("Type {} of combined type {} not supported for attribute: {}".format(t_2, a['type'], a), type=UIMessageType.Inform)
|
|
|
|
# normal attributes
|
|
elif a['type'] in mapping_misp_to_maltego:
|
|
entity_obj = mapping_misp_to_maltego[a['type']][0]
|
|
yield entity_obj_to_entity(entity_obj, a['value'], a['type'], labels=[Label('comment', a['comment'])])
|
|
|
|
# not supported in our maltego mapping
|
|
else:
|
|
yield Unknown(a['value'], type=a['type'], labels=[Label('comment', a['comment'])])
|
|
yield UIMessage("Type {} not fully supported for attribute: {}".format(a['type'], a), type=UIMessageType.Inform)
|
|
|
|
if 'Galaxy' in a:
|
|
for g in a['Galaxy']:
|
|
for c in g['GalaxyCluster']:
|
|
yield galaxycluster_to_entity(c)
|
|
|
|
if 'Tag' in a:
|
|
for t in a['Tag']:
|
|
# ignore all misp-galaxies
|
|
if t['name'].startswith('misp-galaxy'):
|
|
continue
|
|
yield Hashtag(t['name'])
|
|
|
|
|
|
def object_to_entity(o):
|
|
return MISPObject(
|
|
o['name'],
|
|
uuid=o['uuid'],
|
|
event_id=int(o['event_id']),
|
|
meta_category=o.get('meta_category'),
|
|
description=o['description'],
|
|
comment=o['comment']
|
|
)
|
|
|
|
|
|
def object_to_attributes(o):
|
|
# first process attributes from an object that belong together (eg: first-name + last-name), and remove them from the list
|
|
if o['name'] == 'person':
|
|
first_name = get_attribute_in_object(o, 'first-name', drop=True).get('value')
|
|
last_name = get_attribute_in_object(o, 'last-name', drop=True).get('value')
|
|
yield entity_obj_to_entity(Person, ' '.join([first_name, last_name]).strip(), 'person', lastname=last_name, firstnames=first_name)
|
|
|
|
# process normal attributes
|
|
for a in o['Attribute']:
|
|
for item in attribute_to_entity(a):
|
|
yield item
|
|
|
|
|
|
def get_attribute_in_object(o, attribute_type, drop=False):
|
|
'''Gets the first attribute of a specific type within an object'''
|
|
found_attribute = {'value': ''}
|
|
for i, a in enumerate(o['Attribute']):
|
|
if a['type'] == attribute_type:
|
|
found_attribute = a.copy()
|
|
if drop: # drop the attribute from the object
|
|
o['Attribute'].pop(i)
|
|
break
|
|
return found_attribute
|
|
|
|
|
|
def event_to_entity(e):
|
|
return MISPEvent(e['Event']['id'], uuid=e['Event']['uuid'], info=e['Event']['info'])
|
|
|
|
|
|
def galaxycluster_to_entity(c, link_label=None):
|
|
# print(json.dumps(c, sort_keys=True, indent=4))
|
|
if c['meta'].get('synonyms'):
|
|
synonyms = ', '.join(c['meta']['synonyms'])
|
|
else:
|
|
synonyms = ''
|
|
return MISPGalaxy(
|
|
'{}\n{}'.format(c['type'], c['value']),
|
|
uuid=c['uuid'],
|
|
description=c['description'],
|
|
cluster_type=c['type'],
|
|
cluster_value=c['value'],
|
|
synonyms=synonyms,
|
|
tag_name=c['tag_name'],
|
|
link_label=link_label
|
|
)
|
|
|
|
|
|
# FIXME this uses the galaxies from github as the MISP web UI does not fully support the Galaxies in the webui.
|
|
# See https://github.com/MISP/MISP/issues/3801
|
|
galaxy_archive_url = 'https://github.com/MISP/misp-galaxy/archive/master.zip'
|
|
local_path_root = os.path.join(tempfile.gettempdir(), 'MISP-maltego')
|
|
local_path_uuid_mapping = os.path.join(local_path_root, 'MISP_maltego_galaxy_mapping.json')
|
|
local_path_clusters = os.path.join(local_path_root, 'misp-galaxy-master', 'clusters')
|
|
galaxy_cluster_uuids = None
|
|
|
|
|
|
def galaxy_update_local_copy(force=False):
|
|
import io
|
|
import json
|
|
import os
|
|
import requests
|
|
from zipfile import ZipFile
|
|
|
|
# FIXME put some aging and automatic re-downloading
|
|
if not os.path.exists(local_path_root):
|
|
os.mkdir(local_path_root)
|
|
force = True
|
|
|
|
if force:
|
|
# download the latest zip of the public galaxy
|
|
resp = requests.get(galaxy_archive_url)
|
|
zf = ZipFile(io.BytesIO(resp.content))
|
|
zf.extractall(local_path_root)
|
|
zf.close()
|
|
|
|
# generate the uuid mapping and save it to a file
|
|
galaxies_fnames = []
|
|
for f in os.listdir(local_path_clusters):
|
|
if '.json' in f:
|
|
galaxies_fnames.append(f)
|
|
galaxies_fnames.sort()
|
|
|
|
cluster_uuids = {}
|
|
for galaxy_fname in galaxies_fnames:
|
|
fullPathClusters = os.path.join(local_path_clusters, galaxy_fname)
|
|
with open(fullPathClusters) as fp:
|
|
galaxy = json.load(fp)
|
|
for cluster in galaxy['values']:
|
|
# print(cluster['uuid'])
|
|
if 'uuid' not in cluster:
|
|
continue
|
|
# keep track of the cluster, but also enhance it to look like the cluster we receive when accessing the web.
|
|
cluster_uuids[cluster['uuid']] = cluster
|
|
cluster_uuids[cluster['uuid']]['type'] = galaxy['type']
|
|
cluster_uuids[cluster['uuid']]['tag_name'] = 'misp-galaxy:{}="{}"'.format(galaxy['type'], cluster['value'])
|
|
|
|
with open(local_path_uuid_mapping, 'w') as f:
|
|
json.dump(cluster_uuids, f, sort_keys=True, indent=4)
|
|
|
|
|
|
def galaxy_load_cluster_mapping():
|
|
galaxy_update_local_copy()
|
|
with open(local_path_uuid_mapping, 'r') as f:
|
|
cluster_uuids = json.load(f)
|
|
return cluster_uuids
|
|
|
|
|
|
def get_galaxy_cluster(uuid):
|
|
global galaxy_cluster_uuids
|
|
if not galaxy_cluster_uuids:
|
|
galaxy_cluster_uuids = galaxy_load_cluster_mapping()
|
|
|
|
return galaxy_cluster_uuids.get(uuid)
|