2019-05-02 20:37:37 +02:00
from canari . maltego . entities import Hash , Domain , IPv4Address , URL , DNSName , AS , Website , NSRecord , PhoneNumber , EmailAddress , File , Person , Hashtag , Location , Company , Alias , Port , Twitter
2019-10-12 08:13:17 +02:00
from canari . maltego . message import Label , LinkStyle , MaltegoException , Bookmark , LinkDirection , UIMessage , UIMessageType
2019-12-25 22:41:12 +01:00
from canari . mode import is_local_exec_mode , is_remote_exec_mode
2019-10-12 08:13:17 +02:00
from distutils . version import StrictVersion
2019-12-17 21:42:24 +01:00
from MISP_maltego . transforms . common . entities import MISPEvent , MISPObject , MISPGalaxy , ThreatActor , Software , AttackTechnique
2019-10-06 16:57:27 +02:00
from pymisp import ExpandedPyMISP as PyMISP
2018-11-12 13:25:53 +01:00
import json
import os
2018-11-21 09:32:58 +01:00
import os . path
2019-10-12 08:13:17 +02:00
import requests
2018-11-21 09:32:58 +01:00
import tempfile
import time
2018-11-12 13:25:53 +01:00
2019-06-12 12:39:52 +02:00
# FIXME from galaxy 'to MISP Event' is confusing
2018-11-12 13:25:53 +01:00
2020-01-14 20:33:48 +01:00
# Version of this package; check_update() compares it with the version
# published in the upstream setup.py.
__version__ = ' 1.4.3 '  # also update version in setup.py

# Maps a MISP attribute type (or object relation name) to a list of candidate
# Maltego entity classes. The lookups visible in this module always take the
# first entry of the list.
mapping_misp_to_maltego = {
    ' AS ': [AS],
    ' domain ': [Domain, NSRecord, Website, DNSName],
    ' email-dst ': [EmailAddress],
    ' email-src ': [EmailAddress],
    ' filename ': [File],
    ' hostname ': [Website, NSRecord, Domain, DNSName],
    ' ip ': [IPv4Address],
    ' ip-dst ': [IPv4Address],
    ' ip-src ': [IPv4Address],
    ' md5 ': [Hash],
    ' phone-number ': [PhoneNumber],
    ' sha1 ': [Hash],
    ' sha224 ': [Hash],
    ' sha256 ': [Hash],
    ' sha384 ': [Hash],
    ' sha512 ': [Hash],
    ' sha512/224 ': [Hash],
    ' sha512/256 ': [Hash],
    ' ssdeep ': [Hash],
    ' impfuzzy ': [Hash],
    ' uri ': [URL],
    ' url ': [URL],

    ' whois-registrant-email ': [EmailAddress],
    ' country-of-residence ': [Location],
    ' github-organisation ': [Company],
    ' github-username ': [Alias],
    ' imphash ': [Hash],
    ' jabber-id ': [Alias],
    ' passport-country ': [Location],
    ' place-of-birth ': [Location],
    ' port ': [Port],
    ' target-email ': [EmailAddress],
    ' target-location ': [Location],
    ' target-org ': [Company],
    ' target-user ': [Alias],
    ' twitter-id ': [Twitter],

    # object mappings
    ' nameserver ': [NSRecord],
    # TODO add more object mappings

    # custom types created internally for technical reasons
    # 'rekey_value': [Unknown]
}
2019-03-16 21:00:56 +01:00
# Maps the 'icon' name stored with a galaxy cluster to the icon name handed to
# the entity as icon_url (see galaxycluster_to_entity). Icon names that are not
# listed (or are commented out below) keep the default Galaxy icon.
mapping_galaxy_icon = {
    # "android": "malware", # "android",
    " btc ": " ransomware ",
    " bug ": " vulnerability ",
    # "cart-arrow-down": "malware", #"tds",
    " chain ": " course_of_action ",
    " door-open ": " backdoor ",
    " eye ": " malware ",
    " gavel ": " tool ",
    # "globe": "cert-eu-govsector",
    # "industry": "sector",
    # "internet-explorer": "exploit-kit",
    " key ": " stealer ",
    " map ": " attack_pattern ",
    " optin-monster ": " malware ",
    # "shield": "malpedia",
    # "shield": "preventive-measure",
    " sitemap ": " botnet ",
    " usd ": " malware ",  # "banker",
    # "user-secret": "mitre-intrusion-set",
    " user-secret ": " threat_actor ",
}
2019-12-17 21:42:24 +01:00
# Maps a galaxy 'type' to the specialised entity class used to represent its
# clusters. Types that are not listed (or are commented out below) fall back to
# the generic MISPGalaxy entity in galaxycluster_to_entity().
mapping_galaxy_type = {
    # 'amitt-misinformation-pattern': '',
    ' android ': Software,
    ' backdoor ': Software,
    ' banker ': Software,
    ' botnet ': Software,
    # 'branded-vulnerability': '',
    # 'cert-eu-govsector': '',
    ' cloud-security ': AttackTechnique,
    ' exploit-kit ': Software,
    ' financial-fraud ': AttackTechnique,
    ' guidelines ': AttackTechnique,
    ' malpedia ': Software,
    ' microsoft-activity-group ': ThreatActor,
    ' mitre-attack-pattern ': AttackTechnique,
    # 'mitre-course-of-action': '',
    ' mitre-intrusion-set ': ThreatActor,
    ' mitre-malware ': Software,
    ' mitre-tool ': Software,
    # 'preventive-measure': '',
    ' ransomware ': Software,
    ' rat ': Software,
    # 'region': '',
    # 'sector': '',
    ' social-dark-patterns ': AttackTechnique,
    ' stealer ': Software,
    ' surveillance-vendor ': ThreatActor,
    # 'target-information': '',
    ' tds ': Software,
    ' threat-actor ': ThreatActor,
    ' tool ': Software
}
2018-12-11 15:44:48 +01:00
# Tags starting with one of these prefixes are collected into the entity notes
# (convert_tags_to_note) and are not emitted as Hashtag entities
# (attribute_to_entity).
tag_note_prefixes = [' tlp: ', ' PAP: ', ' de-vs: ', ' euci: ', ' fr-classif: ', ' nato: ']

# PyMISP client shared by the whole module; created on first use by
# get_misp_connection().
misp_connection = None

# Location of the published setup.py that check_update() reads the version from.
update_url = ' https://raw.githubusercontent.com/MISP/MISP-maltego/master/setup.py '

# Working directory (below the system temp dir) for the version-check marker
# and the galaxy cache.
local_path_root = os.path.join(tempfile.gettempdir(), ' MISP-maltego ')
local_path_version = os.path.join(local_path_root, ' versioncheck ')

# Create the directory without a separate exists() check: two transforms that
# start at the same time would otherwise race and one of them would die with
# FileExistsError at import time.
try:
    os.mkdir(local_path_root)
except FileExistsError:
    pass
else:
    # Only the process that actually created the directory adjusts its mode.
    os.chmod(local_path_root, mode=0o777)  # temporary workaround - see https://github.com/redcanari/canari3/issues/61
2019-10-12 08:13:17 +02:00
def check_update(config):
    '''Check (at most once a day) whether a newer MISP-Maltego release exists.

    :param config: mapping holding the transform configuration; the key
        ' MISP_maltego.local.check_updates ' switches the online check on/off.
    :return: a Fatal UIMessage when the parameter is missing from the
        configuration or when a newer version is published, otherwise None.

    The check is skipped for remote transforms and when the marker file
    ``local_path_version`` was touched less than 24 hours ago. Network
    problems and unparsable version strings are treated as "no update
    available" so that the transform itself keeps working.
    '''
    # Do not check updates if running as remote transform
    if is_remote_exec_mode():
        return None

    # only raise the alert once a day/reboot to the user.
    try:
        # check the timestamp of the file
        checked_recently = time.time() - os.path.getmtime(local_path_version) <= 60 * 60 * 24
    except OSError:  # file does not exist, so check version
        checked_recently = False
    if checked_recently:
        return None

    # remember we checked the version
    from pathlib import Path
    Path(local_path_version).touch()

    # UIMessageType must be Fatal as otherwise it is not shown to the user.
    if ' MISP_maltego.local.check_updates ' not in config:
        return UIMessage(" ' check_updates ' parameter missing in ' .canari/MISP_maltego.conf ' . Please set ' check_updates = True/False ' . ", type=UIMessageType.Fatal)
    if not config[' MISP_maltego.local.check_updates ']:
        return None

    # check the version; the update check is a convenience, so a missing or
    # slow network connection must not break (or hang) the transform
    try:
        r = requests.get(update_url, timeout=10)
        r.raise_for_status()
    except requests.RequestException:
        return None

    for line in r.text.splitlines():
        if ' version= ' not in line:
            continue
        online_ver = line.strip().strip(' , ').split(' = ').pop().strip(" ' ")
        try:
            newer_available = StrictVersion(online_ver) > StrictVersion(__version__)
        except ValueError:
            # version string that StrictVersion cannot parse: nothing to report
            newer_available = False
        if newer_available:
            message = (' A new version of MISP-Maltego is available. \n '
                       ' To upgrade, please: \n '
                       ' pip3 install --upgrade MISP-maltego \n '
                       ' canari create-profile MISP_maltego \n '
                       ' And import the newly generated .mtz bundle in Maltego (Import > Import Configuration) ')
            return UIMessage(message, type=UIMessageType.Fatal)
        # only the first version line is relevant
        break
    return None
2018-11-12 13:25:53 +01:00
2018-12-11 12:07:08 +01:00
2019-12-27 09:54:20 +01:00
def get_misp_connection(config=None, parameters=None):
    '''Return the module-wide PyMISP connection, creating it on first use.

    :param config: transform configuration; required as long as no connection
        has been established. In local mode the URL, API key, verify and debug
        settings are read from it.
    :param parameters: transform parameters; in remote mode the entries
        ' mispurl ' and ' mispkey ' provide the URL and API key.
    :return: the PyMISP instance stored in the global ``misp_connection``.
    :raises MaltegoException: when no config is given, when the remote
        parameters are missing, or when the connection cannot be created.
    '''
    global misp_connection
    if misp_connection:
        return misp_connection
    if not config:
        raise MaltegoException(" ERROR: MISP connection not yet established, and config not provided as parameter. ")
    misp_verify = True
    misp_debug = False
    misp_url = None
    misp_key = None
    try:
        if is_local_exec_mode():
            misp_url = config[' MISP_maltego.local.misp_url ']
            misp_key = config[' MISP_maltego.local.misp_key ']
            if config[' MISP_maltego.local.misp_verify '] in [' False ', ' false ', 0, ' no ', ' No ']:
                misp_verify = False
            if config[' MISP_maltego.local.misp_debug '] in [' True ', ' true ', 1, ' yes ', ' Yes ']:
                misp_debug = True
        if is_remote_exec_mode():
            try:
                misp_url = parameters[' mispurl '].value
                misp_key = parameters[' mispkey '].value
            except (AttributeError, KeyError, TypeError) as err:
                # TypeError: parameters is None; KeyError: entry missing;
                # AttributeError: entry has no .value
                raise MaltegoException(" ERROR: mispurl and mispkey need to be set to something valid ") from err
        misp_connection = PyMISP(misp_url, misp_key, misp_verify, ' json ', misp_debug)
    except MaltegoException:
        # keep the specific message raised above instead of replacing it with
        # the generic "cannot connect" text
        raise
    except Exception as err:
        if is_local_exec_mode():
            raise MaltegoException(" ERROR: Cannot connect to MISP server. Please verify your MISP_Maltego.conf settings. ") from err
        if is_remote_exec_mode():
            raise MaltegoException(" ERROR: Cannot connect to MISP server. Please verify your settings (MISP URL and API key), and ensure the MISP server is reachable from the internet. ") from err
        # neither mode: do not hide the failure behind a None connection
        raise
    return misp_connection
2018-11-12 13:25:53 +01:00
def entity_obj_to_entity(entity_obj, v, t, **kwargs):
    '''Instantiate ``entity_obj`` for value ``v`` and return the new entity.

    Hash entities additionally receive the attribute type ``t`` through the
    ``_type`` keyword; every other entity class only gets ``v`` and ``kwargs``.
    '''
    # LATER type is conflicting with type of Entity, Report this as bug see line 326 /usr/local/lib/python3.5/dist-packages/canari/maltego/entities.py
    type_kwargs = {'_type': t} if entity_obj == Hash else {}
    return entity_obj(v, **type_kwargs, **kwargs)
2018-11-12 13:25:53 +01:00
2019-03-26 21:12:02 +01:00
def get_entity_property(entity, name):
    '''Return the ``value`` of the entity field called ``name``, or None when
    the entity has no such field.'''
    matching_values = (
        field.value
        for key, field in entity.fields.items()
        if key == name
    )
    return next(matching_values, None)
2019-03-23 18:19:20 +01:00
def attribute_to_entity(a, link_label=None, event_tags=None, only_self=False):
    '''Generate the Maltego entities that represent one MISP attribute.

    :param a: attribute dict; it is modified in place (' data ' is cleared and
        some ' type ' values are rewritten).
    :param link_label: label put on the link towards the attribute entity.
    :param event_tags: tag names of the owning event; they are combined with
        the attribute's own tags to build the entity notes. The passed list is
        never modified.
    :param only_self: when True the galaxy clusters and tags of the attribute
        are not expanded into entities of their own.
    :return: generator yielding, in this order, the galaxy cluster entities,
        the Hashtag entities and finally the entity/entities for the attribute
        value. Attribute types without a mapping yield nothing for the value.
    '''
    # prepare some attributes to a better form
    a[' data '] = None  # empty the file content as we really don't need this here
    if a[' type '] == ' malware-sample ':
        a[' type '] = ' filename|md5 '
    if a[' type '] == ' regkey|value ':  # LATER regkey|value => needs to be a special non-combined object
        a[' type '] = ' regkey '

    # Work on a private copy: appending to the caller's list (or to a shared
    # default list) would leak the tags of one attribute into the next call.
    combined_tags = list(event_tags) if event_tags else []

    if ' Galaxy ' in a and not only_self:
        for g in a[' Galaxy ']:
            for c in g[' GalaxyCluster ']:
                yield galaxycluster_to_entity(c)

    # complement the event tags with the attribute tags.
    if ' Tag ' in a and not only_self:
        for t in a[' Tag ']:
            combined_tags.append(t[' name '])
            # ignore all misp-galaxies
            if t[' name '].startswith(' misp-galaxy '):
                continue
            # ignore all those we add as notes
            if tag_matches_note_prefix(t[' name ']):
                continue
            yield Hashtag(t[' name '], bookmark=Bookmark.Green)

    notes = convert_tags_to_note(combined_tags)

    # special cases
    if a[' type '] in (' url ', ' uri '):
        yield (URL(url=a[' value '], short_title=a[' value '], link_label=link_label, notes=notes, bookmark=Bookmark.Green))
        return

    # attribute is from an object, and a relation gives better understanding of the type of attribute
    if a.get(' object_relation ') and mapping_misp_to_maltego.get(a[' object_relation ']):
        entity_obj = mapping_misp_to_maltego[a[' object_relation ']][0]
        yield entity_obj(a[' value '], labels=[Label(' comment ', a.get(' comment '))], link_label=link_label, notes=notes, bookmark=Bookmark.Green)

    # combined attributes
    elif ' | ' in a[' type ']:
        t_1, t_2 = a[' type '].split(' | ')
        v_1, v_2 = a[' value '].split(' | ')
        if t_1 in mapping_misp_to_maltego:
            entity_obj = mapping_misp_to_maltego[t_1][0]
            labels = [Label(' comment ', a.get(' comment '))]
            if entity_obj == File:
                labels.append(Label(' hash ', v_2))
            yield entity_obj_to_entity(entity_obj, v_1, t_1, labels=labels, link_label=link_label, notes=notes, bookmark=Bookmark.Green)  # LATER change the comment to include the second part of the regkey
        if t_2 in mapping_misp_to_maltego:
            entity_obj = mapping_misp_to_maltego[t_2][0]
            labels = [Label(' comment ', a.get(' comment '))]
            if entity_obj == Hash:
                labels.append(Label(' filename ', v_1))
            yield entity_obj_to_entity(entity_obj, v_2, t_2, labels=labels, link_label=link_label, notes=notes, bookmark=Bookmark.Green)  # LATER change the comment to include the first part of the regkey

    # normal attributes
    elif a[' type '] in mapping_misp_to_maltego:
        entity_obj = mapping_misp_to_maltego[a[' type ']][0]
        yield entity_obj_to_entity(entity_obj, a[' value '], a[' type '], labels=[Label(' comment ', a.get(' comment '))], link_label=link_label, notes=notes, bookmark=Bookmark.Green)

    # attribute types that are not in our Maltego mapping are not handled

    # LATER : relationships from attributes - not supported by MISP yet, but there are references in the datamodel
2018-11-12 13:25:53 +01:00
2019-05-24 16:06:07 +02:00
def _find_attribute_for_relation(o, o_template, relation):
    '''Return the first attribute of ``o`` whose type equals the type of the
    template element named ``relation``; None when the template has no such
    element or the object has no attribute of that type.'''
    for ote in o_template[' ObjectTemplateElement ']:
        if ote[' object_relation '] == relation:
            wanted_type = ote[' type ']
            break
    else:
        # Unknown relation: report "not found" instead of re-using the type
        # resolved for a previous relation.
        return None
    for a in o[' Attribute ']:
        if a[' type '] == wanted_type:
            return a
    return None


def object_to_entity(o, link_label=None, link_direction=LinkDirection.InputToOutput):
    '''Build the MISPObject entity for the MISP object dict ``o``.

    :param o: object dict; ' template_uuid ', ' name ', ' uuid ' and
        ' event_id ' are read unconditionally.
    :param link_label: label of the link towards the new entity.
    :param link_direction: direction of the link towards the new entity.
    :return: a MISPObject entity whose display value is derived from the
        object template (see below).
    '''
    # Generate a human readable display-name:
    # - find the first RequiredOneOf that exists
    # - if none, use the first RequiredField
    # LATER further finetune the human readable version of this object
    misp = get_misp_connection()
    o_template = misp.get_object_template(o[' template_uuid '])
    # Problems caused by an incomplete template/object must not prevent the
    # entity from being created, hence the tolerant error handling below.
    lookup_errors = (KeyError, IndexError, TypeError, AttributeError)

    human_readable = None
    try:
        for relation in o_template[' ObjectTemplate '][' requirements '][' requiredOneOf ']:
            attribute = _find_attribute_for_relation(o, o_template, relation)
            if attribute is not None:
                human_readable = ' {} : {} '.format(o[' name '], attribute[' value '])
                break
    except lookup_errors:
        pass

    if not human_readable:
        try:
            parts = []
            for relation in o_template[' ObjectTemplate '][' requirements '][' required ']:
                attribute = _find_attribute_for_relation(o, o_template, relation)
                if attribute is not None:
                    parts.append(attribute[' value '])
            human_readable = ' {} : {} '.format(o[' name '], ' | '.join(parts))
        except lookup_errors:
            # no usable requirements in the template: fall back to the name
            human_readable = o[' name ']

    return MISPObject(
        human_readable,
        uuid=o[' uuid '],
        event_id=int(o[' event_id ']),
        meta_category=o.get(' meta_category '),
        description=o.get(' description '),
        comment=o.get(' comment '),
        link_label=link_label,
        link_direction=link_direction,
        bookmark=Bookmark.Green
    )
2018-12-11 12:07:08 +01:00
def object_to_attributes(o, e):
    '''Generate the entities for the attributes of the MISP object ``o``.

    For a ' person ' object the first-name and last-name attributes are taken
    out of the object and merged into a single Person entity; every remaining
    attribute is converted with attribute_to_entity(). ``e`` is not used.
    '''
    # first process attributes from an object that belong together (eg: first-name + last-name), and remove them from the list
    if o[' name '] == ' person ':
        first_name = get_attribute_in_object(o, attribute_type=' first-name ', drop=True).get(' value ')
        last_name = get_attribute_in_object(o, attribute_type=' last-name ', drop=True).get(' value ')
        display_name = ' '.join([first_name, last_name]).strip()
        yield entity_obj_to_entity(
            Person,
            display_name,
            ' person ',
            lastname=last_name,
            firstnames=first_name,
            bookmark=Bookmark.Green,
        )

    # process normal attributes
    for attribute in o[' Attribute ']:
        yield from attribute_to_entity(attribute)
2019-05-24 16:06:07 +02:00
def object_to_relations(o, e):
    '''Generate the entities that are related to object ``o`` inside event ``e``.

    Forward references (``o`` points to another object or attribute) and
    reverse references (another object points to ``o``) are both followed.
    '''
    # process forward and reverse references, so just loop over all the objects of the event
    if ' Object ' not in e[' Event ']:
        return

    for event_object in e[' Event '][' Object ']:
        if ' ObjectReference ' not in event_object:
            continue
        for ref in event_object[' ObjectReference ']:
            # we have found original object. Expand to the related object and attributes
            if event_object[' uuid '] == o[' uuid ']:
                # the reference is an Object
                referenced_object = ref.get(' Object ')
                if referenced_object:
                    # get the full object in the event, as our objectReference included does not contain everything we need
                    full_object = get_object_in_event(referenced_object[' uuid '], e)
                    yield object_to_entity(full_object, link_label=ref[' relationship_type '])
                # the reference is an Attribute
                referenced_attribute = ref.get(' Attribute ')
                if referenced_attribute:
                    # LATER remove this ugly workaround - object can't be requested directly from MISP using the uuid, and to find a full object we need the event_id
                    referenced_attribute[' event_id '] = ref[' event_id ']
                    yield from attribute_to_entity(referenced_attribute, link_label=ref[' relationship_type '])

            # reverse-lookup - this is another objects relating the original object
            if ref[' referenced_uuid '] == o[' uuid ']:
                yield object_to_entity(
                    event_object,
                    link_label=ref[' relationship_type '],
                    link_direction=LinkDirection.OutputToInput,
                )
2018-12-01 20:02:15 +01:00
2018-11-12 13:25:53 +01:00
2018-12-11 12:07:08 +01:00
def get_object_in_event(uuid, e):
    '''Return the first object of event ``e`` whose uuid equals ``uuid``,
    or None when the event contains no such object.'''
    candidates = (obj for obj in e[' Event '][' Object '] if obj[' uuid '] == uuid)
    return next(candidates, None)
2019-04-30 22:48:29 +02:00
def get_attribute_in_object(o, attribute_type=False, attribute_value=False, drop=False):
    '''Gets the first attribute of a specific type or value within an object.

    An attribute matches when its type equals ``attribute_type``, when its
    value equals ``attribute_value``, or when it is a combined attribute
    (its type contains the separator, or is ' malware-sample ') and
    ``attribute_value`` is one of the parts of its value.

    :param o: object dict holding the list ' Attribute '.
    :param attribute_type: attribute type to look for.
    :param attribute_value: attribute value to look for.
    :param drop: when True the matching attribute is removed from the object.
    :return: a copy of the first matching attribute, or a placeholder dict
        with only the ' value ' key when nothing matches.
    '''
    found_attribute = {' value ': ' '}
    attributes = o[' Attribute ']
    for i, a in enumerate(attributes):
        is_combined = ' | ' in a[' type '] or a[' type '] == ' malware-sample '
        matches = (
            a[' type '] == attribute_type
            or a[' value '] == attribute_value
            or (is_combined and attribute_value in a[' value '].split(' | '))
        )
        if not matches:
            continue
        found_attribute = a.copy()
        if drop:  # drop the attribute from the object
            # Exactly one pop, immediately followed by leaving the loop: the
            # list is never modified while we keep iterating over it.
            attributes.pop(i)
        break
    return found_attribute
2019-02-08 22:41:33 +01:00
def get_attribute_in_event(e, attribute_value):
    '''Return the first attribute of event ``e`` that carries ``attribute_value``.

    The value either equals the attribute value, or - for combined attributes
    and ' malware-sample ' - is one of the separated parts of it. None is
    returned when no attribute matches.
    '''
    def carries_value(attribute):
        if attribute[' value '] == attribute_value:
            return True
        attribute_type = attribute[' type ']
        if ' | ' in attribute_type or attribute_type == ' malware-sample ':
            return attribute_value in attribute[' value '].split(' | ')
        return False

    return next(
        (attribute for attribute in e[' Event '][" Attribute "] if carries_value(attribute)),
        None,
    )
2018-12-11 15:44:48 +01:00
def convert_tags_to_note(tags):
    '''Build the note text for a list of tag names.

    Returns None for an empty/missing tag list; otherwise the tags that start
    with one of ``tag_note_prefixes``, joined by the line separator (an empty
    string when no tag qualifies).
    '''
    if not tags:
        return None
    note_lines = [
        tag
        for tag in tags
        for prefix in tag_note_prefixes
        if tag.startswith(prefix)
    ]
    return ' \n '.join(note_lines)
2019-02-07 14:03:30 +01:00
def tag_matches_note_prefix(tag):
    '''Tell whether ``tag`` starts with one of the prefixes in
    ``tag_note_prefixes``.'''
    return any(tag.startswith(prefix) for prefix in tag_note_prefixes)
2019-05-24 16:17:11 +02:00
def event_to_entity(e, link_style=LinkStyle.Normal, link_direction=LinkDirection.InputToOutput):
    '''Build the MISPEvent entity for the event dict ``e``.

    The event id becomes the entity value; uuid and info are copied over and
    the event tags are turned into the entity notes.
    '''
    event = e[' Event ']
    if ' Tag ' in event:
        tag_names = [tag[' name '] for tag in event[' Tag ']]
    else:
        tag_names = []
    return MISPEvent(
        event[' id '],
        uuid=event[' uuid '],
        info=event[' info '],
        link_style=link_style,
        link_direction=link_direction,
        notes=convert_tags_to_note(tag_names),
        bookmark=Bookmark.Green,
    )
2018-11-12 13:25:53 +01:00
2019-05-02 21:20:28 +02:00
def galaxycluster_to_entity(c, link_label=None, link_direction=LinkDirection.InputToOutput):
    '''Build the entity that represents the galaxy cluster dict ``c``.

    :param c: cluster dict; when it carries ' meta ' -> ' uuid ', the first
        entry of that list is moved into ``c`` as its uuid (``c`` is modified).
    :param link_label: label of the link towards the new entity.
    :param link_direction: direction of the link towards the new entity.
    :return: an instance of the class registered for the galaxy type in
        ``mapping_galaxy_type``, or a MISPGalaxy when the type (or the whole
        cluster) is unknown to the local galaxy mapping.
    '''
    if ' meta ' in c and ' uuid ' in c[' meta ']:
        c[' uuid '] = c[' meta '][' uuid '].pop(0)

    if ' meta ' in c and ' synonyms ' in c[' meta ']:
        synonyms = ' , '.join(c[' meta '][' synonyms '])
    else:
        synonyms = ' '

    # The cluster may be missing from the local galaxy mapping, in which case
    # the lookup returns None; treat that like a cluster without icon/type
    # instead of failing on None[...].
    galaxy_cluster = get_galaxy_cluster(c[' uuid ']) or {}

    # map the 'icon' name from the cluster to the icon filename of the intelligence-icons repository
    # (if it's not in our mapping, just leave the default Galaxy icon)
    icon_url = mapping_galaxy_icon.get(galaxy_cluster.get(' icon '))

    # create the right sub-galaxy: ThreatActor, Software, AttackTechnique, ... or MISPGalaxy
    galaxy_type = mapping_galaxy_type.get(galaxy_cluster.get(' type '), MISPGalaxy)

    return galaxy_type(
        ' {} \n {} '.format(c[' type '], c[' value ']),
        uuid=c[' uuid '],
        description=c.get(' description '),
        cluster_type=c.get(' type '),
        cluster_value=c.get(' value '),
        synonyms=synonyms,
        tag_name=c[' tag_name '],
        link_label=link_label,
        icon_url=icon_url,
        link_direction=link_direction
    )
2019-05-02 21:20:28 +02:00
# LATER this uses the galaxies from github as the MISP web UI does not fully support the Galaxies.
# See https://github.com/MISP/MISP/issues/3801
galaxy_archive_url = ' https://github.com/MISP/misp-galaxy/archive/master.zip '
# JSON file written by galaxy_update_local_copy(): cluster uuid -> cluster data
local_path_uuid_mapping = os.path.join(local_path_root, ' MISP_maltego_galaxy_mapping.json ')
# Directory holding the cluster files of the extracted galaxy archive
local_path_clusters = os.path.join(local_path_root, ' misp-galaxy-master ', ' clusters ')
# In-memory copy of the uuid mapping; loaded on first use by the galaxy lookup functions
galaxy_cluster_uuids = None
def galaxy_update_local_copy(force=False):
    '''Refresh the local copy of the public MISP galaxies when needed.

    The galaxy archive is downloaded and extracted below ``local_path_root``
    and the uuid -> cluster mapping file ``local_path_uuid_mapping`` is
    regenerated when ``force`` is True, when the mapping file does not exist
    yet, or when it is older than 24 hours. Otherwise nothing happens.

    :param force: refresh regardless of the age of the cached mapping.
    :raises MaltegoException: when the archive cannot be downloaded/extracted.
    '''
    import io
    from zipfile import ZipFile

    # some aging and automatic re-downloading
    if not os.path.exists(local_path_uuid_mapping):
        force = True
    elif time.time() - os.path.getmtime(local_path_uuid_mapping) > 60 * 60 * 24:
        # force update if cache is older than 24 hours
        force = True
    if not force:
        return

    # download the latest zip of the public galaxy
    try:
        resp = requests.get(galaxy_archive_url, timeout=60)
        resp.raise_for_status()
        # the context manager closes the archive even when extraction fails
        with ZipFile(io.BytesIO(resp.content)) as zf:
            zf.extractall(local_path_root)
    except Exception as err:
        raise MaltegoException(" ERROR: Could not download Galaxy data from https://github.com/MISP/MISP-galaxy/. Please check internet connectivity. ") from err

    # generate the uuid mapping and save it to a file
    galaxies_fnames = sorted(f for f in os.listdir(local_path_clusters) if f.endswith(' .json '))
    # the galaxy definitions live next to the clusters directory
    local_path_galaxies = os.path.join(os.path.dirname(local_path_clusters), ' galaxies ')

    cluster_uuids = {}
    for galaxy_fname in galaxies_fnames:
        try:
            with open(os.path.join(local_path_clusters, galaxy_fname), encoding='utf-8') as fp:
                galaxy = json.load(fp)
            with open(os.path.join(local_path_galaxies, galaxy_fname), encoding='utf-8') as fg:
                galaxy_main = json.load(fg)
            for cluster in galaxy[' values ']:
                if ' uuid ' not in cluster:
                    continue
                # skip deprecated galaxies/clusters
                if galaxy_main[' namespace '] == ' deprecated ':
                    continue
                # keep track of the cluster, but also enhance it to look like the cluster we receive when accessing the web.
                cluster[' type '] = galaxy[' type ']
                cluster[' tag_name '] = ' misp-galaxy: {} = " {} " '.format(galaxy[' type '], cluster[' value '])
                if ' icon ' in galaxy_main:
                    cluster[' icon '] = galaxy_main[' icon ']
                cluster_uuids[cluster[' uuid ']] = cluster
        except (OSError, ValueError, KeyError, TypeError):
            # we ignore incorrect galaxies (unreadable file, invalid JSON,
            # missing keys); clusters collected so far are kept
            pass

    with open(local_path_uuid_mapping, ' w ') as f:
        json.dump(cluster_uuids, f, sort_keys=True, indent=4)
def galaxy_load_cluster_mapping():
    '''Make sure the local galaxy copy is up to date and return the
    uuid -> cluster mapping read from ``local_path_uuid_mapping``.'''
    galaxy_update_local_copy()
    with open(local_path_uuid_mapping, ' r ') as mapping_file:
        return json.load(mapping_file)
2019-03-16 21:00:56 +01:00
def get_galaxy_cluster(uuid=None, tag=None):
    '''Look up one galaxy cluster in the (lazily loaded) uuid mapping.

    A given ``uuid`` takes precedence over ``tag``; with ``tag`` the first
    cluster whose ' tag_name ' equals it is returned. None is returned when
    nothing matches or when neither argument is given.
    '''
    global galaxy_cluster_uuids
    if not galaxy_cluster_uuids:
        galaxy_cluster_uuids = galaxy_load_cluster_mapping()

    if uuid:
        return galaxy_cluster_uuids.get(uuid)
    if not tag:
        return None
    tagged = (cluster for cluster in galaxy_cluster_uuids.values() if cluster[' tag_name '] == tag)
    return next(tagged, None)
2019-05-02 20:37:37 +02:00
def search_galaxy_cluster(keyword):
    '''Generate the galaxy clusters that match ``keyword`` (case-insensitive).

    A cluster matches when the keyword occurs in its ' tag_name ' or in at
    least one of its synonyms. Every matching cluster is yielded exactly once,
    no matter how many of its synonyms contain the keyword.
    '''
    global galaxy_cluster_uuids
    keyword = keyword.lower()
    if not galaxy_cluster_uuids:
        galaxy_cluster_uuids = galaxy_load_cluster_mapping()

    for item in galaxy_cluster_uuids.values():
        if keyword in item[' tag_name '].lower():
            yield item
            continue
        if ' meta ' in item and ' synonyms ' in item[' meta ']:
            # any() stops at the first hit, so the cluster is reported once
            if any(keyword in synonym.lower() for synonym in item[' meta '][' synonyms ']):
                yield item
2019-03-16 21:00:56 +01:00
def get_galaxies_relating(uuid):
    '''Generate the galaxy clusters whose ' related ' list points at ``uuid``.

    A cluster is yielded once for every related entry whose ' dest-uuid '
    equals ``uuid``.
    '''
    global galaxy_cluster_uuids
    if not galaxy_cluster_uuids:
        galaxy_cluster_uuids = galaxy_load_cluster_mapping()

    for cluster in galaxy_cluster_uuids.values():
        if ' related ' not in cluster:
            continue
        yield from (
            cluster
            for relation in cluster[' related ']
            if relation[' dest-uuid '] == uuid
        )