mirror of https://github.com/MISP/misp-galaxy
				
				
				
			
		
			
				
	
	
		
			196 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			196 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
| #!/usr/bin/env python3
 | |
| # TODO
 | |
| # - define strength between relations based on 'type' - similar should be closer than the others
 | |
| # - use different colors / shapes
 | |
| 
 | |
| import json
 | |
| import os
 | |
| import argparse
 | |
| from graphviz import Digraph
 | |
| 
 | |
| 
 | |
| parser = argparse.ArgumentParser(description='Generate a DOT file to graph a Galaxy cluster and its relations.')
 | |
| parser.add_argument("-u", "--uuid", help="Start UUID of a cluster.")
 | |
| parser.add_argument("-a", "--all", action='store_true', help='generate all graphs as PNGs')
 | |
| args = parser.parse_args()
 | |
| 
 | |
| 
 | |
| def gen_galaxy_tag(galaxy_name, cluster_name):
 | |
|     # return 'misp-galaxy:{}="{}"'.format(galaxy_name, cluster_name)
 | |
|     return '{}={}'.format(galaxy_name, cluster_name)
 | |
| 
 | |
| files_to_ignore = ['mitre-attack-pattern.json', 'mitre-course-of-action.json', 'mitre-intrusion-set.json',
 | |
|                    'mitre-malware.json', 'mitre-tool.json']
 | |
| 
 | |
| galaxies_fnames = []
 | |
| pathClusters = '../clusters'
 | |
| for f in os.listdir(pathClusters):
 | |
|     if '.json' in f and f not in files_to_ignore:
 | |
|         galaxies_fnames.append(f)
 | |
| galaxies_fnames.sort()
 | |
| 
 | |
| cluster_uuids = {}
 | |
| galaxies = []
 | |
| for galaxy_fname in galaxies_fnames:
 | |
|     fullPathClusters = os.path.join(pathClusters, galaxy_fname)
 | |
|     with open(fullPathClusters) as fp:
 | |
|         json_data = json.load(fp)
 | |
|     galaxies.append(json_data)
 | |
|     for cluster in json_data['values']:
 | |
|         if 'uuid' not in cluster:
 | |
|             continue
 | |
|         cluster_uuids[cluster['uuid']] = {
 | |
|             'tag': gen_galaxy_tag(json_data['type'], cluster['value']),
 | |
|             'galaxy': json_data['type'],
 | |
|             'value': cluster['value'],
 | |
|             'synonyms': cluster.get('synonyms')
 | |
|         }
 | |
| 
 | |
| 
 | |
| 
 | |
| # for k, v in cluster_uuids.items():
 | |
| #     print("{}\t{}".format(k, v))
 | |
| 
 | |
| 
 | |
| type_mapping = {
 | |
|     'ransomware': 'tool',
 | |
|     # 'mitre-pre-attack-relationship': '',
 | |
|     # 'mitre-enterprise-attack-course-of-action': '',
 | |
|     'mitre-enterprise-attack-intrusion-set': 'actor',
 | |
|     'mitre-intrusion-set': 'actor',
 | |
|     'rat': 'tool',
 | |
|     'stealer': 'tool',
 | |
|     'mitre-enterprise-attack-malware': 'tool',
 | |
|     # 'mitre-attack-pattern': '',
 | |
|     # 'mitre-mobile-attack-relationship': '',
 | |
|     # 'mitre-enterprise-attack-attack-pattern': '',
 | |
|     'microsoft-activity-group': 'actor',
 | |
|     # 'mitre-course-of-action': '',
 | |
|     'exploit-kit': 'tool',
 | |
|     'mitre-mobile-attack-tool': 'tool',
 | |
|     'backdoor': 'tool',
 | |
|     # 'mitre-pre-attack-attack-pattern': '',
 | |
|     'mitre-mobile-attack-intrusion-set': 'actor',
 | |
|     'mitre-tool': 'tool',
 | |
|     # 'mitre-mobile-attack-attack-pattern': '',
 | |
|     'mitre-mobile-attack-malware': 'tool',
 | |
|     'tool': 'tool',
 | |
|     # 'preventive-measure': '',
 | |
|     # 'sector': '',
 | |
|     'mitre-malware': 'tool',
 | |
|     'banker': 'tool',
 | |
|     # 'branded-vulnerability': '',
 | |
|     'botnet': 'tool',
 | |
|     # 'cert-eu-govsector': '',
 | |
|     'threat-actor': 'actor',
 | |
|     'mitre-enterprise-attack-tool': 'tool',
 | |
|     'android': 'tool',
 | |
|     # 'mitre-mobile-attack-course-of-action': '',
 | |
|     'mitre-pre-attack-intrusion-set': 'actor',
 | |
|     # 'mitre-enterprise-attack-relationship': '',
 | |
|     'tds': 'tool',
 | |
|     'malpedia': 'tool'
 | |
| }
 | |
| 
 | |
| 
 | |
| def gen_dot(uuid):
 | |
|     things_to_keep = [uuid]  # '5b4ee3ea-eee3-4c8e-8323-85ae32658754' = threat-actor=Sofacy
 | |
|                              # ' 5e0a7cf2-6107-4d5f-9dd0-9df38b1fcba8' = APT30
 | |
|     things_seen = things_to_keep.copy()
 | |
| 
 | |
|     dot = []
 | |
|     while len(things_to_keep) > 0:
 | |
|         new_things_to_keep = []
 | |
|         for galaxy in galaxies:
 | |
|             for cluster in galaxy['values']:
 | |
|                 if 'related' not in cluster:
 | |
|                     continue
 | |
|                 src_tag = gen_galaxy_tag(galaxy['type'], cluster['value'])
 | |
|                 if cluster['uuid'] not in things_to_keep:
 | |
|                     continue
 | |
|                 node_params = []
 | |
|                 node_params.append('label="{}\n{}"'.format(galaxy['type'], cluster['value']))
 | |
|                 if type_mapping.get(galaxy['type']) == 'actor':
 | |
|                     node_params.append('shape=octagon')
 | |
|                     node_params.append('style=filled,color=indianred1')
 | |
|                 elif type_mapping.get(galaxy['type']) == 'tool':
 | |
|                     node_params.append('shape=box')
 | |
|                     node_params.append('style=filled,color=deepskyblue')
 | |
|                 else:
 | |
|                     node_params.append('shape=ellipse')
 | |
|                 dot.append('"{src}" [{params}];'.format(
 | |
|                             src=src_tag,
 | |
|                             params=','.join(node_params)
 | |
|                         ))
 | |
|                 for relation in cluster['related']:
 | |
|                     try:
 | |
|                         dest_tag = cluster_uuids[relation['dest-uuid']]['tag']
 | |
|                         extra = []
 | |
|                         if relation['type'] == 'similar':
 | |
|                             # make arrow bidirectional
 | |
|                             extra.append('dir="both"')
 | |
|                             # prevent double links for 'similar' types
 | |
|                             if relation['dest-uuid'] in things_seen:
 | |
|                                 continue
 | |
|                         dot.append('"{src}" -> "{dst}" [label="{lbl}",{extra}];'.format(
 | |
|                         # dot.append('"{src}" -> "{dst}" [{extra}];'.format(
 | |
|                             src=src_tag,
 | |
|                             dst=dest_tag,
 | |
|                             lbl=relation['type'],
 | |
|                             extra=','.join(extra)
 | |
|                         ))
 | |
|                         # FIXME - add a separate node with the color, type, format of the source-node
 | |
| 
 | |
|                         # prevent something to be processed twice
 | |
|                         if relation['dest-uuid'] not in things_seen:
 | |
|                             new_things_to_keep.append(relation['dest-uuid'])
 | |
|                         things_seen.append(relation['dest-uuid'])
 | |
|                     except KeyError:
 | |
|                         # skip uuids not found
 | |
|                         pass
 | |
|         # print(new_things_to_keep)
 | |
|         things_to_keep = new_things_to_keep.copy()
 | |
| 
 | |
| 
 | |
|     return dot
 | |
| 
 | |
| if args.uuid:
 | |
|     uuid = args.uuid
 | |
|     dot = []
 | |
|     # dot.append('digraph G {')
 | |
|     dot.append('concentrate=true;')
 | |
|     dot.append('overlap=scale;')
 | |
|     generated_dot = gen_dot(uuid)
 | |
|     if len(generated_dot) == 0:
 | |
|         print("Empty graph for uuid: {}".format(uuid))
 | |
|         exit()
 | |
|     print("Generating graph for uuid: {}".format(uuid))
 | |
|     dot += generated_dot
 | |
|     # dot.append('}')
 | |
|     # dg.source = '\n'.join(dot)
 | |
|     dg = Digraph(engine='neato', format='png', body=dot)
 | |
|     # print(dg.source)
 | |
|     dg.render(filename='graphs/{}'.format(uuid), cleanup=False)
 | |
| 
 | |
| elif args.all:
 | |
|     for uuid in cluster_uuids.keys():
 | |
|         dot = []
 | |
|         # dot.append('digraph G {')
 | |
|         dot.append('concentrate=true;')
 | |
|         dot.append('overlap=scale;')
 | |
|         generated_dot = gen_dot(uuid)
 | |
|         if len(generated_dot) == 0:
 | |
|             print("Empty      graph for uuid: {}".format(uuid))
 | |
|             continue
 | |
| 
 | |
|         print("Generating graph for uuid: {}".format(uuid))
 | |
|         dot += generated_dot
 | |
|         # dot.append('}')
 | |
|         # dg.source = '\n'.join(dot)
 | |
| 
 | |
|         dg = Digraph(format='png', body=dot)
 | |
|         #print(dg.source)
 | |
|         dg.render(filename='graphs/{}'.format(uuid))
 | |
| else:
 | |
|     exit("No parameters given, use --help for more info.")
 |