mirror of https://github.com/MISP/misp-galaxy
new: [tool] Initial version of a Relationship generator.
parent
bf7c5f1dd9
commit
bea5fda2ab
|
@ -0,0 +1,156 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
#
|
||||
# Facilitates the creation and maintenance of relationships.
|
||||
# Copyright (C) 2022 MISP Project
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
|
||||
|
||||
def relation_exists(cluster_a, cluster_b):
|
||||
"""
|
||||
Checks if there is already a relationship from a to b.
|
||||
Note: you might want to run this function from a to b and from b to a.
|
||||
"""
|
||||
try:
|
||||
for rel in cluster_a['related']:
|
||||
if cluster_b['uuid'] == rel['dest-uuid']:
|
||||
return True
|
||||
except KeyError: # no relations yet
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def create_relation(cluster_a, cluster_b, rel_type="similar", tags=None):
|
||||
"""
|
||||
Creates unidirectional relationship, with a (optional) tags
|
||||
"""
|
||||
if not relation_exists(cluster_a, cluster_b):
|
||||
rel = {"dest-uuid": cluster_b['uuid'],
|
||||
"type": rel_type}
|
||||
if tags:
|
||||
rel["tags"] = tags
|
||||
if 'related' not in cluster_a:
|
||||
cluster_a['related'] = []
|
||||
cluster_a['related'].append(rel)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class AtLeastTwoItemsAction(argparse.Action):
|
||||
def __call__(self, parser, namespace, values, option_string=None):
|
||||
if len(values) < 2:
|
||||
parser.error('argument "{}" requires at least 2 values'.format(self.dest))
|
||||
else:
|
||||
filenames = []
|
||||
for value in values:
|
||||
if not os.path.isfile(value):
|
||||
parser.error('file "{}" does not exist'.format(value))
|
||||
filenames.append(value)
|
||||
setattr(namespace, self.dest, filenames)
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(description="MISP Galaxy relationship tool.")
|
||||
parser.add_argument("files", nargs='+',
|
||||
help="The names of the clusters. (filename or cluster-name)",
|
||||
action=AtLeastTwoItemsAction)
|
||||
parser.add_argument("-ss", "--synonyms-source",
|
||||
help="Also use synonyms from the source cluster",
|
||||
action='store_true')
|
||||
parser.add_argument("-sd", "--synonyms-destination",
|
||||
help="Also use synonyms from the destination cluster from which we are looking up",
|
||||
action='store_true')
|
||||
parser.add_argument("-y", "--yes",
|
||||
help="Assume yes to all the questions, so create relationships without asking.",
|
||||
action='store_true') # FIXME develop this feature
|
||||
parser.add_argument('-v', '--verbose', action='count', default=0)
|
||||
args = parser.parse_args()
|
||||
|
||||
levels = [logging.WARNING, logging.INFO, logging.DEBUG]
|
||||
level = levels[min(args.verbose, len(levels) - 1)] # cap to last level index
|
||||
logging.basicConfig(level=level, format="%(message)s")
|
||||
|
||||
cluster_files = {}
|
||||
cluster_files_changed_tracking = {}
|
||||
# load all non-deprecated Clusters in memory
|
||||
logging.info("Reading all non-deprecated cluster files in memory.")
|
||||
for filename in args.files:
|
||||
# skip if deprecated
|
||||
galaxy_filename = os.path.join(os.path.dirname(filename), '..', 'galaxies', os.path.basename(filename))
|
||||
with open(galaxy_filename, 'r') as f:
|
||||
f_content = json.load(f)
|
||||
if f_content.get('namespace') == 'deprecated':
|
||||
logging.debug(f"Skipping file {filename} as Galaxy is deprecated.")
|
||||
continue
|
||||
# load file in memory
|
||||
logging.debug(f"Loading {filename} in memory.")
|
||||
with open(filename, 'r') as f:
|
||||
f_content = json.load(f)
|
||||
cluster_files[filename] = f_content
|
||||
cluster_files_changed_tracking[filename] = False
|
||||
|
||||
# process each cluster one by one
|
||||
logging.info("Processing clusters.")
|
||||
for cluster_filename, clusters in cluster_files.items():
|
||||
logging.debug(f"Processing cluster {cluster_filename}.")
|
||||
for cluster in clusters['values']:
|
||||
values_to_lookup = [cluster['value'].lower()]
|
||||
if args.synonyms_source:
|
||||
try:
|
||||
values_to_lookup.extend([value.lower() for value in cluster['meta']['synonyms']])
|
||||
except KeyError:
|
||||
pass
|
||||
for lookup_cluster_filename, lookup_clusters in cluster_files.items():
|
||||
if lookup_cluster_filename == cluster_filename: # skip current cluster
|
||||
continue
|
||||
for lookup_cluster in lookup_clusters['values']:
|
||||
lookup_cluster_values = [lookup_cluster['value'].lower()]
|
||||
if args.synonyms_destination:
|
||||
try:
|
||||
lookup_cluster_values.extend(
|
||||
[value.lower() for value in lookup_cluster['meta']['synonyms']])
|
||||
except KeyError:
|
||||
pass
|
||||
if any(item in values_to_lookup for item in lookup_cluster_values):
|
||||
# we have a match from any of our source strings in the lookup cluster
|
||||
if not relation_exists(cluster, lookup_cluster): # no relation yet, create it
|
||||
logging.info(f"Found non-existing match for {cluster_filename} {values_to_lookup} in {lookup_cluster_filename} {lookup_cluster_values}. Creating it")
|
||||
# FIXME ask the user if they want to create the relationship, bidirectionally, with which rel_type and tag(s)
|
||||
if args.yes:
|
||||
if args.synonyms_destination or args.synonyms_source:
|
||||
tags = ["estimative-language:likelihood-probability=\"likely\""]
|
||||
else:
|
||||
tags = ["estimative-language:likelihood-probability=\"almost-certain\""]
|
||||
if create_relation(cluster, lookup_cluster, tags=tags):
|
||||
cluster_files_changed_tracking[cluster_filename] = True
|
||||
if create_relation(lookup_cluster, cluster, tags=tags):
|
||||
cluster_files_changed_tracking[lookup_cluster_filename] = True
|
||||
|
||||
# save all to file, and increment version number if something changed
|
||||
for cluster_filename, changed in cluster_files_changed_tracking.items():
|
||||
if not changed:
|
||||
continue
|
||||
logging.debug(f"File {cluster_filename} has changed. Saving...")
|
||||
with open(cluster_filename, 'w') as f:
|
||||
cluster_files[cluster_filename]['version'] += 1
|
||||
json.dump(cluster_files[cluster_filename], f, indent=2, sort_keys=True, ensure_ascii=False)
|
||||
f.write('\n') # only needed for the beauty and to be compliant with jq_all_the_things
|
||||
|
||||
print("All done, please don't forget to ./jq_all_the_things.sh, commit, and then ./validate_all.sh.")
|
Loading…
Reference in New Issue