new: [Cluster] load cluster from existing cluster

pull/25/head
Christophe Vandeplas 2024-06-18 10:16:00 +02:00
parent b9791f98c1
commit 8c2f69dbfd
No known key found for this signature in database
GPG Key ID: BDC48619FFDC5A5B
3 changed files with 32 additions and 25 deletions

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from .api import Galaxies, Clusters, EncodeGalaxies, EncodeClusters, UnableToRevertMachinetag from .api import Galaxies, Galaxy, Clusters, Cluster, EncodeGalaxies, EncodeClusters, UnableToRevertMachinetag

View File

@ -357,7 +357,7 @@ class Cluster(Mapping): # type: ignore
duplicates (List[Tuple[str, str]]): A list of tuples representing duplicate values in the cluster, where each tuple contains the name of the cluster and the duplicate value. duplicates (List[Tuple[str, str]]): A list of tuples representing duplicate values in the cluster, where each tuple contains the name of the cluster and the duplicate value.
Methods: Methods:
__init__(self, cluster: Dict[str, Any], skip_duplicates: bool = False): Initializes a Cluster object. __init__(self, cluster: Dict[str, Any] | str, skip_duplicates: bool = False): Initializes a Cluster object from a dict or existing cluster file
search(self, query: str, return_tags: bool = False) -> Union[List[ClusterValue], List[str]]: Searches for values in the cluster that match the given query. search(self, query: str, return_tags: bool = False) -> Union[List[ClusterValue], List[str]]: Searches for values in the cluster that match the given query.
machinetags(self) -> List[str]: Returns a list of machine tags for the cluster. machinetags(self) -> List[str]: Returns a list of machine tags for the cluster.
get_by_external_id(self, external_id: str) -> ClusterValue: Returns the cluster value with the specified external ID. get_by_external_id(self, external_id: str) -> ClusterValue: Returns the cluster value with the specified external ID.
@ -368,33 +368,40 @@ class Cluster(Mapping): # type: ignore
to_json(self) -> str: Converts the Cluster object to a JSON string. to_json(self) -> str: Converts the Cluster object to a JSON string.
to_dict(self) -> Dict[str, Any]: Converts the Cluster object to a dictionary. to_dict(self) -> Dict[str, Any]: Converts the Cluster object to a dictionary.
""" """
def __init__(self, cluster: Dict[str, Any], skip_duplicates: bool = False): def __init__(self, cluster: Dict[str, Any] | str, skip_duplicates: bool = False):
""" """
Initializes a Cluster object. Initializes a Cluster object.
Args: Args:
cluster (Dict[str, Any]): A dictionary containing the cluster data. cluster (Dict[str, Any] | str): A dictionary containing the cluster data, or the name of the existing cluster to load from the data folder.
skip_duplicates (bool, optional): Flag indicating whether to skip duplicate values. Defaults to False. skip_duplicates (bool, optional): Flag indicating whether to skip duplicate values. Defaults to False.
""" """
self.cluster = cluster if isinstance(cluster, str):
self.name = self.cluster['name'] root_dir_clusters = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'clusters')
self.type = self.cluster['type'] cluster_file = os.path.join(root_dir_clusters, f"{cluster}.json")
self.source = self.cluster['source'] with open(cluster_file, 'r') as f:
self.authors = self.cluster['authors'] self.__init__(json.load(f), skip_duplicates=skip_duplicates)
self.description = self.cluster['description']
self.uuid = self.cluster['uuid'] else:
self.version = self.cluster['version'] self.cluster = cluster
self.category = self.cluster['category'] self.name = self.cluster['name']
self.cluster_values = {} self.type = self.cluster['type']
self.duplicates = [] self.source = self.cluster['source']
for value in self.cluster['values']: self.authors = self.cluster['authors']
new_cluster_value = ClusterValue(value) self.description = self.cluster['description']
if self.get(new_cluster_value.value): self.uuid = self.cluster['uuid']
if skip_duplicates: self.version = self.cluster['version']
self.duplicates.append((self.name, new_cluster_value.value)) self.category = self.cluster['category']
else: self.cluster_values = {}
raise PyMISPGalaxiesError("Duplicate value ({}) in cluster: {}".format(new_cluster_value.value, self.name)) self.duplicates = []
self.cluster_values[new_cluster_value.value] = new_cluster_value for value in self.cluster['values']:
new_cluster_value = ClusterValue(value)
if self.get(new_cluster_value.value):
if skip_duplicates:
self.duplicates.append((self.name, new_cluster_value.value))
else:
raise PyMISPGalaxiesError("Duplicate value ({}) in cluster: {}".format(new_cluster_value.value, self.name))
self.cluster_values[new_cluster_value.value] = new_cluster_value
@overload @overload
def search(self, query: str, return_tags: Literal[False] = False) -> List[ClusterValue]: def search(self, query: str, return_tags: Literal[False] = False) -> List[ClusterValue]:

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import unittest import unittest
from pymispgalaxies import Galaxies, Clusters, UnableToRevertMachinetag from pymispgalaxies import Galaxies, Clusters, Cluster, UnableToRevertMachinetag
from glob import glob from glob import glob
import os import os
import json import json
@ -129,7 +129,7 @@ class TestPyMISPGalaxies(unittest.TestCase):
self.assertFalse(errors) self.assertFalse(errors)
def test_get_by_external_id(self): def test_get_by_external_id(self):
cluster = self.clusters.get('mitre-attack-pattern') cluster = Cluster(cluster='mitre-attack-pattern')
self.assertIsNotNone(cluster) self.assertIsNotNone(cluster)
cluster_by_external_id = cluster.get_by_external_id('T1525') cluster_by_external_id = cluster.get_by_external_id('T1525')
cluster_by_value = cluster.get('Implant Internal Image - T1525') cluster_by_value = cluster.get('Implant Internal Image - T1525')