mirror of https://github.com/MISP/misp-galaxy
chg: [tool] gen_relationships is now interactive
parent
bea5fda2ab
commit
7d98ac013c
|
@ -22,6 +22,7 @@ import argparse
|
|||
import os
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
|
||||
|
||||
def relation_exists(cluster_a, cluster_b):
|
||||
|
@ -106,42 +107,96 @@ for filename in args.files:
|
|||
cluster_files[filename] = f_content
|
||||
cluster_files_changed_tracking[filename] = False
|
||||
|
||||
# process each cluster one by one
|
||||
logging.info("Processing clusters.")
|
||||
for cluster_filename, clusters in cluster_files.items():
|
||||
logging.debug(f"Processing cluster {cluster_filename}.")
|
||||
for cluster in clusters['values']:
|
||||
values_to_lookup = [cluster['value'].lower()]
|
||||
if args.synonyms_source:
|
||||
try:
|
||||
values_to_lookup.extend([value.lower() for value in cluster['meta']['synonyms']])
|
||||
except KeyError:
|
||||
pass
|
||||
for lookup_cluster_filename, lookup_clusters in cluster_files.items():
|
||||
if lookup_cluster_filename == cluster_filename: # skip current cluster
|
||||
continue
|
||||
for lookup_cluster in lookup_clusters['values']:
|
||||
lookup_cluster_values = [lookup_cluster['value'].lower()]
|
||||
if args.synonyms_destination:
|
||||
try:
|
||||
lookup_cluster_values.extend(
|
||||
[value.lower() for value in lookup_cluster['meta']['synonyms']])
|
||||
except KeyError:
|
||||
pass
|
||||
if any(item in values_to_lookup for item in lookup_cluster_values):
|
||||
# we have a match from any of our source strings in the lookup cluster
|
||||
if not relation_exists(cluster, lookup_cluster): # no relation yet, create it
|
||||
logging.info(f"Found non-existing match for {cluster_filename} {values_to_lookup} in {lookup_cluster_filename} {lookup_cluster_values}. Creating it")
|
||||
# FIXME ask the user if they want to create the relationship, bidirectionally, with which rel_type and tag(s)
|
||||
if args.yes:
|
||||
if args.synonyms_destination or args.synonyms_source:
|
||||
tags = ["estimative-language:likelihood-probability=\"likely\""]
|
||||
# tags by default
|
||||
if args.synonyms_destination or args.synonyms_source:
|
||||
tags = ["estimative-language:likelihood-probability=\"likely\""]
|
||||
else:
|
||||
tags = ["estimative-language:likelihood-probability=\"almost-certain\""]
|
||||
rel_type = 'similar'
|
||||
# process each cluster one by one
|
||||
try:
|
||||
for cluster_filename, clusters in cluster_files.items():
|
||||
logging.debug(f"Processing cluster {cluster_filename}.")
|
||||
for cluster in clusters['values']:
|
||||
values_to_lookup = [cluster['value'].lower()]
|
||||
if args.synonyms_source:
|
||||
try:
|
||||
values_to_lookup.extend([value.lower() for value in cluster['meta']['synonyms']])
|
||||
except KeyError:
|
||||
pass
|
||||
for lookup_cluster_filename, lookup_clusters in cluster_files.items():
|
||||
if lookup_cluster_filename == cluster_filename: # skip current cluster
|
||||
continue
|
||||
for lookup_cluster in lookup_clusters['values']:
|
||||
lookup_cluster_values = [lookup_cluster['value'].lower()]
|
||||
if args.synonyms_destination:
|
||||
try:
|
||||
lookup_cluster_values.extend(
|
||||
[value.lower() for value in lookup_cluster['meta']['synonyms']])
|
||||
except KeyError:
|
||||
pass
|
||||
if any(item in values_to_lookup for item in lookup_cluster_values):
|
||||
# we have a match from any of our source strings in the lookup cluster
|
||||
if not relation_exists(cluster, lookup_cluster): # no relation yet, create it
|
||||
if args.yes:
|
||||
logging.info(f"Found non-existing match for {cluster_filename} {values_to_lookup} in {lookup_cluster_filename} {lookup_cluster_values}. Creating it")
|
||||
create_relationship = True
|
||||
else:
|
||||
tags = ["estimative-language:likelihood-probability=\"almost-certain\""]
|
||||
if create_relation(cluster, lookup_cluster, tags=tags):
|
||||
cluster_files_changed_tracking[cluster_filename] = True
|
||||
if create_relation(lookup_cluster, cluster, tags=tags):
|
||||
cluster_files_changed_tracking[lookup_cluster_filename] = True
|
||||
# interactive prompt to ask what to do
|
||||
# TODO add question which tags and relationship type and more
|
||||
print(f"Found non-existing match for {cluster_filename} {values_to_lookup} in {lookup_cluster_filename} {lookup_cluster_values}.")
|
||||
while True:
|
||||
user_input = input(f"Create relation? [\u0332yes] / \u0332no / \u0332details / \u0332tags / \u0332relation: ").lower().strip()
|
||||
if user_input in ['yes', 'y', '']:
|
||||
create_relationship = True
|
||||
logging.info(" creating it.")
|
||||
break
|
||||
if user_input in ['no', 'n']:
|
||||
create_relationship = False
|
||||
break
|
||||
if user_input in ['tags', 'tag', 't']:
|
||||
tags = sorted(tags)
|
||||
while True:
|
||||
print(f"Current tags: ")
|
||||
[print(f" [{i}]: {t}") for i, t in enumerate(tags)]
|
||||
tag_input = input(f"Change tags? [\u0332no] / \u0332add / \u0332delete #: ").lower().strip()
|
||||
if tag_input in ['n', 'no', '']:
|
||||
break
|
||||
if tag_input.startswith('delete ') or tag_input.startswith('d '):
|
||||
try:
|
||||
tag_delete_ids = [int(n) for n in tag_input.split(' ')[1:]]
|
||||
tag_delete_ids.sort(reverse=True)
|
||||
for i in tag_delete_ids:
|
||||
del tags[i]
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
if tag_input in ['add', 'a']:
|
||||
new_tag = input("Enter tag to add: ").strip()
|
||||
if re.match(r'[^:]+:[^:]+="[^"]+"', new_tag) or re.match(r'[\w]+:[\w]+', new_tag):
|
||||
tags.append(new_tag)
|
||||
else:
|
||||
print("ERROR: Tag is not in the proper structure.")
|
||||
if user_input in ['relation', 'r']:
|
||||
new_relation = input(f"Current relation is '{rel_type}'. Enter new: ").lower().strip()
|
||||
if new_relation:
|
||||
rel_type = new_relation
|
||||
if user_input in ['details', 'd']:
|
||||
print("Is:")
|
||||
print(f" {cluster_filename} with values: {values_to_lookup}:")
|
||||
print(f" {cluster.get('description')}")
|
||||
print(f"{rel_type}:")
|
||||
print(f" {lookup_cluster_filename} with values: {lookup_cluster_values}:")
|
||||
print(f" {lookup_cluster.get('description')}")
|
||||
print("")
|
||||
print(f"Tags: {tags}")
|
||||
if create_relationship:
|
||||
if create_relation(cluster, lookup_cluster, rel_type=rel_type, tags=tags):
|
||||
cluster_files_changed_tracking[cluster_filename] = True
|
||||
if create_relation(lookup_cluster, cluster, rel_type=rel_type, tags=tags):
|
||||
cluster_files_changed_tracking[lookup_cluster_filename] = True
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
# save all to file, and increment version number if something changed
|
||||
for cluster_filename, changed in cluster_files_changed_tracking.items():
|
||||
|
|
Loading…
Reference in New Issue