mirror of https://github.com/MISP/PyMISPGalaxies
new: [Cluster] new get_by_external_id function + docstrings
parent
c6741af1af
commit
104cb81239
|
@ -47,8 +47,28 @@ class UnableToRevertMachinetag(PyMISPGalaxiesError):
|
|||
|
||||
|
||||
class Galaxy():
|
||||
"""
|
||||
Represents a galaxy in the PyMISPGalaxies library.
|
||||
|
||||
Attributes:
|
||||
galaxy (Dict[str, str]): The dictionary containing the galaxy data.
|
||||
type (str): The type of the galaxy.
|
||||
name (str): The name of the galaxy.
|
||||
icon (str): The icon of the galaxy.
|
||||
description (str): The description of the galaxy.
|
||||
version (str): The version of the galaxy.
|
||||
uuid (str): The UUID of the galaxy.
|
||||
namespace (str, optional): The namespace of the galaxy.
|
||||
kill_chain_order (str, optional): The kill chain order of the galaxy.
|
||||
"""
|
||||
|
||||
def __init__(self, galaxy: Dict[str, str]):
|
||||
"""
|
||||
Initializes a new instance of the Galaxy class.
|
||||
|
||||
Args:
|
||||
galaxy (Dict[str, str]): The dictionary containing the galaxy data.
|
||||
"""
|
||||
self.galaxy = galaxy
|
||||
self.type = self.galaxy['type']
|
||||
self.name = self.galaxy['name']
|
||||
|
@ -60,9 +80,21 @@ class Galaxy():
|
|||
self.kill_chain_order = self.galaxy.pop('kill_chain_order', None)
|
||||
|
||||
def to_json(self) -> str:
|
||||
"""
|
||||
Converts the galaxy object to a JSON string.
|
||||
|
||||
Returns:
|
||||
str: The JSON representation of the galaxy object.
|
||||
"""
|
||||
return json.dumps(self, cls=EncodeGalaxies)
|
||||
|
||||
def to_dict(self) -> Dict[str, str]:
|
||||
"""
|
||||
Converts the galaxy object to a dictionary.
|
||||
|
||||
Returns:
|
||||
Dict[str, str]: The dictionary representation of the galaxy object.
|
||||
"""
|
||||
to_return = {'type': self.type, 'name': self.name, 'description': self.description,
|
||||
'version': self.version, 'uuid': self.uuid, 'icon': self.icon}
|
||||
if self.namespace:
|
||||
|
@ -73,8 +105,32 @@ class Galaxy():
|
|||
|
||||
|
||||
class Galaxies(Mapping): # type: ignore
|
||||
"""
|
||||
A class representing a collection of MISP galaxies.
|
||||
|
||||
Parameters:
|
||||
- galaxies: A list of dictionaries representing the galaxies. Each dictionary should contain the name and other properties of a galaxy.
|
||||
If left empty, the galaxies are loaded from the data folder.
|
||||
|
||||
Attributes:
|
||||
- galaxies: A dictionary containing the galaxies, where the keys are the names of the galaxies and the values are instances of the Galaxy class.
|
||||
- root_dir_galaxies: The root directory of the MISP galaxies.
|
||||
|
||||
Methods:
|
||||
- validate_with_schema: Validates the galaxies against the schema.
|
||||
- __getitem__: Returns the galaxy with the specified name.
|
||||
- __iter__: Returns an iterator over the galaxy names.
|
||||
- __len__: Returns the number of galaxies in the collection.
|
||||
"""
|
||||
|
||||
def __init__(self, galaxies: List[Dict[str, str]] = []):
|
||||
"""
|
||||
Initializes a new instance of the Galaxies class.
|
||||
|
||||
Parameters:
|
||||
- galaxies: A list of dictionaries representing the galaxies. Each dictionary should contain the name and other properties of a galaxy.
|
||||
If left empty, the galaxies are loaded from the data folder.
|
||||
"""
|
||||
if not galaxies:
|
||||
galaxies = []
|
||||
self.root_dir_galaxies = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
|
||||
|
@ -88,6 +144,12 @@ class Galaxies(Mapping): # type: ignore
|
|||
self.galaxies[galaxy['name']] = Galaxy(galaxy)
|
||||
|
||||
def validate_with_schema(self) -> None:
|
||||
"""
|
||||
Validates the galaxies against the schema.
|
||||
|
||||
Raises:
|
||||
- ImportError: If the jsonschema module is not installed.
|
||||
"""
|
||||
if not HAS_JSONSCHEMA:
|
||||
raise ImportError('jsonschema is required: pip install jsonschema')
|
||||
schema = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
|
||||
|
@ -98,12 +160,36 @@ class Galaxies(Mapping): # type: ignore
|
|||
jsonschema.validate(g.galaxy, loaded_schema)
|
||||
|
||||
def __getitem__(self, name: str) -> Galaxy:
|
||||
"""
|
||||
Returns the galaxy with the specified name.
|
||||
|
||||
Parameters:
|
||||
- name: The name of the galaxy.
|
||||
|
||||
Returns:
|
||||
- The Galaxy instance with the specified name.
|
||||
|
||||
Raises:
|
||||
- KeyError: If the galaxy with the specified name does not exist.
|
||||
"""
|
||||
return self.galaxies[name]
|
||||
|
||||
def __iter__(self) -> Iterator[str]:
|
||||
"""
|
||||
Returns an iterator over the galaxy names.
|
||||
|
||||
Returns:
|
||||
- An iterator over the galaxy names.
|
||||
"""
|
||||
return iter(self.galaxies)
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""
|
||||
Returns the number of galaxies in the collection.
|
||||
|
||||
Returns:
|
||||
- The number of galaxies in the collection.
|
||||
"""
|
||||
return len(self.galaxies)
|
||||
|
||||
|
||||
|
@ -173,8 +259,33 @@ class ClusterValueMeta():
|
|||
|
||||
|
||||
class ClusterValue():
|
||||
"""
|
||||
Represents a cluster value.
|
||||
|
||||
Attributes:
|
||||
uuid (str): The UUID of the cluster value.
|
||||
value (Any): The value of the cluster.
|
||||
description (str): The description of the cluster value.
|
||||
meta (ClusterValueMeta): The metadata associated with the cluster value.
|
||||
searchable (List[str]): A list of searchable terms for the cluster value.
|
||||
|
||||
Methods:
|
||||
__init__(self, v: Dict[str, Any]): Initializes a ClusterValue object.
|
||||
__init_meta(self, m: Optional[Dict[str, str]]) -> Optional[ClusterValueMeta]: Initializes the metadata for the cluster value.
|
||||
to_json(self) -> str: Converts the ClusterValue object to a JSON string.
|
||||
to_dict(self) -> Dict[str, Any]: Converts the ClusterValue object to a dictionary.
|
||||
"""
|
||||
|
||||
def __init__(self, v: Dict[str, Any]):
|
||||
"""
|
||||
Initializes a ClusterValue object.
|
||||
|
||||
Args:
|
||||
v (Dict[str, Any]): A dictionary containing the cluster value information.
|
||||
|
||||
Raises:
|
||||
PyMISPGalaxiesError: If the cluster value is invalid (no value).
|
||||
"""
|
||||
if not v['value']:
|
||||
raise PyMISPGalaxiesError("Invalid cluster (no value): {}".format(v))
|
||||
self.uuid = v.get('uuid', None)
|
||||
|
@ -189,14 +300,35 @@ class ClusterValue():
|
|||
self.searchable = list(set(self.searchable))
|
||||
|
||||
def __init_meta(self, m: Optional[Dict[str, str]]) -> Optional[ClusterValueMeta]:
|
||||
"""
|
||||
Initializes the metadata for the cluster value.
|
||||
|
||||
Args:
|
||||
m (Optional[Dict[str, str]]): A dictionary containing the metadata for the cluster value.
|
||||
|
||||
Returns:
|
||||
Optional[ClusterValueMeta]: The initialized ClusterValueMeta object or None if no metadata is provided.
|
||||
"""
|
||||
if not m:
|
||||
return None
|
||||
return ClusterValueMeta(m)
|
||||
|
||||
def to_json(self) -> str:
|
||||
"""
|
||||
Converts the ClusterValue object to a JSON string.
|
||||
|
||||
Returns:
|
||||
str: The JSON representation of the ClusterValue object.
|
||||
"""
|
||||
return json.dumps(self, cls=EncodeClusters)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Converts the ClusterValue object to a dictionary.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: The dictionary representation of the ClusterValue object.
|
||||
"""
|
||||
to_return = {'value': self.value}
|
||||
if self.uuid:
|
||||
to_return['uuid'] = self.uuid
|
||||
|
@ -208,8 +340,42 @@ class ClusterValue():
|
|||
|
||||
|
||||
class Cluster(Mapping): # type: ignore
|
||||
"""
|
||||
Represents a cluster in the PyMISPGalaxies library.
|
||||
|
||||
Attributes:
|
||||
cluster (Dict[str, Any]): The dictionary containing the cluster data.
|
||||
name (str): The name of the cluster.
|
||||
type (str): The type of the cluster.
|
||||
source (str): The source of the cluster.
|
||||
authors (str): The authors of the cluster.
|
||||
description (str): The description of the cluster.
|
||||
uuid (str): The UUID of the cluster.
|
||||
version (str): The version of the cluster.
|
||||
category (str): The category of the cluster.
|
||||
cluster_values (Dict[str, ClusterValue]): A dictionary containing the cluster values, where the keys are the values of the cluster and the values are instances of the ClusterValue class.
|
||||
duplicates (List[Tuple[str, str]]): A list of tuples representing duplicate values in the cluster, where each tuple contains the name of the cluster and the duplicate value.
|
||||
|
||||
Methods:
|
||||
__init__(self, cluster: Dict[str, Any], skip_duplicates: bool = False): Initializes a Cluster object.
|
||||
search(self, query: str, return_tags: bool = False) -> Union[List[ClusterValue], List[str]]: Searches for values in the cluster that match the given query.
|
||||
machinetags(self) -> List[str]: Returns a list of machine tags for the cluster.
|
||||
get_by_external_id(self, external_id: str) -> ClusterValue: Returns the cluster value with the specified external ID.
|
||||
__str__(self) -> str: Returns a string representation of the cluster.
|
||||
__getitem__(self, name: str) -> ClusterValue: Returns the cluster value with the specified name.
|
||||
__len__(self) -> int: Returns the number of cluster values in the cluster.
|
||||
__iter__(self) -> Iterator[str]: Returns an iterator over the cluster values.
|
||||
to_json(self) -> str: Converts the Cluster object to a JSON string.
|
||||
to_dict(self) -> Dict[str, Any]: Converts the Cluster object to a dictionary.
|
||||
"""
|
||||
def __init__(self, cluster: Dict[str, Any], skip_duplicates: bool = False):
|
||||
"""
|
||||
Initializes a Cluster object.
|
||||
|
||||
Args:
|
||||
cluster (Dict[str, Any]): A dictionary containing the cluster data.
|
||||
skip_duplicates (bool, optional): Flag indicating whether to skip duplicate values. Defaults to False.
|
||||
"""
|
||||
self.cluster = cluster
|
||||
self.name = self.cluster['name']
|
||||
self.type = self.cluster['type']
|
||||
|
@ -243,38 +409,110 @@ class Cluster(Mapping): # type: ignore
|
|||
...
|
||||
|
||||
def search(self, query: str, return_tags: bool = False) -> Union[List[ClusterValue], List[str]]:
|
||||
"""
|
||||
Searches for values in the cluster that match the given query.
|
||||
|
||||
Args:
|
||||
query (str): The query to search for.
|
||||
return_tags (bool, optional): Flag indicating whether to return machine tags instead of cluster values. Defaults to False.
|
||||
|
||||
Returns:
|
||||
Union[List[ClusterValue], List[str]]: A list of matching cluster values or machine tags.
|
||||
"""
|
||||
matching = []
|
||||
for v in self.values():
|
||||
if [s for s in v.searchable if query.lower() in s.lower()]:
|
||||
if return_tags:
|
||||
matching.append('misp-galaxy:{}="{}"'.format(self.type, v.value))
|
||||
pass
|
||||
else:
|
||||
matching.append(v)
|
||||
return matching
|
||||
|
||||
def machinetags(self) -> List[str]:
|
||||
"""
|
||||
Returns a list of machine tags for the cluster.
|
||||
|
||||
Returns:
|
||||
List[str]: A list of machine tags.
|
||||
"""
|
||||
to_return = []
|
||||
for v in self.values():
|
||||
to_return.append('misp-galaxy:{}="{}"'.format(self.type, v.value))
|
||||
return to_return
|
||||
|
||||
def get_by_external_id(self, external_id: str) -> ClusterValue:
|
||||
"""
|
||||
Returns the cluster value with the specified external ID.
|
||||
|
||||
Args:
|
||||
external_id (str): The external ID to search for.
|
||||
|
||||
Returns:
|
||||
ClusterValue: The cluster value with the specified external ID.
|
||||
|
||||
Raises:
|
||||
KeyError: If no value with the specified external ID is found.
|
||||
"""
|
||||
for value in self.cluster_values.values():
|
||||
if value.meta and value.meta.additional_properties and value.meta.additional_properties.get('external_id') == external_id:
|
||||
return value
|
||||
raise KeyError('No value with external_id: {}'.format(external_id))
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""
|
||||
Returns a string representation of the cluster.
|
||||
|
||||
Returns:
|
||||
str: A string representation of the cluster.
|
||||
"""
|
||||
return '\n'.join(self.machinetags())
|
||||
|
||||
def __getitem__(self, name: str) -> ClusterValue:
|
||||
"""
|
||||
Returns the cluster value with the specified name.
|
||||
|
||||
Args:
|
||||
name (str): The name of the cluster value.
|
||||
|
||||
Returns:
|
||||
ClusterValue: The cluster value with the specified name.
|
||||
"""
|
||||
return self.cluster_values[name]
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""
|
||||
Returns the number of cluster values in the cluster.
|
||||
|
||||
Returns:
|
||||
int: The number of cluster values.
|
||||
"""
|
||||
return len(self.cluster_values)
|
||||
|
||||
def __iter__(self) -> Iterator[str]:
|
||||
"""
|
||||
Returns an iterator over the cluster values.
|
||||
|
||||
Returns:
|
||||
Iterator[str]: An iterator over the cluster values.
|
||||
"""
|
||||
return iter(self.cluster_values)
|
||||
|
||||
def to_json(self) -> str:
|
||||
"""
|
||||
Converts the Cluster object to a JSON string.
|
||||
|
||||
Returns:
|
||||
str: The JSON representation of the Cluster object.
|
||||
"""
|
||||
return json.dumps(self, cls=EncodeClusters)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Converts the Cluster object to a dictionary.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: The dictionary representation of the Cluster object.
|
||||
"""
|
||||
to_return = {'name': self.name, 'type': self.type, 'source': self.source,
|
||||
'authors': self.authors, 'description': self.description,
|
||||
'uuid': self.uuid, 'version': self.version, 'category': self.category,
|
||||
|
@ -286,6 +524,13 @@ class Cluster(Mapping): # type: ignore
|
|||
class Clusters(Mapping): # type: ignore
|
||||
|
||||
def __init__(self, clusters: List[Dict[str, str]] = [], skip_duplicates: bool = False):
|
||||
"""
|
||||
Allows to interact with a group of clusters.
|
||||
|
||||
Args:
|
||||
clusters (List[Dict[str, str]], optional): A list of dictionaries representing clusters. If left empty, load the clusters from the data folder.
|
||||
skip_duplicates (bool, optional): Flag indicating whether to skip duplicate clusters. Defaults to False.
|
||||
"""
|
||||
if not clusters:
|
||||
clusters = []
|
||||
self.root_dir_clusters = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
|
||||
|
@ -298,6 +543,12 @@ class Clusters(Mapping): # type: ignore
|
|||
self.clusters[cluster['type']] = Cluster(cluster, skip_duplicates=skip_duplicates)
|
||||
|
||||
def validate_with_schema(self) -> None:
|
||||
"""
|
||||
Validates the clusters against the schema.
|
||||
|
||||
Raises:
|
||||
ImportError: If jsonschema is not installed.
|
||||
"""
|
||||
if not HAS_JSONSCHEMA:
|
||||
raise ImportError('jsonschema is required: pip install jsonschema')
|
||||
schema = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
|
||||
|
@ -308,9 +559,27 @@ class Clusters(Mapping): # type: ignore
|
|||
jsonschema.validate(c.cluster, loaded_schema)
|
||||
|
||||
def all_machinetags(self) -> List[str]:
|
||||
"""
|
||||
Returns a list of all machinetags in the clusters.
|
||||
|
||||
Returns:
|
||||
List[str]: A list of machinetags.
|
||||
"""
|
||||
return [cluster.machinetags() for cluster in self.values()]
|
||||
|
||||
def revert_machinetag(self, machinetag: str) -> Tuple[Cluster, ClusterValue]:
|
||||
"""
|
||||
Reverts a machinetag to its original cluster and value.
|
||||
|
||||
Args:
|
||||
machinetag (str): The machinetag to revert.
|
||||
|
||||
Returns:
|
||||
Tuple[Cluster, ClusterValue]: A tuple containing the original cluster and value.
|
||||
|
||||
Raises:
|
||||
UnableToRevertMachinetag: If the machinetag could not be found.
|
||||
"""
|
||||
try:
|
||||
_, cluster_type, cluster_value = re.findall('^([^:]*):([^=]*)="([^"]*)"$', machinetag)[0]
|
||||
cluster: Cluster = self[cluster_type]
|
||||
|
@ -320,6 +589,17 @@ class Clusters(Mapping): # type: ignore
|
|||
raise UnableToRevertMachinetag('The machinetag {} could not be found.'.format(machinetag))
|
||||
|
||||
def search(self, query: str, return_tags: bool = False) -> List[Tuple[Cluster, str]]:
|
||||
"""
|
||||
Searches for clusters and values matching the given query.
|
||||
|
||||
Args:
|
||||
query (str): The query to search for.
|
||||
return_tags (bool, optional): Flag indicating whether to return the matching tags. Defaults to False.
|
||||
|
||||
Returns:
|
||||
List[Tuple[Cluster, str]]: A list of tuples containing the matching cluster and value.
|
||||
|
||||
"""
|
||||
to_return = []
|
||||
for cluster in self.values():
|
||||
values = cluster.search(query, return_tags)
|
||||
|
@ -329,15 +609,45 @@ class Clusters(Mapping): # type: ignore
|
|||
return to_return
|
||||
|
||||
def __getitem__(self, name: str) -> Cluster:
|
||||
"""
|
||||
Returns the cluster with the specified name.
|
||||
|
||||
Args:
|
||||
name (str): The name of the cluster.
|
||||
|
||||
Returns:
|
||||
Cluster: The cluster object.
|
||||
|
||||
Raises:
|
||||
KeyError: If the cluster with the specified name does not exist.
|
||||
"""
|
||||
return self.clusters[name]
|
||||
|
||||
def __iter__(self) -> Iterator[str]:
|
||||
"""
|
||||
Returns an iterator over the cluster names.
|
||||
|
||||
Returns:
|
||||
Iterator[str]: An iterator over the cluster names.
|
||||
"""
|
||||
return iter(self.clusters)
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""
|
||||
Returns the number of clusters.
|
||||
|
||||
Returns:
|
||||
int: The number of clusters.
|
||||
"""
|
||||
return len(self.clusters)
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""
|
||||
Returns a string representation of the Clusters object.
|
||||
|
||||
Returns:
|
||||
str: A string representation of the Clusters object.
|
||||
"""
|
||||
to_print = ''
|
||||
for cluster in self.values():
|
||||
to_print += '{}\n\n'.format(cluster)
|
||||
|
|
|
@ -127,3 +127,13 @@ class TestPyMISPGalaxies(unittest.TestCase):
|
|||
errors[uuid] = entries
|
||||
print(json.dumps(errors, indent=2))
|
||||
self.assertFalse(errors)
|
||||
|
||||
def test_get_by_external_id(self):
|
||||
cluster = self.clusters.get('mitre-attack-pattern')
|
||||
self.assertIsNotNone(cluster)
|
||||
cluster_by_external_id = cluster.get_by_external_id('T1525')
|
||||
cluster_by_value = cluster.get('Implant Internal Image - T1525')
|
||||
self.assertEqual(cluster_by_external_id, cluster_by_value)
|
||||
|
||||
with self.assertRaises(KeyError):
|
||||
cluster.get_by_external_id('XXXXXX')
|
||||
|
|
Loading…
Reference in New Issue