mirror of https://github.com/MISP/misp-galaxy
207 lines
7.5 KiB
Python
Executable File
207 lines
7.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
'''
|
|
Author: Christophe Vandeplas
|
|
License: AGPL v3
|
|
|
|
This builds an automatic mapping between the galaxy clusters of the same type.
|
|
The mapping is made by using the synonyms documented in each cluster.
|
|
|
|
The output is saved in the cluster files themselves, if a change is done the version number is increased.
|
|
(commented out) The output is saved in a file called "mapping_XXX.json".
|
|
|
|
# TODO add a blacklist support for blacklisted mappings
|
|
'''
|
|
import json
|
|
import os
|
|
|
|
|
|
# Some galaxy clusters have overlapping synonyms, while not being of the same type.
|
|
# This type_mapping is there to distinguish galaxies based on their type.
|
|
# Example: A galaxy of type 'actor' should not map to a galaxy of type 'tool', even if the name/synonym is the same.
|
|
type_mapping = {
|
|
'ransomware': 'tool',
|
|
# 'mitre-pre-attack-relationship': '',
|
|
# 'mitre-enterprise-attack-course-of-action': '',
|
|
'mitre-enterprise-attack-intrusion-set': 'actor',
|
|
'mitre-intrusion-set': 'actor',
|
|
'rat': 'tool',
|
|
'stealer': 'tool',
|
|
'mitre-enterprise-attack-malware': 'tool',
|
|
# 'mitre-attack-pattern': '',
|
|
# 'mitre-mobile-attack-relationship': '',
|
|
# 'mitre-enterprise-attack-attack-pattern': '',
|
|
'microsoft-activity-group': 'actor',
|
|
# 'mitre-course-of-action': '',
|
|
'exploit-kit': 'tool',
|
|
'mitre-mobile-attack-tool': 'tool',
|
|
'backdoor': 'tool',
|
|
# 'mitre-pre-attack-attack-pattern': '',
|
|
'mitre-mobile-attack-intrusion-set': 'tool',
|
|
'mitre-tool': 'tool',
|
|
# 'mitre-mobile-attack-attack-pattern': '',
|
|
'mitre-mobile-attack-malware': 'tool',
|
|
'tool': 'tool',
|
|
# 'preventive-measure': '',
|
|
# 'sector': '',
|
|
'mitre-malware': 'tool',
|
|
'banker': 'tool',
|
|
# 'branded-vulnerability': '',
|
|
'botnet': 'tool',
|
|
# 'cert-eu-govsector': '',
|
|
'threat-actor': 'actor',
|
|
'mitre-enterprise-attack-tool': 'tool',
|
|
'android': 'tool',
|
|
# 'mitre-mobile-attack-course-of-action': '',
|
|
'mitre-pre-attack-intrusion-set': 'actor',
|
|
# 'mitre-enterprise-attack-relationship': '',
|
|
'tds': 'tool'
|
|
}
|
|
|
|
|
|
def loadjsons(path):
|
|
"""
|
|
Find all Jsons and load them in a dict
|
|
"""
|
|
files = []
|
|
data = []
|
|
for name in os.listdir(path):
|
|
if os.path.isfile(os.path.join(path, name)) and name.endswith('.json'):
|
|
files.append(name)
|
|
for jfile in files:
|
|
data.append(json.load(open("%s/%s" % (path, jfile))))
|
|
return data
|
|
|
|
|
|
def printjson(s):
|
|
print(json.dumps(s, sort_keys=True, indent=4, separators=(',', ': ')))
|
|
|
|
|
|
def to_tag(t, v):
|
|
return 'misp-galaxy:{}="{}"'.format(t, v)
|
|
|
|
|
|
def get_cluster_uuid(cluster):
|
|
uuid = cluster.get('uuid')
|
|
if not uuid: # FIXME are these bugs in the format? - mitre-tool.json
|
|
uuid = cluster['meta'].get('uuid')
|
|
if not uuid:
|
|
print(cluster)
|
|
exit("ERROR: missing UUID in cluster")
|
|
return uuid
|
|
|
|
|
|
if __name__ == '__main__':
|
|
path = '../clusters'
|
|
jsons = loadjsons(path)
|
|
mappings = {}
|
|
for k, v in type_mapping.items():
|
|
if v not in mappings:
|
|
mappings[v] = []
|
|
|
|
for djson in jsons:
|
|
galaxy = djson['type']
|
|
|
|
# ignore the galaxies that are not relevant for us
|
|
if galaxy not in type_mapping:
|
|
continue
|
|
|
|
# process the entries in each cluster
|
|
clusters = djson.get('values')
|
|
for cluster in clusters:
|
|
names = [cluster['value']]
|
|
|
|
if 'meta' in cluster and 'synonyms' in cluster['meta']:
|
|
names += [s for s in cluster['meta']['synonyms']]
|
|
|
|
# check if the entry is already in our mappings dict
|
|
seen_once = False
|
|
for mapping in mappings[type_mapping[galaxy]]:
|
|
seen = False
|
|
# name is known, add the synonyms and tags
|
|
for name in names:
|
|
if name in mapping['names']:
|
|
seen = True
|
|
seen_once = True
|
|
# we have a match in this mapping, add name and synonyms
|
|
if seen:
|
|
for name in names:
|
|
if name not in mapping['names']:
|
|
mapping['names'].append(name)
|
|
tag = to_tag(galaxy, cluster['value'])
|
|
if tag not in mapping['values']:
|
|
mapping['values'].append(tag)
|
|
uuid = get_cluster_uuid(cluster)
|
|
if uuid not in mapping['uuids']:
|
|
mapping['uuids'].append(uuid)
|
|
|
|
# it's not in any mapping, add it
|
|
if not seen_once:
|
|
mapping = {}
|
|
mapping['names'] = names
|
|
mapping['values'] = [to_tag(galaxy, cluster['value'])]
|
|
uuid = get_cluster_uuid(cluster)
|
|
mapping['uuids'] = [uuid]
|
|
mappings[type_mapping[galaxy]].append(mapping)
|
|
|
|
# We have our nice mapping.
|
|
# Now we only need to add it again in the original files.
|
|
for name in os.listdir(path):
|
|
# skip files that are not relevant
|
|
if not (os.path.isfile(os.path.join(path, name)) and name.endswith('.json')):
|
|
continue
|
|
|
|
# load json
|
|
with open(os.path.join(path, name), 'r') as f_in:
|
|
file_json = json.load(f_in)
|
|
galaxy = file_json['type']
|
|
|
|
# ignore the galaxies that are not relevant for us
|
|
if galaxy not in type_mapping:
|
|
continue
|
|
|
|
changed = False
|
|
for cluster in file_json['values']:
|
|
for mapping in mappings[type_mapping[galaxy]]:
|
|
cluster_uuid = get_cluster_uuid(cluster)
|
|
if cluster_uuid not in mapping['uuids']:
|
|
continue
|
|
# uuid is in the mappings
|
|
for uuid in mapping['uuids']:
|
|
# skip self
|
|
if uuid == cluster_uuid:
|
|
continue
|
|
# skip existing entries
|
|
if 'related' in cluster:
|
|
if any(v['dest-uuid'] == uuid for v in cluster['related']):
|
|
continue
|
|
# initialize array
|
|
if 'related' not in cluster:
|
|
cluster['related'] = []
|
|
# automated things are set to likely
|
|
# manual validation can upgrade to very-likely or almost-certain
|
|
cluster['related'].append({"dest-uuid": uuid,
|
|
"type": "similar",
|
|
"tags": [
|
|
"estimative-language:likelihood-probability=\"likely\""
|
|
]
|
|
})
|
|
changed = True
|
|
if changed:
|
|
file_json['version'] += 1
|
|
|
|
# save result to the original file
|
|
with open(os.path.join(path, name), 'w') as f_out:
|
|
json.dump(file_json, f_out, indent=2, sort_keys=True, ensure_ascii=False)
|
|
|
|
print("Updated file {}".format(name))
|
|
print("All done, please don't forget to ./validate_all.sh and ./jq_all_the_things.sh")
|
|
|
|
# # simply dump the mapping_json to files. This is not really needed anymore
|
|
# for galaxy_type, vals in mappings.items():
|
|
# for mapping in vals:
|
|
# mapping['names'].sort()
|
|
# mapping['values'].sort()
|
|
# with open('mapping_{}.json'.format(galaxy_type), 'w') as f:
|
|
# json.dump(vals, f, sort_keys=True, indent=4, separators=(',', ': '))
|
|
# print("File saved as mapping_{}.json".format(galaxy_type))
|