misp-galaxy/tools/graph.py

196 lines
7.1 KiB
Python
Raw Normal View History

2018-10-19 14:30:05 +02:00
#!/usr/bin/env python3
# TODO
# - define strength between relations based on 'type' - similar should be closer than the others
# - use different colors / shapes
import json
import os
import argparse
from graphviz import Digraph
parser = argparse.ArgumentParser(description='Generate a DOT file to graph a Galaxy cluster and its relations.')
parser.add_argument("-u", "--uuid", help="Start UUID of a cluster.")
parser.add_argument("-a", "--all", action='store_true', help='generate all graphs as PNGs')
args = parser.parse_args()
def gen_galaxy_tag(galaxy_name, cluster_name):
# return 'misp-galaxy:{}="{}"'.format(galaxy_name, cluster_name)
return '{}={}'.format(galaxy_name, cluster_name)
files_to_ignore = ['mitre-attack-pattern.json', 'mitre-course-of-action.json', 'mitre-intrusion-set.json',
'mitre-malware.json', 'mitre-tool.json']
galaxies_fnames = []
pathClusters = '../clusters'
for f in os.listdir(pathClusters):
if '.json' in f and f not in files_to_ignore:
galaxies_fnames.append(f)
galaxies_fnames.sort()
cluster_uuids = {}
galaxies = []
for galaxy_fname in galaxies_fnames:
fullPathClusters = os.path.join(pathClusters, galaxy_fname)
with open(fullPathClusters) as fp:
json_data = json.load(fp)
galaxies.append(json_data)
for cluster in json_data['values']:
if 'uuid' not in cluster:
continue
cluster_uuids[cluster['uuid']] = {
'tag': gen_galaxy_tag(json_data['type'], cluster['value']),
'galaxy': json_data['type'],
'value': cluster['value'],
'synonyms': cluster.get('synonyms')
}
# for k, v in cluster_uuids.items():
# print("{}\t{}".format(k, v))
type_mapping = {
'ransomware': 'tool',
# 'mitre-pre-attack-relationship': '',
# 'mitre-enterprise-attack-course-of-action': '',
'mitre-enterprise-attack-intrusion-set': 'actor',
'mitre-intrusion-set': 'actor',
'rat': 'tool',
'stealer': 'tool',
'mitre-enterprise-attack-malware': 'tool',
# 'mitre-attack-pattern': '',
# 'mitre-mobile-attack-relationship': '',
# 'mitre-enterprise-attack-attack-pattern': '',
'microsoft-activity-group': 'actor',
# 'mitre-course-of-action': '',
'exploit-kit': 'tool',
'mitre-mobile-attack-tool': 'tool',
'backdoor': 'tool',
# 'mitre-pre-attack-attack-pattern': '',
'mitre-mobile-attack-intrusion-set': 'actor',
'mitre-tool': 'tool',
# 'mitre-mobile-attack-attack-pattern': '',
'mitre-mobile-attack-malware': 'tool',
'tool': 'tool',
# 'preventive-measure': '',
# 'sector': '',
'mitre-malware': 'tool',
'banker': 'tool',
# 'branded-vulnerability': '',
'botnet': 'tool',
# 'cert-eu-govsector': '',
'threat-actor': 'actor',
'mitre-enterprise-attack-tool': 'tool',
'android': 'tool',
# 'mitre-mobile-attack-course-of-action': '',
'mitre-pre-attack-intrusion-set': 'actor',
# 'mitre-enterprise-attack-relationship': '',
'tds': 'tool',
'malpedia': 'tool'
}
def gen_dot(uuid):
things_to_keep = [uuid] # '5b4ee3ea-eee3-4c8e-8323-85ae32658754' = threat-actor=Sofacy
# ' 5e0a7cf2-6107-4d5f-9dd0-9df38b1fcba8' = APT30
things_seen = things_to_keep.copy()
dot = []
while len(things_to_keep) > 0:
new_things_to_keep = []
for galaxy in galaxies:
for cluster in galaxy['values']:
if 'related' not in cluster:
continue
src_tag = gen_galaxy_tag(galaxy['type'], cluster['value'])
if cluster['uuid'] not in things_to_keep:
continue
node_params = []
node_params.append('label="{}\n{}"'.format(galaxy['type'], cluster['value']))
if type_mapping.get(galaxy['type']) == 'actor':
node_params.append('shape=octagon')
node_params.append('style=filled,color=indianred1')
elif type_mapping.get(galaxy['type']) == 'tool':
node_params.append('shape=box')
node_params.append('style=filled,color=deepskyblue')
else:
node_params.append('shape=ellipse')
2018-10-19 14:30:05 +02:00
dot.append('"{src}" [{params}];'.format(
src=src_tag,
params=','.join(node_params)
))
for relation in cluster['related']:
try:
dest_tag = cluster_uuids[relation['dest-uuid']]['tag']
extra = []
if relation['type'] == 'similar':
# make arrow bidirectional
extra.append('dir="both"')
# prevent double links for 'similar' types
if relation['dest-uuid'] in things_seen:
continue
dot.append('"{src}" -> "{dst}" [label="{lbl}",{extra}];'.format(
# dot.append('"{src}" -> "{dst}" [{extra}];'.format(
src=src_tag,
dst=dest_tag,
lbl=relation['type'],
extra=','.join(extra)
))
# FIXME - add a separate node with the color, type, format of the source-node
# prevent something to be processed twice
if relation['dest-uuid'] not in things_seen:
new_things_to_keep.append(relation['dest-uuid'])
things_seen.append(relation['dest-uuid'])
except KeyError:
# skip uuids not found
pass
# print(new_things_to_keep)
things_to_keep = new_things_to_keep.copy()
2018-10-19 14:30:05 +02:00
return dot
if args.uuid:
uuid = args.uuid
dot = []
# dot.append('digraph G {')
dot.append('concentrate=true;')
dot.append('overlap=scale;')
generated_dot = gen_dot(uuid)
if len(generated_dot) == 0:
print("Empty graph for uuid: {}".format(uuid))
exit()
2018-10-19 14:30:05 +02:00
print("Generating graph for uuid: {}".format(uuid))
dot += generated_dot
# dot.append('}')
# dg.source = '\n'.join(dot)
dg = Digraph(engine='neato', format='png', body=dot)
# print(dg.source)
dg.render(filename='graphs/{}'.format(uuid), cleanup=False)
elif args.all:
for uuid in cluster_uuids.keys():
dot = []
# dot.append('digraph G {')
dot.append('concentrate=true;')
dot.append('overlap=scale;')
generated_dot = gen_dot(uuid)
if len(generated_dot) == 0:
print("Empty graph for uuid: {}".format(uuid))
continue
2018-10-19 14:30:05 +02:00
print("Generating graph for uuid: {}".format(uuid))
dot += generated_dot
# dot.append('}')
# dg.source = '\n'.join(dot)
dg = Digraph(format='png', body=dot)
#print(dg.source)
dg.render(filename='graphs/{}'.format(uuid))
2018-10-19 14:30:05 +02:00
else:
exit("No parameters given, use --help for more info.")