MISP-maltego/src/MISP_maltego/transforms/common/util.py

417 lines
16 KiB
Python
Raw Normal View History

2018-11-20 11:10:19 +01:00
from canari.maltego.entities import Unknown, Hash, Domain, IPv4Address, URL, DNSName, AS, Website, NSRecord, PhoneNumber, EmailAddress, File, Person, Hashtag, Location, Company, Alias, Port, Twitter
from MISP_maltego.transforms.common.entities import MISPEvent, MISPObject, MISPGalaxy
2018-12-01 20:02:15 +01:00
from canari.maltego.message import UIMessageType, UIMessage, Label, LinkStyle
from pymisp import PyMISP
import json
import os
import os.path
import tempfile
import time
# mapping_maltego_to_misp = {
# 'maltego.Hash': ['md5', 'sha1', 'sha256', 'sha224', 'sha384', 'sha512', 'sha512/224', 'sha512/256'],
# # 'maltego.Banner': [''],
# # 'maltego.WebTitle': [''],
# 'maltego.Domain': ['domain', 'hostname'],
# # 'maltego.Netblock': [''],
# # 'maltego.MXRecord': [''],
# 'maltego.IPv4Address': ['ip-src', 'ip-dst', 'ip'],
# 'maltego.URL': ['url', 'uri'],
# 'maltego.DNSName': ['domain', 'hostname'],
# 'maltego.AS': ['AS'],
# # 'maltego.UniqueIdentifier': [''],
# 'maltego.Website': ['domain', 'hostname'],
# 'maltego.NSRecord': ['domain', 'hostname'],
# # 'maltego.Document': [''],
# 'maltego.PhoneNumber': ['phone-number'],
# 'maltego.EmailAddress': ['email-src', 'email-dst'],
# # 'maltego.Image': [''], # LATER file image
# # 'maltego.Phrase': [''],
# 'maltego.File': ['filename'],
# # 'maltego.Person': [''],
# # 'maltego.Sentiment': [''],
# # 'maltego.Alias': [''],
# # 'maltego.GPS': [''],
# # 'maltego.CircularArea': [''],
# # 'maltego.NominatimLocation': [''],
# # 'maltego.Location': [''],
# # 'maltego.Device': [''],
# # 'maltego.affiliation.Flickr': [''],
# # 'maltego.FacebookObject': [''],
# # 'maltego.hashtag': [''],
# # 'maltego.affiliation.Twitter': [''],
# # 'maltego.affiliation.Facebook': [''],
# # 'maltego.Twit': [''],
# # 'maltego.Port': [''],
# # 'maltego.Service': [''],
# # 'maltego.BuiltWithTechnology': [''],
# }
# mapping_misp_to_maltego = {}
# for key, vals in mapping_maltego_to_misp.items():
# for val in vals:
# if val not in mapping_misp_to_maltego:
# mapping_misp_to_maltego[val] = []
# mapping_misp_to_maltego[val].append(key)
mapping_misp_to_maltego = {
'AS': [AS],
'domain': [Domain, NSRecord, Website, DNSName],
'email-dst': [EmailAddress],
'email-src': [EmailAddress],
'filename': [File],
'hostname': [Website, NSRecord, Domain, DNSName],
'ip': [IPv4Address],
'ip-dst': [IPv4Address],
'ip-src': [IPv4Address],
'md5': [Hash],
'phone-number': [PhoneNumber],
'sha1': [Hash],
'sha224': [Hash],
'sha256': [Hash],
'sha384': [Hash],
'sha512': [Hash],
'sha512/224': [Hash],
'sha512/256': [Hash],
2018-11-21 09:10:28 +01:00
'ssdeep': [Hash],
2018-11-21 09:12:05 +01:00
'impfuzzy': [Hash],
'uri': [URL],
'url': [URL],
'whois-registrant-email': [EmailAddress],
2018-11-20 11:10:19 +01:00
'country-of-residence': [Location],
'github-organisation': [Company],
'github-username': [Alias],
'imphash': [Hash],
'jabber-id': [Alias],
'passport-country': [Location],
'place-of-birth': [Location],
'port': [Port],
'target-email': [EmailAddress],
'target-location': [Location],
'target-org': [Company],
'target-user': [Alias],
'twitter-id': [Twitter],
# object mappings
'nameserver': [NSRecord],
2018-12-11 12:07:08 +01:00
# TODO add more object mappings
# custom types created internally for technical reasons
# 'rekey_value': [Unknown]
}
tag_note_prefixes = ['tlp:', 'PAP:', 'de-vs:', 'euci:', 'fr-classif:', 'nato:']
2018-12-11 12:07:08 +01:00
misp_connection = None
2018-12-11 12:07:08 +01:00
def get_misp_connection(config=None):
global misp_connection
if misp_connection:
return misp_connection
if not config:
raise Exception("ERROR: MISP connection not yet established, and config not provided as parameter.")
if config['MISP_maltego.local.misp_verify'] in ['True', 'true', 1, 'yes', 'Yes']:
misp_verify = True
else:
misp_verify = False
if config['MISP_maltego.local.misp_debug'] in ['True', 'true', 1, 'yes', 'Yes']:
misp_debug = True
else:
misp_debug = False
2018-12-11 12:07:08 +01:00
misp_connection = PyMISP(config['MISP_maltego.local.misp_url'], config['MISP_maltego.local.misp_key'], misp_verify, 'json', misp_debug)
return misp_connection
def entity_obj_to_entity(entity_obj, v, t, **kwargs):
if entity_obj == Hash:
2018-12-11 12:07:08 +01:00
return entity_obj(v, _type=t, **kwargs) # LATER type is conflicting with type of Entity, Report this as bug see line 326 /usr/local/lib/python3.5/dist-packages/canari/maltego/entities.py
return entity_obj(v, **kwargs)
def attribute_to_entity(a, link_label=None, event_tags=None):
# prepare some attributes to a better form
a['data'] = None # empty the file content as we really don't need this here
if a['type'] == 'malware-sample':
a['type'] = 'filename|md5'
2018-12-11 12:07:08 +01:00
if a['type'] == 'regkey|value': # LATER regkey|value => needs to be a special non-combined object
a['type'] = 'regkey'
2018-12-01 20:02:15 +01:00
combined_tags = event_tags
if 'Galaxy' in a:
for g in a['Galaxy']:
for c in g['GalaxyCluster']:
yield galaxycluster_to_entity(c)
# TODO today the tag is attached to the event, not the attribute, this is something we want to fix soon.
if 'Tag' in a:
for t in a['Tag']:
combined_tags.append(t['name'])
# ignore all misp-galaxies
if t['name'].startswith('misp-galaxy'):
continue
yield Hashtag(t['name'])
notes = convert_tags_to_note(combined_tags)
2018-12-01 20:02:15 +01:00
# special cases
if a['type'] in ('url', 'uri'):
yield(URL(url=a['value'], link_label=link_label, notes=notes))
2018-12-01 20:02:15 +01:00
return
# attribute is from an object, and a relation gives better understanding of the type of attribute
if a.get('object_relation') and mapping_misp_to_maltego.get(a['object_relation']):
entity_obj = mapping_misp_to_maltego[a['object_relation']][0]
yield entity_obj(a['value'], labels=[Label('comment', a.get('comment'))], link_label=link_label, notes=notes)
# combined attributes
elif '|' in a['type']:
t_1, t_2 = a['type'].split('|')
v_1, v_2 = a['value'].split('|')
if t_1 in mapping_misp_to_maltego:
entity_obj = mapping_misp_to_maltego[t_1][0]
2018-12-01 20:02:15 +01:00
labels = [Label('comment', a.get('comment'))]
if entity_obj == File:
labels.append(Label('hash', v_2))
yield entity_obj_to_entity(entity_obj, v_1, t_1, labels=labels, link_label=link_label, notes=notes) # LATER change the comment to include the second part of the regkey
else:
yield UIMessage("Type {} of combined type {} not supported for attribute: {}".format(t_1, a['type'], a), type=UIMessageType.Inform)
if t_2 in mapping_misp_to_maltego:
entity_obj = mapping_misp_to_maltego[t_2][0]
2018-12-01 20:02:15 +01:00
labels = [Label('comment', a.get('comment'))]
if entity_obj == Hash:
labels.append(Label('filename', v_1))
yield entity_obj_to_entity(entity_obj, v_2, t_2, labels=labels, link_label=link_label, notes=notes) # LATER change the comment to include the first part of the regkey
else:
yield UIMessage("Type {} of combined type {} not supported for attribute: {}".format(t_2, a['type'], a), type=UIMessageType.Inform)
# normal attributes
elif a['type'] in mapping_misp_to_maltego:
entity_obj = mapping_misp_to_maltego[a['type']][0]
yield entity_obj_to_entity(entity_obj, a['value'], a['type'], labels=[Label('comment', a.get('comment'))], link_label=link_label, notes=notes)
# not supported in our maltego mapping
else:
yield Unknown(a['value'], type=a['type'], labels=[Label('comment', a.get('comment'))], link_label=link_label, notes=notes)
yield UIMessage("Type {} not fully supported for attribute: {}".format(a['type'], a), type=UIMessageType.Inform)
2018-12-11 12:07:08 +01:00
# LATER : relationships from attributes - not yet supported by MISP yet, but there are references in the datamodel
2018-12-01 20:02:15 +01:00
def object_to_entity(o, link_label=None):
2018-12-11 12:07:08 +01:00
# Generate a human readable display-name:
# - find the first RequiredOneOf that exists
# - if none, use the first RequiredField
# LATER further finetune the human readable version of this object
misp = get_misp_connection()
o_template = misp.get_object_template_id(o['template_uuid'])
human_readable = None
try:
found = False
while not found: # the while loop is broken once something is found, or the requiredOneOf has no elements left
required_ote_type = o_template['ObjectTemplate']['requirements']['requiredOneOf'].pop(0)
for ote in o_template['ObjectTemplateElement']:
if ote['object_relation'] == required_ote_type:
required_a_type = ote['type']
break
for a in o['Attribute']:
if a['type'] == required_a_type:
human_readable = '{}: {}'.format(o['name'], a['value'])
found = True
break
except Exception:
pass
if not human_readable:
try:
found = False
parts = []
for required_ote_type in o_template['ObjectTemplate']['requirements']['required']:
for ote in o_template['ObjectTemplateElement']:
if ote['object_relation'] == required_ote_type:
required_a_type = ote['type']
break
for a in o['Attribute']:
if a['type'] == required_a_type:
parts.append(a['value'])
break
human_readable = '{}: {}'.format(o['name'], '|'.join(parts))
except Exception:
human_readable = o['name']
pass
return MISPObject(
2018-12-11 12:07:08 +01:00
human_readable,
uuid=o['uuid'],
event_id=int(o['event_id']),
meta_category=o.get('meta_category'),
2018-12-01 20:02:15 +01:00
description=o.get('description'),
comment=o.get('comment'),
link_label=link_label
)
2018-12-11 12:07:08 +01:00
def object_to_attributes(o, e):
# first process attributes from an object that belong together (eg: first-name + last-name), and remove them from the list
if o['name'] == 'person':
first_name = get_attribute_in_object(o, 'first-name', drop=True).get('value')
last_name = get_attribute_in_object(o, 'last-name', drop=True).get('value')
yield entity_obj_to_entity(Person, ' '.join([first_name, last_name]).strip(), 'person', lastname=last_name, firstnames=first_name)
# process normal attributes
for a in o['Attribute']:
for item in attribute_to_entity(a):
yield item
2018-12-01 20:02:15 +01:00
# process relationships between objects and attributes
if 'ObjectReference' in o:
for ref in o['ObjectReference']:
# the reference is an Object
if ref.get('Object'):
2018-12-11 12:07:08 +01:00
# get the full object in the event, as our objectReference included does not contain everything we need
sub_object = get_object_in_event(ref['Object']['uuid'], e)
yield object_to_entity(sub_object, link_label=ref['relationship_type'])
2018-12-01 20:02:15 +01:00
# the reference is an Attribute
if ref.get('Attribute'):
2018-12-11 12:07:08 +01:00
ref['Attribute']['event_id'] = ref['event_id'] # LATER remove this ugly workaround - object can't be requested directly from MISP using the uuid, and to find a full object we need the event_id
2018-12-01 20:02:15 +01:00
for item in attribute_to_entity(ref['Attribute'], link_label=ref['relationship_type']):
yield item
2018-12-11 12:07:08 +01:00
def get_object_in_event(uuid, e):
for o in e['Event']['Object']:
if o['uuid'] == uuid:
return o
def get_attribute_in_object(o, attribute_type, drop=False):
'''Gets the first attribute of a specific type within an object'''
found_attribute = {'value': ''}
for i, a in enumerate(o['Attribute']):
if a['type'] == attribute_type:
found_attribute = a.copy()
if drop: # drop the attribute from the object
o['Attribute'].pop(i)
break
return found_attribute
def convert_tags_to_note(tags):
if not tags:
return None
notes = []
for tag in tags:
for tag_note_prefix in tag_note_prefixes:
if tag.startswith(tag_note_prefix):
notes.append(tag)
return '\n'.join(notes)
def event_to_entity(e, link_style=LinkStyle.Normal):
tags = []
if 'Tag' in e['Event']:
for t in e['Event']['Tag']:
tags.append(t['name'])
notes = convert_tags_to_note(tags)
return MISPEvent(e['Event']['id'], uuid=e['Event']['uuid'], info=e['Event']['info'], link_style=link_style, notes=notes)
def galaxycluster_to_entity(c, link_label=None):
if 'meta' in c and 'uuid' in c['meta']:
c['uuid'] = c['meta']['uuid'].pop(0)
galaxy_cluster = get_galaxy_cluster(c['uuid'])
icon_url = None
if 'icon' in galaxy_cluster: # LATER further investigate if using icons locally is a good idea.
icon_url = 'file://{}/{}.png'.format(os.path.join(os.getcwd(), 'MISP_maltego', 'resources', 'images', 'fontawesome'), galaxy_cluster['icon'])
if c['meta'].get('synonyms'):
synonyms = ', '.join(c['meta']['synonyms'])
else:
synonyms = ''
return MISPGalaxy(
'{}\n{}'.format(c['type'], c['value']),
uuid=c['uuid'],
description=c['description'],
cluster_type=c['type'],
cluster_value=c['value'],
synonyms=synonyms,
tag_name=c['tag_name'],
link_label=link_label,
icon_url=icon_url
)
# FIXME this uses the galaxies from github as the MISP web UI does not fully support the Galaxies in the webui.
# See https://github.com/MISP/MISP/issues/3801
galaxy_archive_url = 'https://github.com/MISP/misp-galaxy/archive/master.zip'
local_path_root = os.path.join(tempfile.gettempdir(), 'MISP-maltego')
local_path_uuid_mapping = os.path.join(local_path_root, 'MISP_maltego_galaxy_mapping.json')
local_path_clusters = os.path.join(local_path_root, 'misp-galaxy-master', 'clusters')
galaxy_cluster_uuids = None
def galaxy_update_local_copy(force=False):
import io
import json
import os
import requests
from zipfile import ZipFile
# some aging and automatic re-downloading
if not os.path.exists(local_path_root):
os.mkdir(local_path_root)
force = True
if not os.path.exists(local_path_uuid_mapping):
force = True
else:
# force update if cache is older thn 24 hours
if time.time() - os.path.getmtime(local_path_uuid_mapping) > 60 * 60 * 24:
force = True
if force:
# download the latest zip of the public galaxy
resp = requests.get(galaxy_archive_url)
zf = ZipFile(io.BytesIO(resp.content))
zf.extractall(local_path_root)
zf.close()
# generate the uuid mapping and save it to a file
galaxies_fnames = []
for f in os.listdir(local_path_clusters):
if '.json' in f:
galaxies_fnames.append(f)
galaxies_fnames.sort()
cluster_uuids = {}
for galaxy_fname in galaxies_fnames:
fullPathClusters = os.path.join(local_path_clusters, galaxy_fname)
with open(fullPathClusters) as fp:
galaxy = json.load(fp)
with open(fullPathClusters.replace('clusters', 'galaxies')) as fg:
galaxy_main = json.load(fg)
for cluster in galaxy['values']:
if 'uuid' not in cluster:
continue
# keep track of the cluster, but also enhance it to look like the cluster we receive when accessing the web.
cluster_uuids[cluster['uuid']] = cluster
cluster_uuids[cluster['uuid']]['type'] = galaxy['type']
cluster_uuids[cluster['uuid']]['tag_name'] = 'misp-galaxy:{}="{}"'.format(galaxy['type'], cluster['value'])
if 'icon' in galaxy_main:
cluster_uuids[cluster['uuid']]['icon'] = galaxy_main['icon']
with open(local_path_uuid_mapping, 'w') as f:
json.dump(cluster_uuids, f, sort_keys=True, indent=4)
def galaxy_load_cluster_mapping():
galaxy_update_local_copy()
with open(local_path_uuid_mapping, 'r') as f:
cluster_uuids = json.load(f)
return cluster_uuids
def get_galaxy_cluster(uuid):
global galaxy_cluster_uuids
if not galaxy_cluster_uuids:
galaxy_cluster_uuids = galaxy_load_cluster_mapping()
return galaxy_cluster_uuids.get(uuid)