new: [Cluster, Galaxy] new has_changed function

pull/28/head
Christophe Vandeplas 2024-06-25 12:21:55 +02:00
parent 21e1966ce2
commit 92cc8e36d3
No known key found for this signature in database
GPG Key ID: BDC48619FFDC5A5B
2 changed files with 63 additions and 21 deletions

View File

@ -8,7 +8,7 @@ import sys
from collections.abc import Mapping from collections.abc import Mapping
from glob import glob from glob import glob
import re import re
from typing import List, Dict, Optional, Any, Tuple, Iterator, overload, Union from typing import List, Dict, Optional, Any, Tuple, Iterator, overload, Union, Set
if sys.version_info >= (3, 8): if sys.version_info >= (3, 8):
from typing import Literal from typing import Literal
@ -56,10 +56,11 @@ class Galaxy():
name (str): The name of the galaxy. name (str): The name of the galaxy.
icon (str): The icon of the galaxy. icon (str): The icon of the galaxy.
description (str): The description of the galaxy. description (str): The description of the galaxy.
version (str): The version of the galaxy. version (int): The version of the galaxy.
uuid (str): The UUID of the galaxy. uuid (str): The UUID of the galaxy.
namespace (str, optional): The namespace of the galaxy. namespace (str, optional): The namespace of the galaxy.
kill_chain_order (str, optional): The kill chain order of the galaxy. kill_chain_order (Dict, optional): The kill chain order of the galaxy.
__init_hash (int): The hash of the json representation of the galaxy at __init__().
""" """
def __init__(self, galaxy: Union[str, Dict[str, str]]): def __init__(self, galaxy: Union[str, Dict[str, str]]):
@ -81,24 +82,37 @@ class Galaxy():
self.name = self.galaxy['name'] self.name = self.galaxy['name']
self.icon = self.galaxy['icon'] self.icon = self.galaxy['icon']
self.description = self.galaxy['description'] self.description = self.galaxy['description']
self.version = self.galaxy['version'] self.version: int = self.galaxy['version']
self.uuid = self.galaxy['uuid'] self.uuid = self.galaxy['uuid']
self.namespace = self.galaxy.pop('namespace', None) self.namespace = self.galaxy.pop('namespace', None)
self.kill_chain_order = self.galaxy.pop('kill_chain_order', None) self.kill_chain_order = self.galaxy.pop('kill_chain_order', None)
self.__init_hash: int = hash(self.to_json())
def save(self, file_name: str) -> None: def save(self, file_name: str, update_version: bool = True) -> None:
""" """
Saves the galaxy to a file <file_name>.json Saves the galaxy to a file <file_name>.json
Args: Args:
file_name (str): The name of the file to save the galaxy to. file_name (str): The name of the file to save the galaxy to.
update_version (bool, optional): Flag indicating whether to update the version if the galaxy changed. Defaults to True.
""" """
if update_version and self.has_changed():
self.version += 1
root_dir_galaxies = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'galaxies') # type: ignore [type-var, arg-type] root_dir_galaxies = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'galaxies') # type: ignore [type-var, arg-type]
galaxy_file = os.path.join(root_dir_galaxies, f"{file_name}.json") galaxy_file = os.path.join(root_dir_galaxies, f"{file_name}.json")
with open(galaxy_file, 'w') as f: with open(galaxy_file, 'w') as f:
json.dump(self, f, cls=EncodeGalaxies, indent=2, sort_keys=True, ensure_ascii=False) json.dump(self, f, cls=EncodeGalaxies, indent=2, sort_keys=True, ensure_ascii=False)
f.write('\n') # needed for the beauty and to be compliant with jq_all_the_things f.write('\n') # needed for the beauty and to be compliant with jq_all_the_things
def has_changed(self) -> bool:
"""
Checks if the galaxy has changed since initialization.
Returns:
bool: True if the galaxy has changed, False otherwise.
"""
return hash(self.to_json()) != self.__init_hash
def to_json(self) -> str: def to_json(self) -> str:
""" """
Converts the galaxy object to a JSON string. Converts the galaxy object to a JSON string.
@ -312,7 +326,7 @@ class ClusterValue():
self.value = v['value'] self.value = v['value']
self.description = v.get('description') self.description = v.get('description')
self.meta = self.__init_meta(v.get('meta')) self.meta = self.__init_meta(v.get('meta'))
self.related = [] self.related: List[Dict[str, str]] = []
try: try:
# LATER convert related to a class? # LATER convert related to a class?
self.related = v['related'] self.related = v['related']
@ -392,21 +406,21 @@ class Cluster(Mapping): # type: ignore
Represents a cluster in the PyMISPGalaxies library. Represents a cluster in the PyMISPGalaxies library.
Attributes: Attributes:
cluster (Dict[str, Any]): The dictionary containing the cluster data. cluster (Union[Dict[str, Any], str]): The dictionary containing the cluster data or the name of the existing cluster to load from the data folder.
cluster (str): The name of the existing cluster to load from the data folder.
name (str): The name of the cluster. name (str): The name of the cluster.
type (str): The type of the cluster. type (str): The type of the cluster.
source (str): The source of the cluster. source (str): The source of the cluster.
authors (str): The authors of the cluster. authors (List[str]): The authors of the cluster.
description (str): The description of the cluster. description (str): The description of the cluster.
uuid (str): The UUID of the cluster. uuid (str): The UUID of the cluster.
version (str): The version of the cluster. version (int): The version of the cluster.
category (str): The category of the cluster. category (str): The category of the cluster.
cluster_values (Dict[str, ClusterValue]): A dictionary containing the cluster values, where the keys are the values of the cluster and the values are instances of the ClusterValue class. cluster_values (Dict[str, ClusterValue]): A dictionary containing the cluster values, where the keys are the values of the cluster and the values are instances of the ClusterValue class.
duplicates (List[Tuple[str, str]]): A list of tuples representing duplicate values in the cluster, where each tuple contains the name of the cluster and the duplicate value. duplicates (List[Tuple[str, str]]): A list of tuples representing duplicate values in the cluster, where each tuple contains the name of the cluster and the duplicate value.
__init_hash (int): The hash of the json representation of the cluster at __init__().
Methods: Methods:
__init__(self, cluster: Dict[str, Any] | str, skip_duplicates: bool = False): Initializes a Cluster object from a dict or existing cluster file __init__(self, cluster: Union[Dict[str, Any], str], skip_duplicates: bool = False): Initializes a Cluster object from a dict or existing cluster file
search(self, query: str, return_tags: bool = False) -> Union[List[ClusterValue], List[str]]: Searches for values in the cluster that match the given query. search(self, query: str, return_tags: bool = False) -> Union[List[ClusterValue], List[str]]: Searches for values in the cluster that match the given query.
machinetags(self) -> List[str]: Returns a list of machine tags for the cluster. machinetags(self) -> List[str]: Returns a list of machine tags for the cluster.
get_by_external_id(self, external_id: str) -> ClusterValue: Returns the cluster value with the specified external ID. get_by_external_id(self, external_id: str) -> ClusterValue: Returns the cluster value with the specified external ID.
@ -435,14 +449,14 @@ class Cluster(Mapping): # type: ignore
self.cluster = json.load(f) self.cluster = json.load(f)
else: else:
self.cluster = cluster self.cluster = cluster
self.name = self.cluster['name'] self.name: str = self.cluster['name']
self.type = self.cluster['type'] self.type: str = self.cluster['type']
self.source = self.cluster['source'] self.source: str = self.cluster['source']
self.authors = self.cluster['authors'] self.authors: Set[str] = set(self.cluster['authors'])
self.description = self.cluster['description'] self.description: str = self.cluster['description']
self.uuid = self.cluster['uuid'] self.uuid: str = self.cluster['uuid']
self.version = self.cluster['version'] self.version: int = self.cluster['version']
self.category = self.cluster['category'] self.category: str = self.cluster['category']
self.cluster_values: Dict[str, Any] = {} self.cluster_values: Dict[str, Any] = {}
self.duplicates: List[Tuple[str, str]] = [] self.duplicates: List[Tuple[str, str]] = []
try: try:
@ -451,6 +465,7 @@ class Cluster(Mapping): # type: ignore
self.append(new_cluster_value, skip_duplicates) self.append(new_cluster_value, skip_duplicates)
except KeyError: except KeyError:
pass pass
self.__init_hash: int = hash(self.to_json())
@overload @overload
def search(self, query: str, return_tags: Literal[False] = False) -> List[ClusterValue]: def search(self, query: str, return_tags: Literal[False] = False) -> List[ClusterValue]:
@ -559,19 +574,31 @@ class Cluster(Mapping): # type: ignore
raise PyMISPGalaxiesError("Duplicate value ({}) in cluster: {}".format(cv.value, self.name)) raise PyMISPGalaxiesError("Duplicate value ({}) in cluster: {}".format(cv.value, self.name))
self.cluster_values[cv.value.lower()] = cv self.cluster_values[cv.value.lower()] = cv
def save(self, name: str) -> None: def save(self, name: str, update_version: bool = True) -> None:
""" """
Saves the cluster to a file <name>.json Saves the cluster to a file <name>.json
Args: Args:
name (str): The name of the file to save the cluster to. name (str): The name of the file to save the cluster to.
update_version (bool, optional): Flag indicating whether to update the version if the cluster changed. Defaults to True.
""" """
if update_version and self.has_changed():
self.version += 1
root_dir_clusters = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'clusters') # type: ignore [type-var, arg-type] root_dir_clusters = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'clusters') # type: ignore [type-var, arg-type]
cluster_file = os.path.join(root_dir_clusters, f"{name}.json") cluster_file = os.path.join(root_dir_clusters, f"{name}.json")
with open(cluster_file, 'w') as f: with open(cluster_file, 'w') as f:
json.dump(self, f, cls=EncodeClusters, indent=2, sort_keys=True, ensure_ascii=False) json.dump(self, f, cls=EncodeClusters, indent=2, sort_keys=True, ensure_ascii=False)
f.write('\n') # needed for the beauty and to be compliant with jq_all_the_things f.write('\n') # needed for the beauty and to be compliant with jq_all_the_things
def has_changed(self) -> bool:
"""
Checks if the cluster has changed since initialization.
Returns:
bool: True if the cluster has changed, False otherwise.
"""
return hash(self.to_json()) != self.__init_hash
def __str__(self) -> str: def __str__(self) -> str:
""" """
Returns a string representation of the cluster. Returns a string representation of the cluster.
@ -628,7 +655,7 @@ class Cluster(Mapping): # type: ignore
Dict[str, Any]: The dictionary representation of the Cluster object. Dict[str, Any]: The dictionary representation of the Cluster object.
""" """
to_return = {'name': self.name, 'type': self.type, 'source': self.source, to_return = {'name': self.name, 'type': self.type, 'source': self.source,
'authors': self.authors, 'description': self.description, 'authors': sorted(list(self.authors)), 'description': self.description,
'uuid': self.uuid, 'version': self.version, 'category': self.category, 'uuid': self.uuid, 'version': self.version, 'category': self.category,
'values': []} 'values': []}
to_return['values'] = [v for v in self.values()] to_return['values'] = [v for v in self.values()]

View File

@ -67,3 +67,18 @@ class TestPyMISPGalaxiesApi(unittest.TestCase):
self.assertEqual(rel['type'], 'similar-to') self.assertEqual(rel['type'], 'similar-to')
else: else:
self.fail(f"Unexpected related: {rel}") self.fail(f"Unexpected related: {rel}")
def test_cluster_has_changed(self):
cluster = Cluster(cluster='backdoor')
cv = cluster.get('WellMess')
self.assertFalse(cluster.has_changed())
cv.description = 'new description'
self.assertTrue(cluster.has_changed())
def test_galaxy_has_changed(self):
galaxy = self.galaxies.get('backdoor')
self.assertFalse(galaxy.has_changed())
galaxy.description = 'new description'
self.assertTrue(galaxy.has_changed())