Merge pull request #5 from NMD03/refactor

Refactor
pull/941/head
Niclas Dauster 2024-03-05 10:24:12 +01:00 committed by GitHub
commit 58bdd6c155
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 441 additions and 476 deletions

View File

@ -1,68 +1,35 @@
#!/usr/bin/python
from modules.universe import Universe
from modules.site import IndexSite, StatisticsSite
from utils.helper import generate_relations_table
from modules.galaxy import Galaxy
from modules.statistics import Statistics
import multiprocessing
from multiprocessing import Pool
from concurrent.futures import ThreadPoolExecutor
import json
import os
import time
import sys
sys.setrecursionlimit(10000)
FILES_TO_IGNORE = []
CLUSTER_PATH = "../../clusters"
SITE_PATH = "./site/docs"
GALAXY_PATH = "../../galaxies"
def write_relations_table(cluster):
    """Write the relations table of *cluster* to ``<uuid>.md``.

    Clusters whose ``relationships`` attribute is empty are skipped. The
    destination directory is read from the module-level ``relation_path``
    name, which must be set by the caller before this function runs.
    """
    if not cluster.relationships:
        return
    file_name = f"{cluster.uuid}.md"
    print(f"Writing {file_name}")
    destination = os.path.join(relation_path, file_name)
    with open(destination, "w") as page:
        page.write(generate_relations_table(cluster.relationships))
FILES_TO_IGNORE = [] # if you want to skip a specific cluster in the generation
INTRO = """
# MISP Galaxy
The MISP galaxy offers a streamlined approach for representing large entities, known as clusters, which can be linked to MISP events or attributes. Each cluster consists of one or more elements, represented as key-value pairs. MISP galaxy comes with a default knowledge base, encompassing areas like Threat Actors, Tools, Ransomware, and ATT&CK matrices. However, users have the flexibility to modify, update, replace, or share these elements according to their needs.
Clusters and vocabularies within MISP galaxy can be utilized in their original form or as a foundational knowledge base. The distribution settings for each cluster can be adjusted, allowing for either restricted or wide dissemination.
Additionally, MISP galaxies enable the representation of existing standards like the MITRE ATT&CK framework, as well as custom matrices.
The aim is to provide a core set of clusters for organizations embarking on analysis, which can be further tailored to include localized, private information or additional, shareable data.
Clusters serve as an open and freely accessible knowledge base, which can be utilized and expanded within [MISP](https://www.misp-project.org/) or other threat intelligence platforms.
![Overview of the integration of MISP galaxy in the MISP Threat Intelligence Sharing Platform](https://raw.githubusercontent.com/MISP/misp-galaxy/aa41337fd78946a60aef3783f58f337d2342430a/doc/images/galaxy.png)
## Publicly available clusters
"""
STATISTICS = """
## Statistics
You can find some statistics about MISP galaxies [here](./statistics.md).
"""
CONTRIBUTING = """
# Contributing
In the dynamic realm of threat intelligence, a variety of models and approaches exist to systematically organize, categorize, and delineate threat actors, hazards, or activity groups. We embrace innovative methodologies for articulating threat intelligence. The galaxy model is particularly versatile, enabling you to leverage and integrate methodologies that you trust and are already utilizing within your organization or community.
We encourage collaboration and contributions to the [MISP Galaxy JSON files](https://github.com/MISP/misp-galaxy/). Feel free to fork the project, enhance existing elements or clusters, or introduce new ones. Your insights are valuable - share them with us through a pull-request.
"""
def write_galaxy_entry(galaxy, site_path, cluster_dict):
    """Ask *galaxy* to write its page and return a short status message.

    ``site_path`` and ``cluster_dict`` are forwarded unchanged to
    ``galaxy.write_entry``.
    """
    galaxy.write_entry(site_path, cluster_dict)
    finished_name = galaxy.name
    return f"Finished writing entry for {finished_name}"
def create_index(galaxies):
    """Build the Markdown text of the landing page.

    The page is the module-level ``INTRO`` text, one bullet link per galaxy
    (in iteration order), then ``STATISTICS`` and ``CONTRIBUTING``.
    """
    bullet_links = [
        f"- [{entry.name}](./{entry.json_file_name}/index.md)\n"
        for entry in galaxies
    ]
    return "".join([INTRO, *bullet_links, STATISTICS, CONTRIBUTING])
def get_cluster_relationships(cluster_data):
    """Resolve the relationships of one cluster via the global ``universe``.

    ``cluster_data`` is a ``(galaxy, cluster)`` pair of keys into
    ``universe.galaxies`` and that galaxy's ``clusters`` mapping. Returns
    ``(cluster, galaxy, relationships)`` so the caller can attach the result
    to the right cluster.
    """
    galaxy, cluster = cluster_data
    target = universe.galaxies[galaxy].clusters[cluster]
    relationships = universe.get_relationships_with_levels(target)
    print(f"Processed {galaxy}, {cluster}")
    return cluster, galaxy, relationships
def get_deprecated_galaxy_files():
deprecated_galaxy_files = []
@ -74,9 +41,9 @@ def get_deprecated_galaxy_files():
return deprecated_galaxy_files
def main():
if __name__ == "__main__":
start_time = time.time()
universe = Universe()
FILES_TO_IGNORE.extend(get_deprecated_galaxy_files())
galaxies_fnames = []
@ -85,45 +52,77 @@ def main():
galaxies_fnames.append(f)
galaxies_fnames.sort()
galaxies = []
# Create the universe of clusters and galaxies
for galaxy in galaxies_fnames:
with open(os.path.join(CLUSTER_PATH, galaxy)) as fr:
galaxy_json = json.load(fr)
galaxies.append(
Galaxy(
cluster_list=galaxy_json["values"],
authors=galaxy_json["authors"],
description=galaxy_json["description"],
name=galaxy_json["name"],
json_file_name=galaxy.split(".")[0],
)
universe.add_galaxy(galaxy_name=galaxy_json["name"], json_file_name=galaxy, authors=galaxy_json["authors"], description=galaxy_json["description"])
for cluster in galaxy_json["values"]:
universe.add_cluster(
galaxy_name=galaxy_json.get("name", None),
uuid=cluster.get("uuid", None),
description=cluster.get("description", None),
value=cluster.get("value", None),
meta=cluster.get("meta", None)
)
cluster_dict = {}
for galaxy in galaxies:
for cluster in galaxy.clusters:
cluster_dict[cluster.uuid] = cluster
statistics = Statistics(cluster_dict=cluster_dict)
for galaxy in galaxies:
for cluster in galaxy.clusters:
statistics.add_cluster(cluster)
# Define the relationships between clusters
for galaxy in galaxies_fnames:
with open(os.path.join(CLUSTER_PATH, galaxy)) as fr:
galaxy_json = json.load(fr)
for cluster in galaxy_json["values"]:
if "related" in cluster:
for related in cluster["related"]:
universe.define_relationship(cluster["uuid"], related["dest-uuid"])
# Write files
tasks = []
for galaxy_name, galaxy in universe.galaxies.items():
for cluster_name, cluster in galaxy.clusters.items():
tasks.append((galaxy_name, cluster_name))
with Pool(processes=multiprocessing.cpu_count()) as pool:
result = pool.map(get_cluster_relationships, tasks)
for cluster, galaxy, relationships in result:
universe.galaxies[galaxy].clusters[cluster].relationships = relationships
print("All clusters processed.")
print(f"Finished relations in {time.time() - start_time} seconds")
# Write output
if not os.path.exists(SITE_PATH):
os.mkdir(SITE_PATH)
index = IndexSite(SITE_PATH)
index.add_content("# MISP Galaxy\n\nThe MISP galaxy offers a streamlined approach for representing large entities, known as clusters, which can be linked to MISP events or attributes. Each cluster consists of one or more elements, represented as key-value pairs. MISP galaxy comes with a default knowledge base, encompassing areas like Threat Actors, Tools, Ransomware, and ATT&CK matrices. However, users have the flexibility to modify, update, replace, or share these elements according to their needs.\n\nClusters and vocabularies within MISP galaxy can be utilized in their original form or as a foundational knowledge base. The distribution settings for each cluster can be adjusted, allowing for either restricted or wide dissemination.\n\nAdditionally, MISP galaxies enable the representation of existing standards like the MITRE ATT&CK™ framework, as well as custom matrices.\n\nThe aim is to provide a core set of clusters for organizations embarking on analysis, which can be further tailored to include localized, private information or additional, shareable data.\n\nClusters serve as an open and freely accessible knowledge base, which can be utilized and expanded within [MISP](https://www.misp-project.org/) or other threat intelligence platforms.\n\n![Overview of the integration of MISP galaxy in the MISP Threat Intelligence Sharing Platform](https://raw.githubusercontent.com/MISP/misp-galaxy/aa41337fd78946a60aef3783f58f337d2342430a/doc/images/galaxy.png)\n\n## Publicly available clusters\n")
index.add_toc(universe.galaxies.values())
index.add_content("## Statistics\n\nYou can find some statistics about MISP galaxies [here](./statistics.md).\n\n")
index.add_content("# Contributing\n\nIn the dynamic realm of threat intelligence, a variety of models and approaches exist to systematically organize, categorize, and delineate threat actors, hazards, or activity groups. We embrace innovative methodologies for articulating threat intelligence. The galaxy model is particularly versatile, enabling you to leverage and integrate methodologies that you trust and are already utilizing within your organization or community.\n\nWe encourage collaboration and contributions to the [MISP Galaxy JSON files](https://github.com/MISP/misp-galaxy/). Feel free to fork the project, enhance existing elements or clusters, or introduce new ones. Your insights are valuable - share them with us through a pull-request.\n")
index.write_entry()
for galaxy in galaxies:
galaxy.write_entry(SITE_PATH, cluster_dict)
statistics = StatisticsSite(SITE_PATH)
statistics.add_cluster_statistics(len([cluster for galaxy in universe.galaxies.values() for cluster in galaxy.clusters.values()]), len(universe.private_clusters))
statistics.add_galaxy_statistics(universe.galaxies.values())
statistics.add_relation_statistics([cluster for galaxy in universe.galaxies.values() for cluster in galaxy.clusters.values()])
statistics.add_synonym_statistics([cluster for galaxy in universe.galaxies.values() for cluster in galaxy.clusters.values()])
statistics.write_entry()
index_output = create_index(galaxies)
for galaxy in universe.galaxies.values():
galaxy.write_entry(SITE_PATH)
statistics.write_entry(SITE_PATH)
for galaxy in universe.galaxies.values():
galaxy_path = os.path.join(SITE_PATH, f"{galaxy.json_file_name}".replace(".json", ""))
if not os.path.exists(galaxy_path):
os.mkdir(galaxy_path)
relation_path = os.path.join(galaxy_path, "relations")
if not os.path.exists(relation_path):
os.mkdir(relation_path)
with open(os.path.join(relation_path, ".pages"), "w") as index:
index.write(f"hide: true\n")
with open(os.path.join(SITE_PATH, "index.md"), "w") as index:
index.write(index_output)
print(f"Finished file creation in {time.time() - start_time} seconds")
with ThreadPoolExecutor(max_workers=(multiprocessing.cpu_count() * 4)) as executor:
executor.map(write_relations_table, galaxy.clusters.values())
if __name__ == "__main__":
main()
print(f"Finished in {time.time() - start_time} seconds")

View File

@ -1,246 +1,109 @@
import os
import validators
class Cluster:
def __init__(
self, description, uuid, date, value, related_list, meta, galaxy
):
self.description = description
def __init__(self, uuid, galaxy, description=None, value=None, meta=None):
self.uuid = uuid
self.date = date
self.description = description
self.value = value
self.related_list = related_list
self.meta = meta
self.galaxy = galaxy
self.entry = ""
self.statistics = None
self.galaxy = galaxy # Reference to the Galaxy object this cluster belongs to
self.outbound_relationships = set()
self.inbound_relationships = set()
self.relationships = set()
def __lt__(self, other):
return self.uuid < other.uuid
def add_outbound_relationship(self, cluster):
self.outbound_relationships.add(cluster)
def set_statistics(self, statistics):
self.statistics = statistics
def add_inbound_relationship(self, cluster):
self.inbound_relationships.add(cluster)
def save_relationships(self, relationships):
self.relationships = relationships
def generate_entry(self):
entry = ""
entry += self._create_title_entry()
entry += self._create_description_entry()
entry += self._create_synonyms_entry()
entry += self._create_uuid_entry()
entry += self._create_refs_entry()
entry += self._create_associated_metadata_entry()
if self.relationships:
entry += self._create_related_entry()
return entry
def _create_title_entry(self):
self.entry += f"## {self.value}\n"
self.entry += f"\n"
entry = ""
entry += f"## {self.value}\n"
entry += f"\n"
return entry
def _create_description_entry(self):
entry = ""
if self.description:
self.entry += f"{self.description}\n"
entry += f"{self.description}\n"
return entry
def _create_synonyms_entry(self):
entry = ""
if isinstance(self.meta, dict) and self.meta.get("synonyms"):
self.entry += f"\n"
self.entry += f'??? info "Synonyms"\n'
self.entry += f"\n"
self.entry += f' "synonyms" in the meta part typically refer to alternate names or labels that are associated with a particular {self.value}.\n\n'
self.entry += f" | Known Synonyms |\n"
self.entry += f" |---------------------|\n"
entry += f"\n"
entry += f'??? info "Synonyms"\n'
entry += f"\n"
entry += f' "synonyms" in the meta part typically refer to alternate names or labels that are associated with a particular {self.value}.\n\n'
entry += f" | Known Synonyms |\n"
entry += f" |---------------------|\n"
synonyms_count = 0
for synonym in sorted(self.meta["synonyms"]):
synonyms_count += 1
self.entry += f" | `{synonym}` |\n"
self.statistics.synonyms_count_dict[self.uuid] = synonyms_count
entry += f" | `{synonym}` |\n"
return entry
def _create_uuid_entry(self):
entry = ""
if self.uuid:
self.entry += f"\n"
self.entry += f'??? tip "Internal MISP references"\n'
self.entry += f"\n"
self.entry += f" UUID `{self.uuid}` which can be used as unique global reference for `{self.value}` in MISP communities and other software using the MISP galaxy\n"
self.entry += f"\n"
entry += f"\n"
entry += f'??? tip "Internal MISP references"\n'
entry += f"\n"
entry += f" UUID `{self.uuid}` which can be used as unique global reference for `{self.value}` in MISP communities and other software using the MISP galaxy\n"
entry += f"\n"
return entry
def _create_refs_entry(self):
entry = ""
if isinstance(self.meta, dict) and self.meta.get("refs"):
self.entry += f"\n"
self.entry += f'??? info "External references"\n'
self.entry += f"\n"
entry += f"\n"
entry += f'??? info "External references"\n'
entry += f"\n"
for ref in self.meta["refs"]:
if validators.url(ref):
self.entry += f" - [{ref}]({ref}) - :material-archive: :material-arrow-right: [webarchive](https://web.archive.org/web/*/{ref})\n"
entry += f" - [{ref}]({ref}) - :material-archive: :material-arrow-right: [webarchive](https://web.archive.org/web/*/{ref})\n"
else:
self.entry += f" - {ref}\n"
entry += f" - {ref}\n"
self.entry += f"\n"
entry += f"\n"
return entry
def _create_associated_metadata_entry(self):
entry = ""
if isinstance(self.meta, dict):
excluded_meta = ["synonyms", "refs"]
self.entry += f"\n"
self.entry += f'??? info "Associated metadata"\n'
self.entry += f"\n"
self.entry += f" |Metadata key {{ .no-filter }} |Value|\n"
self.entry += f" |-----------------------------------|-----|\n"
entry += f"\n"
entry += f'??? info "Associated metadata"\n'
entry += f"\n"
entry += f" |Metadata key {{ .no-filter }} |Value|\n"
entry += f" |-----------------------------------|-----|\n"
for meta in sorted(self.meta.keys()):
if meta not in excluded_meta:
self.entry += f" | {meta} | {self.meta[meta]} |\n"
def get_related_clusters(
self, cluster_dict, depth=-1, visited=None, level=1, related_private_clusters={}
):
empty_uuids = 0
if visited is None:
visited = {}
related_clusters = []
if depth == 0 or not self.related_list:
return related_clusters
if self.uuid in visited and visited[self.uuid] <= level:
return related_clusters
else:
visited[self.uuid] = level
for cluster in self.related_list:
dest_uuid = cluster["dest-uuid"]
# Cluster is private
if dest_uuid not in cluster_dict:
# Check if UUID is empty
if not dest_uuid:
empty_uuids += 1
continue
self.statistics.private_relations_count += 1
if dest_uuid not in self.statistics.private_clusters:
self.statistics.private_clusters.append(dest_uuid)
if dest_uuid in related_private_clusters:
related_clusters.append(
(
self,
related_private_clusters[dest_uuid],
level,
)
)
else:
related_clusters.append(
(
self,
Cluster(
value="Private Cluster",
uuid=dest_uuid,
date=None,
description=None,
related_list=None,
meta=None,
galaxy=None,
),
level,
)
)
related_private_clusters[dest_uuid] = related_clusters[-1][1]
continue
related_cluster = cluster_dict[dest_uuid]
self.statistics.public_relations_count += 1
related_clusters.append((self, related_cluster, level))
if (depth > 1 or depth == -1) and (
cluster["dest-uuid"] not in visited
or visited[cluster["dest-uuid"]] > level + 1
):
new_depth = depth - 1 if depth > 1 else -1
if cluster["dest-uuid"] in cluster_dict:
related_clusters += cluster_dict[
cluster["dest-uuid"]
].get_related_clusters(
cluster_dict,
new_depth,
visited,
level + 1,
related_private_clusters,
)
if empty_uuids > 0:
self.statistics.empty_uuids_dict[self.value] = empty_uuids
return self._remove_duplicates(related_clusters)
def _remove_duplicates(self, related_clusters):
cluster_dict = {}
for cluster in related_clusters:
key = tuple(sorted((cluster[0], cluster[1])))
if key in cluster_dict:
if cluster_dict[key][2] > cluster[2]:
cluster_dict[key] = cluster
else:
cluster_dict[key] = cluster
related_clusters = list(cluster_dict.values())
return related_clusters
entry += f" | {meta} | {self.meta[meta]} |\n"
return entry
def _create_related_entry(self):
self.entry += f"\n"
self.entry += f'??? info "Related clusters"\n'
self.entry += f"\n"
self.entry += f" To see the related clusters, click [here](./relations/{self.uuid}.md).\n"
def _get_related_entry(self, relations):
output = ""
output += f"## Related clusters for {self.value}\n"
output += f"\n"
output += f"| Cluster A | Galaxy A | Cluster B | Galaxy B | Level {{ .graph }} |\n"
output += f"|-----------|----------|-----------|----------|-------------------|\n"
for relation in relations:
placeholder = "__TMP__"
cluster_a_section = (
relation[0]
.value.lower()
.replace(" - ", placeholder) # Replace " - " first
.replace(" ", "-")
.replace("/", "")
.replace(":", "")
.replace(placeholder, "-")
) # Replace the placeholder with "-"
cluster_b_section = (
relation[1]
.value.lower()
.replace(" - ", placeholder) # Replace " - " first
.replace(" ", "-")
.replace("/", "")
.replace(":", "")
.replace(placeholder, "-")
) # Replace the placeholder with "-"
if cluster_b_section != "private-cluster":
output += f"| [{relation[0].value} ({relation[0].uuid})](../../{relation[0].galaxy.json_file_name}/index.md#{cluster_a_section}) | [{relation[0].galaxy.name}](../../{relation[0].galaxy.json_file_name}/index.md) | [{relation[1].value} ({relation[1].uuid})](../../{relation[1].galaxy.json_file_name}/index.md#{cluster_b_section}) | [{relation[1].galaxy.name}](../../{relation[1].galaxy.json_file_name}/index.md) | {relation[2]} |\n"
else:
output += f"| [{relation[0].value} ({relation[0].uuid})](../../{relation[0].galaxy.json_file_name}/index.md#{cluster_a_section}) | [{relation[0].galaxy.name}](../../{relation[0].galaxy.json_file_name}/index.md) |{relation[1].value} ({relation[1].uuid}) | unknown | {relation[2]} |\n"
return output
def create_entry(self, cluster_dict, path):
if not self.statistics:
raise ValueError("Statistics not set")
self._create_title_entry()
self._create_description_entry()
self._create_synonyms_entry()
self._create_uuid_entry()
self._create_refs_entry()
self._create_associated_metadata_entry()
if self.related_list:
self._create_related_entry()
self._write_relations(cluster_dict, path)
return self.entry
def _write_relations(self, cluster_dict, path):
related_clusters = self.get_related_clusters(cluster_dict)
self.statistics.relation_count_dict[self.uuid] = len(related_clusters)
galaxy_path = os.path.join(path, self.galaxy.json_file_name)
if not os.path.exists(galaxy_path):
os.mkdir(galaxy_path)
relation_path = os.path.join(galaxy_path, "relations")
if not os.path.exists(relation_path):
os.mkdir(relation_path)
with open(os.path.join(relation_path, ".pages"), "w") as index:
index.write(f"hide: true\n")
with open(os.path.join(relation_path, f"{self.uuid}.md"), "w") as index:
index.write(self._get_related_entry(related_clusters))
entry = ""
entry += f"\n"
entry += f'??? info "Related clusters"\n'
entry += f"\n"
entry += f" To see the related clusters, click [here](./relations/{self.uuid}.md).\n"
return entry

View File

@ -3,72 +3,67 @@ from typing import List
import os
class Galaxy:
def __init__(
self, cluster_list: List[dict], authors, description, name, json_file_name
):
self.cluster_list = cluster_list
def __init__(self, galaxy_name: str, json_file_name: str, authors: List[str], description: str):
self.galaxy_name = galaxy_name
self.json_file_name = json_file_name
self.authors = authors
self.description = description
self.name = name
self.json_file_name = json_file_name
self.clusters = self._create_clusters()
self.entry = ""
def _create_metadata_entry(self):
self.entry += "---\n"
self.entry += f"title: {self.name}\n"
meta_description = self.description.replace('"', "-")
self.entry += f"description: {meta_description}\n"
self.entry += "---\n"
self.clusters = {} # Maps uuid to Cluster objects
def _create_title_entry(self):
self.entry += f"# {self.name}\n"
def add_cluster(self, uuid, description, value, meta):
if uuid not in self.clusters:
self.clusters[uuid] = Cluster(uuid=uuid, galaxy=self, description=description, value=value, meta=meta)
def _create_description_entry(self):
self.entry += f"{self.description}\n"
def _create_authors_entry(self):
if self.authors:
self.entry += f"\n"
self.entry += f'??? info "Authors"\n'
self.entry += f"\n"
self.entry += f" | Authors and/or Contributors|\n"
self.entry += f" |----------------------------|\n"
for author in self.authors:
self.entry += f" |{author}|\n"
def _create_clusters(self):
clusters = []
for cluster in self.cluster_list:
clusters.append(
Cluster(
value=cluster.get("value", None),
description=cluster.get("description", None),
uuid=cluster.get("uuid", None),
date=cluster.get("date", None),
related_list=cluster.get("related", None),
meta=cluster.get("meta", None),
galaxy=self,
)
)
return clusters
def _create_clusters_entry(self, cluster_dict, path):
for cluster in self.clusters:
self.entry += cluster.create_entry(cluster_dict, path)
def create_entry(self, cluster_dict, path):
self._create_metadata_entry()
self._create_title_entry()
self._create_description_entry()
self._create_authors_entry()
self._create_clusters_entry(cluster_dict, path)
return self.entry
def write_entry(self, path, cluster_dict):
self.create_entry(cluster_dict, path)
galaxy_path = os.path.join(path, self.json_file_name)
def write_entry(self, path):
galaxy_path = os.path.join(path, f"{self.json_file_name}".replace(".json", ""))
if not os.path.exists(galaxy_path):
os.mkdir(galaxy_path)
with open(os.path.join(galaxy_path, "index.md"), "w") as index:
index.write(self.entry)
index.write(self.generate_entry())
def generate_entry(self):
entry = ""
entry += self._create_metadata_entry()
entry += self._create_title_entry()
entry += self._create_description_entry()
entry += self._create_authors_entry()
entry += self._create_clusters_entry()
return entry
def _create_metadata_entry(self):
entry = ""
entry += "---\n"
entry += f"title: {self.galaxy_name}\n"
meta_description = self.description.replace('"', "-")
entry += f"description: {meta_description}\n"
entry += "---\n"
return entry
def _create_title_entry(self):
entry = ""
entry += f"# {self.galaxy_name}\n"
return entry
def _create_description_entry(self):
entry = ""
entry += f"{self.description}\n"
return entry
def _create_authors_entry(self):
entry = ""
if self.authors:
entry += f"\n"
entry += f'??? info "Authors"\n'
entry += f"\n"
entry += f" | Authors and/or Contributors|\n"
entry += f" |----------------------------|\n"
for author in self.authors:
entry += f" |{author}|\n"
return entry
def _create_clusters_entry(self):
entry = ""
for cluster in self.clusters.values():
entry += cluster.generate_entry()
return entry

View File

@ -0,0 +1,82 @@
import os
from utils.helper import create_bar_chart, get_top_x, create_pie_chart
class Site:
    """A single generated page whose text is accumulated in memory.

    Content is appended with ``add_content`` and written to
    ``<path>/<name>`` by ``write_entry``.
    """

    def __init__(self, path, name) -> None:
        self.path = path  # directory the page is written into
        self.name = name  # file name of the page inside ``path``
        self.content = ""

    def add_content(self, content):
        """Append *content* to the page text."""
        self.content += content

    def write_entry(self):
        """Write the accumulated content to ``<path>/<name>``.

        The target directory is created when missing. The file is always
        written as UTF-8 so non-ASCII page text does not depend on the
        platform's default encoding.
        """
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(self.path, exist_ok=True)
        target = os.path.join(self.path, self.name)
        with open(target, "w", encoding="utf-8") as page:
            page.write(self.content)
class IndexSite(Site):
    """The landing page, stored as ``index.md`` under the given path."""

    def __init__(self, path) -> None:
        super().__init__(path=path, name="index.md")

    def add_toc(self, galaxies):
        """Append one bullet link per galaxy, then a blank line.

        Each link points at ``./<folder>/index.md`` where ``<folder>`` is the
        galaxy's ``json_file_name`` with ``.json`` removed.
        """
        bullets = []
        for entry in galaxies:
            folder = entry.json_file_name.replace(".json", "")
            bullets.append(f"- [{entry.galaxy_name}](./{folder}/index.md)\n")
        for bullet in bullets:
            self.add_content(bullet)
        self.add_content("\n")
class StatisticsSite(Site):
    """The ``statistics.md`` page.

    Each ``add_*`` method appends one section to the page content; nothing is
    written to disk until ``write_entry`` is called.
    """

    def __init__(self, path) -> None:
        super().__init__(path=path, name="statistics.md")

    def add_galaxy_statistics(self, galaxies):
        """Append charts of the galaxies with the most and the fewest clusters."""
        galaxy_cluster_count = {galaxy: len(galaxy.clusters) for galaxy in galaxies}
        top_20 = get_top_x(galaxy_cluster_count, 20)
        flop_20 = get_top_x(galaxy_cluster_count, 20, False)
        self.add_content("# Galaxy statistics\n")
        self.add_content("## Galaxies with the most clusters\n\n")
        self.add_content(create_bar_chart(x_axis="Galaxy", y_axis="Count", values=top_20, galaxy=True))
        self.add_content("## Galaxies with the least clusters\n\n")
        self.add_content(create_bar_chart(x_axis="Galaxy", y_axis="Count", values=flop_20, galaxy=True))

    def add_cluster_statistics(self, public_clusters, private_clusters):
        """Append a pie chart comparing the two given cluster counts."""
        values = {"Public clusters": public_clusters, "Private clusters": private_clusters}
        self.add_content("# Cluster statistics\n")
        self.add_content("## Number of clusters\n")
        self.add_content("Here you can find the total number of clusters including public and private clusters.The number of public clusters has been calculated based on the number of unique Clusters in the MISP galaxy JSON files. The number of private clusters could only be approximated based on the number of relations to non-existing clusters. Therefore the number of private clusters is not accurate and only an approximation.\n\n")
        self.add_content(create_pie_chart(sector="Type", unit="Count", values=values))

    def add_relation_statistics(self, clusters):
        """Append relation totals, the per-cluster average and top/flop charts.

        A relation counts as private when the ``value`` of its second element
        is ``"Private Cluster"``; every other relation counts as public.
        """
        cluster_relations = {}
        private_relations = 0
        public_relations = 0
        for cluster in clusters:
            cluster_relations[cluster] = len(cluster.relationships)
            for relation in cluster.relationships:
                if relation[1].value == "Private Cluster":
                    private_relations += 1
                else:
                    public_relations += 1

        # With no clusters at all the average would divide by zero; report 0.
        if cluster_relations:
            average = int(sum(cluster_relations.values()) / len(cluster_relations))
        else:
            average = 0

        top_20 = get_top_x(cluster_relations, 20)
        flop_20 = get_top_x(cluster_relations, 20, False)
        self.add_content("# Relation statistics\n")
        self.add_content("Here you can find the total number of relations including public and private relations. The number includes relations between public clusters and relations between public and private clusters. Therefore relatons between private clusters are not included in the statistics.\n\n")
        self.add_content("## Number of relations\n\n")
        self.add_content(create_pie_chart(sector="Type", unit="Count", values={"Public relations": public_relations, "Private relations": private_relations}))
        self.add_content(f"**Average number of relations per cluster**: {average}\n")
        self.add_content("## Cluster with the most relations\n\n")
        self.add_content(create_bar_chart(x_axis="Cluster", y_axis="Count", values=top_20))
        self.add_content("## Cluster with the least relations\n\n")
        self.add_content(create_bar_chart(x_axis="Cluster", y_axis="Count", values=flop_20))

    def add_synonym_statistics(self, clusters):
        """Append a chart of the clusters with the most synonyms.

        Only clusters whose ``meta`` has a non-empty ``"synonyms"`` entry are
        counted.
        """
        synonyms = {
            cluster: len(cluster.meta["synonyms"])
            for cluster in clusters
            if cluster.meta and cluster.meta.get("synonyms")
        }
        top_20 = get_top_x(synonyms, 20)
        self.add_content("# Synonym statistics\n")
        self.add_content("## Cluster with the most synonyms\n\n")
        self.add_content(create_bar_chart(x_axis="Cluster", y_axis="Count", values=top_20))

View File

@ -1,118 +0,0 @@
from utils.helper import get_top_x, name_to_section
import os
class Statistics:
def __init__(self, cluster_dict):
self.public_relations_count = 0
self.private_relations_count = 0
self.private_clusters = []
self.public_clusters_dict = {}
self.relation_count_dict = {}
self.synonyms_count_dict = {}
self.empty_uuids_dict = {}
self.cluster_dict = cluster_dict
self.entry = ""
def create_entry(self):
self.entry += f"# MISP Galaxy statistics\n"
self.entry += "The MISP galaxy statistics are automatically generated based on the MISP galaxy JSON files. Therefore the statistics only include detailed infomration about public clusters and relations. Some statistics about private clusters and relations is included but only as an approximation based on the information gathered from the public clusters.\n"
self.entry += "\n"
self._create_cluster_statistics()
self._create_galaxy_statistics()
self._create_relation_statistics()
self._create_synonym_statistics()
def _create_galaxy_statistics(self):
self.entry += f"# Galaxy statistics\n"
self.entry += f"## Galaxies with the most clusters\n"
galaxy_counts = {}
for galaxy in self.public_clusters_dict.values():
galaxy_counts[galaxy] = galaxy_counts.get(galaxy, 0) + 1
top_galaxies, top_galaxies_values = get_top_x(galaxy_counts, 20)
self.entry += f" | No. | Galaxy | Count {{ .log-bar-chart }}|\n"
self.entry += f" |----|--------|-------|\n"
for i, galaxy in enumerate(top_galaxies, 1):
galaxy_section = name_to_section(galaxy.json_file_name)
self.entry += f" | {i} | [{galaxy.name}](../{galaxy_section}) | {top_galaxies_values[i-1]} |\n"
self.entry += f"\n"
self.entry += f"## Galaxies with the least clusters\n"
flop_galaxies, flop_galaxies_values = get_top_x(galaxy_counts, 20, False)
self.entry += f" | No. | Galaxy | Count {{ .bar-chart }}|\n"
self.entry += f" |----|--------|-------|\n"
for i, galaxy in enumerate(flop_galaxies, 1):
galaxy_section = name_to_section(galaxy.json_file_name)
self.entry += f" | {i} | [{galaxy.name}](../{galaxy_section}) | {flop_galaxies_values[i-1]} |\n"
self.entry += f"\n"
def _create_cluster_statistics(self):
self.entry += f"# Cluster statistics\n"
self.entry += f"## Number of clusters\n"
self.entry += f"Here you can find the total number of clusters including public and private clusters. The number of public clusters has been calculated based on the number of unique Clusters in the MISP galaxy JSON files. The number of private clusters could only be approximated based on the number of relations to non-existing clusters. Therefore the number of private clusters is not accurate and only an approximation.\n"
self.entry += f"\n"
self.entry += f"| No. | Type | Count {{ .pie-chart }}|\n"
self.entry += f"|-----|------|-----------------------|\n"
self.entry += f"| 1 | Public clusters | {len(self.public_clusters_dict)} |\n"
self.entry += f"| 2 | Private clusters | {len(self.private_clusters)} |\n"
self.entry += f"\n"
def _create_relation_statistics(self):
self.entry += f"# Relation statistics\n"
self.entry += f"Here you can find the total number of relations including public and private relations. The number includes relations between public clusters and relations between public and private clusters. Therefore relatons between private clusters are not included in the statistics.\n"
self.entry += f"\n"
self.entry += f"## Number of relations\n"
self.entry += f"| No. | Type | Count {{ .pie-chart }}|\n"
self.entry += f"|----|------|-------|\n"
self.entry += f"| 1 | Public relations | {self.public_relations_count} |\n"
self.entry += f"| 2 | Private relations | {self.private_relations_count} |\n"
self.entry += f"\n"
self.entry += f"**Average number of relations per cluster**: {int(sum(self.relation_count_dict.values()) / len(self.relation_count_dict))}\n"
self.entry += f"## Cluster with the most relations\n"
relation_count_dict_names = {
self.cluster_dict[uuid].value: count
for uuid, count in self.relation_count_dict.items()
}
top_25_relation, top_25_relation_values = get_top_x(
relation_count_dict_names, 20
)
self.entry += f" | No. | Cluster | Count {{ .bar-chart }}|\n"
self.entry += f" |----|--------|-------|\n"
relation_count_dict_galaxies = {
self.cluster_dict[uuid].value: self.cluster_dict[uuid].galaxy.json_file_name
for uuid in self.relation_count_dict.keys()
}
for i, cluster in enumerate(top_25_relation, 1):
cluster_section = name_to_section(cluster)
self.entry += f" | {i} | [{cluster}](../{relation_count_dict_galaxies[cluster]}/#{cluster_section}) | {top_25_relation_values[i-1]} |\n"
self.entry += f"\n"
def _create_synonym_statistics(self):
    """Append the "Synonym statistics" markdown section to ``self.entry``.

    Emits a bar-chart table of the 20 clusters with the most synonyms,
    taken from ``self.synonyms_count_dict`` (uuid -> synonym count). Each
    row links to the cluster's section on its galaxy page, resolved through
    ``self.cluster_dict``.
    """
    self.entry += "# Synonym statistics\n"
    self.entry += "## Cluster with the most synonyms\n"

    # Display value -> synonym count (later duplicates of a value win).
    count_by_name = {}
    for cluster_uuid, synonym_count in self.synonyms_count_dict.items():
        count_by_name[self.cluster_dict[cluster_uuid].value] = synonym_count

    # NOTE(review): get_top_x lives elsewhere; a (names, counts) pair is
    # unpacked here.
    ranked_names, ranked_counts = get_top_x(count_by_name, 20)

    self.entry += " | No. | Cluster | Count { .bar-chart }|\n"
    self.entry += " |----|--------|-------|\n"

    # Display value -> galaxy JSON file name, used to build the link target.
    galaxy_file_by_name = {}
    for cluster_uuid in self.synonyms_count_dict:
        known_cluster = self.cluster_dict[cluster_uuid]
        galaxy_file_by_name[known_cluster.value] = known_cluster.galaxy.json_file_name

    for index, name in enumerate(ranked_names):
        anchor = name_to_section(name)
        target = f"../{galaxy_file_by_name[name]}/#{anchor}"
        self.entry += f" | {index + 1} | [{name}]({target}) | {ranked_counts[index]} |\n"
    self.entry += "\n"
def write_entry(self, path):
    """Build the statistics page and write it to ``<path>/statistics.md``.

    Calls ``self.create_entry()`` first, then writes ``self.entry`` to the
    file, replacing any previous content.

    Args:
        path: Directory in which ``statistics.md`` is created.
    """
    self.create_entry()
    # Explicit UTF-8 keeps the output independent of the platform's default
    # locale encoding; the entry may contain non-ASCII cluster names.
    with open(os.path.join(path, "statistics.md"), "w", encoding="utf-8") as index:
        index.write(self.entry)
def add_cluster(self, cluster):
    """Register a public cluster and link it back to this statistics object.

    Stores the cluster's galaxy in ``self.public_clusters_dict`` under the
    cluster's uuid and sets ``cluster.statistics`` to ``self``.
    """
    registry = self.public_clusters_dict
    registry[cluster.uuid] = cluster.galaxy
    # Back-reference from the cluster to the statistics collector.
    setattr(cluster, "statistics", self)

View File

@ -0,0 +1,89 @@
from modules.galaxy import Galaxy
from modules.cluster import Cluster
from collections import defaultdict, deque
class Universe:
    """Container for all galaxies and the relationships between their clusters.

    Attributes are plain dictionaries:
      * ``galaxies`` maps a galaxy name to its ``Galaxy`` object.
      * ``private_clusters`` maps the uuid of a relationship target that was
        not found in any galaxy to the placeholder ``Cluster`` created for it.
    """

    # Value given to placeholder clusters; also used to recognise them while
    # walking the relationship graph.
    PRIVATE_CLUSTER_VALUE = "Private Cluster"

    def __init__(self, add_inbound_relationship=False):
        """Create an empty universe.

        Args:
            add_inbound_relationship: When true, graph traversal in
                ``get_relationships_with_levels`` follows inbound as well as
                outbound relationships.
        """
        self.galaxies = {}  # Maps galaxy_name to Galaxy objects
        self.add_inbound_relationship = add_inbound_relationship
        self.private_clusters = {}  # Maps uuid to placeholder Cluster objects

    def add_galaxy(self, galaxy_name, json_file_name, authors, description):
        """Register a galaxy; an already registered name is left untouched."""
        if galaxy_name not in self.galaxies:
            self.galaxies[galaxy_name] = Galaxy(
                galaxy_name=galaxy_name,
                json_file_name=json_file_name,
                authors=authors,
                description=description,
            )

    def add_cluster(self, galaxy_name, uuid, description, value, meta):
        """Add a cluster to a registered galaxy; unknown galaxies are ignored."""
        if galaxy_name in self.galaxies:
            self.galaxies[galaxy_name].add_cluster(
                uuid=uuid, description=description, value=value, meta=meta
            )

    def define_relationship(self, cluster_a_id, cluster_b_id):
        """Record a relationship from cluster A to cluster B.

        If both clusters exist, A gets an outbound and B an inbound
        relationship. If only A exists, B is represented by a placeholder
        "Private Cluster" that is stored in ``self.private_clusters``.
        A relationship of a cluster with itself is ignored.

        Raises:
            ValueError: If cluster A is not found in any galaxy.
        """
        if cluster_a_id == cluster_b_id:
            return

        cluster_a = None
        cluster_b = None
        # Search for Cluster A and Cluster B in all galaxies
        for galaxy in self.galaxies.values():
            if cluster_a_id in galaxy.clusters:
                cluster_a = galaxy.clusters[cluster_a_id]
            if cluster_b_id in galaxy.clusters:
                cluster_b = galaxy.clusters[cluster_b_id]
            if cluster_a and cluster_b:
                break

        if cluster_a and cluster_b:
            cluster_a.add_outbound_relationship(cluster_b)
            cluster_b.add_inbound_relationship(cluster_a)
            return

        if not cluster_a:
            # Report the uuid that was looked up; the cluster object itself is
            # None at this point and would make the message useless.
            raise ValueError(f"Cluster {cluster_a_id} not found in any galaxy")

        # Reuse the placeholder of an already seen private uuid so that one
        # unknown cluster is always represented by the same object.
        private_cluster = self.private_clusters.get(cluster_b_id)
        if private_cluster is None:
            private_cluster = Cluster(
                uuid=cluster_b_id,
                galaxy=None,
                description=None,
                value=self.PRIVATE_CLUSTER_VALUE,
                meta=None,
            )
            self.private_clusters[cluster_b_id] = private_cluster
        cluster_a.add_outbound_relationship(private_cluster)

    def get_relationships_with_levels(self, start_cluster):
        """Collect all relationships reachable from ``start_cluster``.

        Performs a breadth-first walk over outbound relationships (and inbound
        ones when ``self.add_inbound_relationship`` is set). Private clusters
        are recorded as link endpoints but never expanded.

        Returns:
            A list of ``(cluster, cluster, level)`` tuples, one per undirected
            link, where ``level`` is the lowest distance from the start at
            which the link was seen. A private cluster is always placed second;
            otherwise the order of the two clusters is arbitrary.
        """
        visited = set()  # Clusters whose neighbours were already expanded
        # Lowest level per undirected link (frozenset of the two endpoints)
        relationships = defaultdict(lambda: float("inf"))
        queue = deque([(start_cluster, 0)])  # Queue of (cluster, level)

        while queue:
            current_cluster, level = queue.popleft()
            if current_cluster in visited:
                continue
            visited.add(current_cluster)

            if self.add_inbound_relationship:
                neighbors = current_cluster.outbound_relationships.union(
                    current_cluster.inbound_relationships
                )
            else:
                neighbors = current_cluster.outbound_relationships

            for neighbor in neighbors:
                link = frozenset([current_cluster, neighbor])
                if level + 1 < relationships[link]:
                    relationships[link] = level + 1
                    if (
                        neighbor not in visited
                        and neighbor.value != self.PRIVATE_CLUSTER_VALUE
                    ):
                        queue.append((neighbor, level + 1))

        # Convert the links to tuples; direction is not preserved by the
        # frozenset, only private clusters are forced into second position.
        processed_relationships = []
        for link, lvl in relationships.items():
            clusters = list(link)
            if clusters[0].value == self.PRIVATE_CLUSTER_VALUE:
                processed_relationships.append((clusters[1], clusters[0], lvl))
            else:
                processed_relationships.append((clusters[0], clusters[1], lvl))
        return processed_relationships

View File

@ -82,7 +82,9 @@ document$.subscribe(function () {
path: nodePaths[id]
}));
const Parent_Node = nodes[0];
let header = document.querySelector('h1').textContent;
const parentUUID = header.replace(/\s+/g, '-').charAt(0).toLowerCase() + header.replace(/\s+/g, '-').slice(1);
const Parent_Node = nodes.find(node => node.id.includes(parentUUID));
var links = data.map(d => ({ source: d.source, target: d.target }));

View File

@ -4,10 +4,8 @@ def get_top_x(dict, x, big_to_small=True):
sorted_dict = sorted(
dict.items(), key=operator.itemgetter(1), reverse=big_to_small
)[:x]
top_x = [key for key, value in sorted_dict]
top_x_values = sorted(dict.values(), reverse=big_to_small)[:x]
return top_x, top_x_values
top_x = {key: value for key, value in sorted_dict}
return top_x
def name_to_section(name):
placeholder = "__TMP__"
@ -18,4 +16,59 @@ def name_to_section(name):
.replace("/", "")
.replace(":", "")
.replace(placeholder, "-")
) # Replace the placeholder with "-"
) # Replace the placeholder with "-"
def create_bar_chart(x_axis, y_axis, values, log=False, galaxy=False):
    """Render a mapping as a markdown table tagged as a bar chart.

    Args:
        x_axis: Header of the label column.
        y_axis: Header of the value column.
        values: Mapping of galaxy/cluster object -> value; iteration order
            determines the row order.
        log: Tag the table ``.log-bar-chart`` instead of ``.bar-chart``.
        galaxy: Treat the keys as galaxies (linked with
            ``galaxy_transform_to_link``) instead of clusters (linked with
            ``cluster_transform_to_link``).

    Returns:
        The table as a string, terminated by an empty line.
    """
    chart_class = ".log-bar-chart" if log else ".bar-chart"
    pieces = [
        f"| No. | {x_axis} | {y_axis} {{ {chart_class} }}|\n",
        "|----|--------|-------|\n",
    ]
    for position, (item, amount) in enumerate(values.items(), start=1):
        if galaxy:
            link = galaxy_transform_to_link(item)
        else:
            link = cluster_transform_to_link(item)
        pieces.append(f"| {position} | {link} | {amount} |\n")
    pieces.append("\n")
    return "".join(pieces)
def create_pie_chart(sector, unit, values):
    """Render a mapping as a markdown table tagged ``.pie-chart``.

    Args:
        sector: Header of the label column.
        unit: Header of the value column.
        values: Mapping of label -> value; iteration order determines the
            row order.

    Returns:
        The table as a string, terminated by an empty line.
    """
    header = f"| No. | {sector} | {unit} {{ .pie-chart }}|\n"
    divider = "|----|--------|-------|\n"
    body = "".join(
        f"| {number} | {label} | {amount} |\n"
        for number, (label, amount) in enumerate(values.items(), start=1)
    )
    return header + divider + body + "\n"
def cluster_transform_to_link(cluster, uuid=False):
    """Build a markdown link to a cluster's section on its galaxy page.

    The anchor is derived from the lower-cased cluster value: " - " and
    single spaces become "-", while "/" and ":" are dropped. The folder is
    the galaxy's JSON file name without ".json".

    Args:
        cluster: Object exposing ``value``, ``uuid`` and
            ``galaxy.json_file_name``.
        uuid: When true, the link text also shows the cluster uuid.

    Returns:
        A markdown link of the form ``[text](../../<folder>/index.md#<anchor>)``.
    """
    marker = "__TMP__"
    # Order matters: " - " is protected by the marker before single spaces
    # are turned into dashes, and restored as one dash at the end.
    substitutions = (
        (" - ", marker),
        (" ", "-"),
        ("/", ""),
        (":", ""),
        (marker, "-"),
    )
    anchor = cluster.value.lower()
    for old, new in substitutions:
        anchor = anchor.replace(old, new)

    folder = cluster.galaxy.json_file_name.replace(".json", "")
    label = f"{cluster.value} ({cluster.uuid})" if uuid else f"{cluster.value}"
    return f"[{label}](../../{folder}/index.md#{anchor})"
def galaxy_transform_to_link(galaxy):
    """Build a markdown link to a galaxy's index page.

    Args:
        galaxy: Object exposing ``galaxy_name`` and ``json_file_name``.

    Returns:
        ``[<galaxy_name>](../../<folder>/index.md)`` where the folder is the
        JSON file name with ".json" removed.
    """
    folder = galaxy.json_file_name.replace(".json", "")
    target = f"../../{folder}/index.md"
    return f"[{galaxy.galaxy_name}]({target})"
def generate_relations_table(relationships):
    """Render cluster relationships as a markdown table tagged ``.graph``.

    Args:
        relationships: Iterable of ``(from_cluster, to_cluster, level)``
            tuples.

    Returns:
        The table as a string. Targets whose value is "Private Cluster" are
        shown by value with galaxy "Unknown"; all other clusters and
        galaxies are rendered as links.
    """
    rows = [
        "|Cluster A | Galaxy A | Cluster B | Galaxy B | Level { .graph } |\n",
        "| --- | --- | --- | --- | --- |\n",
    ]
    for source, target, depth in relationships:
        source_galaxy = source.galaxy
        if target.value == "Private Cluster":
            # Private clusters have no page to link to.
            rows.append(
                f"{cluster_transform_to_link(source, uuid=True)} | {galaxy_transform_to_link(source_galaxy)} | {target.value} | Unknown | {depth}\n"
            )
            continue
        target_galaxy = target.galaxy
        rows.append(
            f"{cluster_transform_to_link(source, uuid=True)} | {galaxy_transform_to_link(source_galaxy)} | {cluster_transform_to_link(target, uuid=True)} | {galaxy_transform_to_link(target_galaxy)} | {depth}\n"
        )
    return "".join(rows)