misp-galaxy/tools/gen_mapping.py

207 lines
7.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
'''
Author: Christophe Vandeplas
License: AGPL v3
This builds an automatic mapping between the galaxy clusters of the same type.
The mapping is made by using the synonyms documented in each cluster.
The output is saved in the cluster files themselves, if a change is done the version number is increased.
(commented out) The output is saved in a file called "mapping_XXX.json".
2018-08-14 09:38:38 +02:00
# TODO add a blacklist support for blacklisted mappings
'''
import json
import os
# Some galaxy clusters have overlapping synonyms, while not being of the same type.
# This type_mapping is there to distinguish galaxies based on their type.
# Example: A galaxy of type 'actor' should not map to a galaxy of type 'tool', even if the name/synonym is the same.
type_mapping = {
'ransomware': 'tool',
# 'mitre-pre-attack-relationship': '',
# 'mitre-enterprise-attack-course-of-action': '',
'mitre-enterprise-attack-intrusion-set': 'actor',
'mitre-intrusion-set': 'actor',
'rat': 'tool',
'stealer': 'tool',
'mitre-enterprise-attack-malware': 'tool',
# 'mitre-attack-pattern': '',
# 'mitre-mobile-attack-relationship': '',
# 'mitre-enterprise-attack-attack-pattern': '',
'microsoft-activity-group': 'actor',
# 'mitre-course-of-action': '',
'exploit-kit': 'tool',
'mitre-mobile-attack-tool': 'tool',
'backdoor': 'tool',
# 'mitre-pre-attack-attack-pattern': '',
'mitre-mobile-attack-intrusion-set': 'tool',
'mitre-tool': 'tool',
# 'mitre-mobile-attack-attack-pattern': '',
'mitre-mobile-attack-malware': 'tool',
'tool': 'tool',
# 'preventive-measure': '',
# 'sector': '',
'mitre-malware': 'tool',
'banker': 'tool',
# 'branded-vulnerability': '',
'botnet': 'tool',
# 'cert-eu-govsector': '',
'threat-actor': 'actor',
'mitre-enterprise-attack-tool': 'tool',
'android': 'tool',
# 'mitre-mobile-attack-course-of-action': '',
'mitre-pre-attack-intrusion-set': 'actor',
# 'mitre-enterprise-attack-relationship': '',
'tds': 'tool'
}
def loadjsons(path):
"""
Find all Jsons and load them in a dict
"""
files = []
data = []
for name in os.listdir(path):
if os.path.isfile(os.path.join(path, name)) and name.endswith('.json'):
files.append(name)
for jfile in files:
data.append(json.load(open("%s/%s" % (path, jfile))))
return data
def printjson(s):
print(json.dumps(s, sort_keys=True, indent=4, separators=(',', ': ')))
def to_tag(t, v):
return 'misp-galaxy:{}="{}"'.format(t, v)
def get_cluster_uuid(cluster):
uuid = cluster.get('uuid')
if not uuid: # FIXME are these bugs in the format? - mitre-tool.json
uuid = cluster['meta'].get('uuid')
if not uuid:
print(cluster)
exit("ERROR: missing UUID in cluster")
return uuid
if __name__ == '__main__':
path = '../clusters'
jsons = loadjsons(path)
mappings = {}
for k, v in type_mapping.items():
if v not in mappings:
mappings[v] = []
for djson in jsons:
galaxy = djson['type']
# ignore the galaxies that are not relevant for us
if galaxy not in type_mapping:
continue
# process the entries in each cluster
clusters = djson.get('values')
for cluster in clusters:
names = [cluster['value']]
if 'meta' in cluster and 'synonyms' in cluster['meta']:
names += [s for s in cluster['meta']['synonyms']]
# check if the entry is already in our mappings dict
seen_once = False
for mapping in mappings[type_mapping[galaxy]]:
seen = False
# name is known, add the synonyms and tags
for name in names:
if name in mapping['names']:
seen = True
seen_once = True
# we have a match in this mapping, add name and synonyms
if seen:
for name in names:
if name not in mapping['names']:
mapping['names'].append(name)
tag = to_tag(galaxy, cluster['value'])
if tag not in mapping['values']:
mapping['values'].append(tag)
uuid = get_cluster_uuid(cluster)
if uuid not in mapping['uuids']:
mapping['uuids'].append(uuid)
# it's not in any mapping, add it
if not seen_once:
mapping = {}
mapping['names'] = names
mapping['values'] = [to_tag(galaxy, cluster['value'])]
uuid = get_cluster_uuid(cluster)
mapping['uuids'] = [uuid]
mappings[type_mapping[galaxy]].append(mapping)
# We have our nice mapping.
# Now we only need to add it again in the original files.
for name in os.listdir(path):
# skip files that are not relevant
if not (os.path.isfile(os.path.join(path, name)) and name.endswith('.json')):
continue
# load json
with open(os.path.join(path, name), 'r') as f_in:
file_json = json.load(f_in)
galaxy = file_json['type']
# ignore the galaxies that are not relevant for us
if galaxy not in type_mapping:
continue
changed = False
for cluster in file_json['values']:
for mapping in mappings[type_mapping[galaxy]]:
cluster_uuid = get_cluster_uuid(cluster)
if cluster_uuid not in mapping['uuids']:
continue
# uuid is in the mappings
for uuid in mapping['uuids']:
# skip self
if uuid == cluster_uuid:
continue
# skip existing entries
if 'related' in cluster:
if any(v['dest-uuid'] == uuid for v in cluster['related']):
continue
# initialize array
if 'related' not in cluster:
cluster['related'] = []
# automated things are set to likely
# manual validation can upgrade to very-likely or almost-certain
cluster['related'].append({"dest-uuid": uuid,
"type": "similar",
"tags": [
"estimative-language:likelihood-probability=\"likely\""
]
})
changed = True
if changed:
file_json['version'] += 1
# save result to the original file
with open(os.path.join(path, name), 'w') as f_out:
json.dump(file_json, f_out, indent=2, sort_keys=True, ensure_ascii=False)
print("Updated file {}".format(name))
print("All done, please don't forget to ./validate_all.sh and ./jq_all_the_things.sh")
# # simply dump the mapping_json to files. This is not really needed anymore
# for galaxy_type, vals in mappings.items():
# for mapping in vals:
# mapping['names'].sort()
# mapping['values'].sort()
# with open('mapping_{}.json'.format(galaxy_type), 'w') as f:
# json.dump(vals, f, sort_keys=True, indent=4, separators=(',', ': '))
# print("File saved as mapping_{}.json".format(galaxy_type))