mirror of https://github.com/MISP/PyMISPGalaxies
Merge pull request #28 from cvandeplas/main
new: New functions, better RFC compliance and more coherent output
commit 9fa4a1be29
@@ -8,7 +8,7 @@ import sys
 from collections.abc import Mapping
 from glob import glob
 import re
-from typing import List, Dict, Optional, Any, Tuple, Iterator, overload, Union
+from typing import List, Dict, Optional, Any, Tuple, Iterator, overload, Union, Set

 if sys.version_info >= (3, 8):
     from typing import Literal

@@ -31,7 +31,12 @@ class EncodeGalaxies(JSONEncoder):

 class EncodeClusters(JSONEncoder):
     def default(self, obj: Any) -> Dict[str, str]:
-        if isinstance(obj, (Cluster, ClusterValue, ClusterValueMeta)):
+        if isinstance(obj, list):
+            obj.sort()
+            return JSONEncoder.default(self, obj)
+        elif isinstance(obj, set):
+            return JSONEncoder.default(self, sorted(list(obj)))
+        elif isinstance(obj, (Cluster, ClusterValue, ClusterValueMeta)):
             return obj.to_dict()
         return JSONEncoder.default(self, obj)
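The encoder change above is what makes re-serialised cluster files reproducible: lists are sorted in place and sets are emitted as sorted lists. A minimal standalone sketch of the same idea (the encoder name is illustrative, not the library's exact class):

import json
from json import JSONEncoder
from typing import Any


class SortedSetEncoder(JSONEncoder):
    def default(self, obj: Any) -> Any:
        if isinstance(obj, set):
            # default() is only called for types json cannot serialise natively,
            # so turning the set into a sorted list is enough to stabilise output.
            return sorted(obj)
        return JSONEncoder.default(self, obj)


print(json.dumps({"synonyms": {"b", "a", "c"}}, cls=SortedSetEncoder, sort_keys=True))
# {"synonyms": ["a", "b", "c"]}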
@@ -56,10 +61,11 @@ class Galaxy():
         name (str): The name of the galaxy.
         icon (str): The icon of the galaxy.
         description (str): The description of the galaxy.
-        version (str): The version of the galaxy.
+        version (int): The version of the galaxy.
         uuid (str): The UUID of the galaxy.
         namespace (str, optional): The namespace of the galaxy.
-        kill_chain_order (str, optional): The kill chain order of the galaxy.
+        kill_chain_order (Dict, optional): The kill chain order of the galaxy.
+        _init_hash (int): The hash of the json representation of the galaxy at __init__().
     """

     def __init__(self, galaxy: Union[str, Dict[str, str]]):

@@ -81,24 +87,37 @@ class Galaxy():
         self.name = self.galaxy['name']
         self.icon = self.galaxy['icon']
         self.description = self.galaxy['description']
-        self.version = self.galaxy['version']
+        self.version: int = self.galaxy['version']
         self.uuid = self.galaxy['uuid']
         self.namespace = self.galaxy.pop('namespace', None)
         self.kill_chain_order = self.galaxy.pop('kill_chain_order', None)
+        self._init_hash: int = hash(self.to_json())

-    def save(self, file_name: str) -> None:
+    def save(self, file_name: str, update_version: bool = True) -> None:
         """
         Saves the galaxy to a file <file_name>.json

         Args:
             file_name (str): The name of the file to save the galaxy to.
+            update_version (bool, optional): Flag indicating whether to update the version if the galaxy changed. Defaults to True.
         """
+        if update_version and self.has_changed():
+            self.version += 1
         root_dir_galaxies = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'galaxies')  # type: ignore [type-var, arg-type]
         galaxy_file = os.path.join(root_dir_galaxies, f"{file_name}.json")
         with open(galaxy_file, 'w') as f:
             json.dump(self, f, cls=EncodeGalaxies, indent=2, sort_keys=True, ensure_ascii=False)
             f.write('\n')  # needed for the beauty and to be compliant with jq_all_the_things

+    def has_changed(self) -> bool:
+        """
+        Checks if the galaxy has changed since initialization.
+
+        Returns:
+            bool: True if the galaxy has changed, False otherwise.
+        """
+        return hash(self.to_json()) != self._init_hash
+
     def to_json(self) -> str:
         """
         Converts the galaxy object to a JSON string.
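With _init_hash and has_changed() in place, save() can bump the version automatically. A minimal usage sketch, assuming the bundled misp-galaxy data contains a 'backdoor' galaxy (the tests further down rely on the same name):

from pymispgalaxies import Galaxies

galaxies = Galaxies()
galaxy = galaxies.get('backdoor')

assert not galaxy.has_changed()      # JSON hash still matches the one taken in __init__()

galaxy.description = 'tweaked description'
assert galaxy.has_changed()          # any change to the serialised form flips the flag

# save() writes data/misp-galaxy/galaxies/backdoor.json inside the package and,
# because update_version defaults to True and the galaxy changed, bumps version by 1.
galaxy.save('backdoor')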
@@ -106,9 +125,9 @@ class Galaxy():
         Returns:
             str: The JSON representation of the galaxy object.
         """
-        return json.dumps(self, cls=EncodeGalaxies)
+        return json.dumps(self, cls=EncodeGalaxies, sort_keys=True)

-    def to_dict(self) -> Dict[str, str]:
+    def to_dict(self) -> Dict[str, Any]:
         """
         Converts the galaxy object to a dictionary.

@@ -215,7 +234,7 @@ class Galaxies(Mapping):  # type: ignore

 class ClusterValueMeta():

-    def __init__(self, m: Dict[str, str]):
+    def __init__(self, m: Dict[str, Any]):
         self.type = m.pop('type', None)
         self.complexity = m.pop('complexity', None)
         self.effectiveness = m.pop('effectiveness', None)
@@ -224,8 +243,9 @@ class ClusterValueMeta():
         self.colour = m.pop('colour', None)
         self.motive = m.pop('motive', None)
         self.impact = m.pop('impact', None)
-        self.refs = m.pop('refs', None)
-        self.synonyms = m.pop('synonyms', None)
+        self.refs: Set[str] = set(m.pop('refs', []))
+        self.official_refs: Set[str] = set(m.pop('official-refs', []))
+        self.synonyms: Set[str] = set(m.pop('synonyms', []))
         self.derivated_from = m.pop('derivated_from', None)
         self.status = m.pop('status', None)
         self.date = m.pop('date', None)
@@ -237,9 +257,9 @@ class ClusterValueMeta():
         self.additional_properties = m

     def to_json(self) -> str:
-        return json.dumps(self, cls=EncodeClusters)
+        return json.dumps(self, cls=EncodeClusters, sort_keys=True)

-    def to_dict(self) -> Dict[str, str]:
+    def to_dict(self) -> Dict[str, Any]:
         to_return = {}
         if self.type:
             to_return['type'] = self.type
@@ -258,9 +278,11 @@ class ClusterValueMeta():
         if self.impact:
             to_return['impact'] = self.impact
         if self.refs:
-            to_return['refs'] = self.refs
+            to_return['refs'] = sorted(list(self.refs))
+        if self.official_refs:
+            to_return['official-refs'] = sorted(list(self.official_refs))
         if self.synonyms:
-            to_return['synonyms'] = self.synonyms
+            to_return['synonyms'] = sorted(list(self.synonyms))
         if self.derivated_from:
             to_return['derivated_from'] = self.derivated_from
         if self.status:
@@ -274,7 +296,11 @@ class ClusterValueMeta():
         if self.ransomnotes:
             to_return['ransomnotes'] = self.ransomnotes
         if self.additional_properties:
-            to_return.update(self.additional_properties)
+            for key, value in self.additional_properties.items():
+                if isinstance(value, list):
+                    to_return[key] = sorted(value)
+                else:
+                    to_return[key] = value
         return to_return
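Because refs, official-refs and synonyms are now stored as sets and emitted sorted, duplicates disappear and the JSON order is stable. A small sketch of the effect (the values are illustrative):

from pymispgalaxies import ClusterValue

cv = ClusterValue({
    'value': 'example',
    'meta': {
        'synonyms': ['beta', 'alpha', 'alpha'],              # duplicate on purpose
        'refs': ['https://b.example', 'https://a.example'],
    },
})

print(cv.meta.synonyms)                   # a set, duplicates collapsed: alpha/beta once each
print(cv.to_dict()['meta']['synonyms'])   # ['alpha', 'beta'] - emitted as a sorted list
print(cv.to_dict()['meta']['refs'])       # ['https://a.example', 'https://b.example']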
@@ -312,7 +338,7 @@ class ClusterValue():
         self.value = v['value']
         self.description = v.get('description')
         self.meta = self.__init_meta(v.get('meta'))
-        self.related = []
+        self.related: List[Dict[str, str]] = []
         try:
             # LATER convert related to a class?
             self.related = v['related']
@@ -339,6 +365,26 @@ class ClusterValue():
             return None
         return ClusterValueMeta(m)

+    def merge(self, new: 'ClusterValue') -> None:
+        """
+        Merges the new cluster value with the existing one. Practically it replaces the existing one but merges relations
+        """
+        # backup relations
+        related_backup = self.related.copy()
+        # overwrite itself
+        self.__init__(new.to_dict())  # type: ignore [misc]
+        # merge relations with backup  # LATER convert related to a class of Hashable type, as that would be much more efficient in keeping uniques
+        for rel in related_backup:
+            # if uuid exists, skip, as we already copied it
+            exists = False
+            for existing_item in self.related:
+                if rel['dest-uuid'] == existing_item['dest-uuid']:
+                    exists = True
+                    break
+            # else append rel to list
+            if not exists:
+                self.related.append(rel)
+
     def to_json(self) -> str:
         """
         Converts the ClusterValue object to a JSON string.
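ClusterValue.merge() is the building block for the append() behaviour further down: the incoming value replaces the scalar fields, while relations are combined and deduplicated on dest-uuid. A condensed sketch with hypothetical UUIDs and values:

from pymispgalaxies import ClusterValue

old = ClusterValue({'uuid': 'u-1', 'value': 'old name',
                    'related': [{'dest-uuid': 'a', 'type': 'subtechnique-of'}]})
new = ClusterValue({'uuid': 'u-1', 'value': 'new name',
                    'related': [{'dest-uuid': 'b', 'type': 'similar-to'}]})

old.merge(new)
print(old.value)                                     # 'new name' - scalar fields replaced
print(sorted(r['dest-uuid'] for r in old.related))   # ['a', 'b'] - relations combined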
@@ -346,7 +392,7 @@ class ClusterValue():
         Returns:
             str: The JSON representation of the ClusterValue object.
         """
-        return json.dumps(self, cls=EncodeClusters)
+        return json.dumps(self, cls=EncodeClusters, sort_keys=True)

     def to_dict(self) -> Dict[str, Any]:
         """
@@ -361,9 +407,9 @@ class ClusterValue():
         if self.description:
             to_return['description'] = self.description
         if self.meta:
-            to_return['meta'] = self.meta
+            to_return['meta'] = self.meta.to_dict()
         if self.related:
-            to_return['related'] = self.related
+            to_return['related'] = sorted(self.related, key=lambda x: x['dest-uuid'])
         return to_return


@@ -372,21 +418,21 @@ class Cluster(Mapping):  # type: ignore
     Represents a cluster in the PyMISPGalaxies library.

     Attributes:
-        cluster (Dict[str, Any]): The dictionary containing the cluster data.
-        cluster (str): The name of the existing cluster to load from the data folder.
+        cluster (Union[Dict[str, Any], str]): The dictionary containing the cluster data or the name of the existing cluster to load from the data folder.
         name (str): The name of the cluster.
         type (str): The type of the cluster.
         source (str): The source of the cluster.
-        authors (str): The authors of the cluster.
+        authors (List[str]): The authors of the cluster.
         description (str): The description of the cluster.
         uuid (str): The UUID of the cluster.
-        version (str): The version of the cluster.
+        version (int): The version of the cluster.
         category (str): The category of the cluster.
         cluster_values (Dict[str, ClusterValue]): A dictionary containing the cluster values, where the keys are the values of the cluster and the values are instances of the ClusterValue class.
         duplicates (List[Tuple[str, str]]): A list of tuples representing duplicate values in the cluster, where each tuple contains the name of the cluster and the duplicate value.
+        _init_hash (int): The hash of the json representation of the cluster at __init__().

     Methods:
-        __init__(self, cluster: Dict[str, Any] | str, skip_duplicates: bool = False): Initializes a Cluster object from a dict or existing cluster file
+        __init__(self, cluster: Union[Dict[str, Any], str], skip_duplicates: bool = False): Initializes a Cluster object from a dict or existing cluster file
         search(self, query: str, return_tags: bool = False) -> Union[List[ClusterValue], List[str]]: Searches for values in the cluster that match the given query.
         machinetags(self) -> List[str]: Returns a list of machine tags for the cluster.
         get_by_external_id(self, external_id: str) -> ClusterValue: Returns the cluster value with the specified external ID.
@@ -415,14 +461,14 @@ class Cluster(Mapping):  # type: ignore
                 self.cluster = json.load(f)
         else:
             self.cluster = cluster
-        self.name = self.cluster['name']
-        self.type = self.cluster['type']
-        self.source = self.cluster['source']
-        self.authors = self.cluster['authors']
-        self.description = self.cluster['description']
-        self.uuid = self.cluster['uuid']
-        self.version = self.cluster['version']
-        self.category = self.cluster['category']
+        self.name: str = self.cluster['name']
+        self.type: str = self.cluster['type']
+        self.source: str = self.cluster['source']
+        self.authors: Set[str] = set(self.cluster['authors'])
+        self.description: str = self.cluster['description']
+        self.uuid: str = self.cluster['uuid']
+        self.version: int = self.cluster['version']
+        self.category: str = self.cluster['category']
         self.cluster_values: Dict[str, Any] = {}
         self.duplicates: List[Tuple[str, str]] = []
         try:
@@ -431,6 +477,7 @@ class Cluster(Mapping):  # type: ignore
                 self.append(new_cluster_value, skip_duplicates)
         except KeyError:
             pass
+        self._init_hash: int = hash(self.to_json())

     @overload
     def search(self, query: str, return_tags: Literal[False] = False) -> List[ClusterValue]:
@@ -519,30 +566,51 @@ class Cluster(Mapping):  # type: ignore

     def append(self, cv: Union[Dict[str, Any], ClusterValue], skip_duplicates: bool = False) -> None:
         """
-        Adds a cluster value to the cluster.
+        Adds a cluster value to the cluster, and merges it if it already exists.
+
+        Args:
+            cv (Union[Dict[str, Any], ClusterValue]): The cluster value to add.
+            skip_duplicates (bool, optional): Flag indicating whether to skip duplicate values. Defaults to False.
         """
         if isinstance(cv, dict):
             cv = ClusterValue(cv)
-        if self.get(cv.value):
-            if skip_duplicates:
+        existing = self.get(cv.value)
+        if existing:
+            if cv.uuid == existing.uuid:
+                # merge the existing
+                self.cluster_values[cv.value.lower()].merge(cv)
+                return
+            elif skip_duplicates:
                 self.duplicates.append((self.name, cv.value))
             else:
                 raise PyMISPGalaxiesError("Duplicate value ({}) in cluster: {}".format(cv.value, self.name))
         self.cluster_values[cv.value.lower()] = cv

-    def save(self, name: str) -> None:
+    def save(self, name: str, update_version: bool = True) -> None:
         """
         Saves the cluster to a file <name>.json

         Args:
             name (str): The name of the file to save the cluster to.
+            update_version (bool, optional): Flag indicating whether to update the version if the cluster changed. Defaults to True.
         """
+        if update_version and self.has_changed():
+            self.version += 1
         root_dir_clusters = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'clusters')  # type: ignore [type-var, arg-type]
         cluster_file = os.path.join(root_dir_clusters, f"{name}.json")
         with open(cluster_file, 'w') as f:
             json.dump(self, f, cls=EncodeClusters, indent=2, sort_keys=True, ensure_ascii=False)
             f.write('\n')  # needed for the beauty and to be compliant with jq_all_the_things

+    def has_changed(self) -> bool:
+        """
+        Checks if the cluster has changed since initialization.
+
+        Returns:
+            bool: True if the cluster has changed, False otherwise.
+        """
+        return hash(self.to_json()) != self._init_hash
+
     def __str__(self) -> str:
         """
         Returns a string representation of the cluster.
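Cluster.append() now merges a value whose UUID already exists instead of treating it as a duplicate, and Cluster.save() bumps the version when the content changed. A minimal usage sketch, assuming the bundled 'backdoor' cluster with a 'WellMess' value (the same fixtures the tests use):

from pymispgalaxies import Cluster

cluster = Cluster(cluster='backdoor')

# Re-appending a value whose UUID already exists merges it (relations combined)
# instead of raising PyMISPGalaxiesError.
updated = cluster.get('WellMess').to_dict()
updated['description'] = 'refreshed description'
cluster.append(updated)

assert cluster.has_changed()
# Writes data/misp-galaxy/clusters/backdoor.json inside the package; with
# update_version left at its default of True the version is bumped once.
cluster.save('backdoor')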
@@ -589,7 +657,7 @@ class Cluster(Mapping):  # type: ignore
         Returns:
             str: The JSON representation of the Cluster object.
         """
-        return json.dumps(self, cls=EncodeClusters)
+        return json.dumps(self, cls=EncodeClusters, sort_keys=True)

     def to_dict(self) -> Dict[str, Any]:
         """
@@ -599,7 +667,7 @@ class Cluster(Mapping):  # type: ignore
             Dict[str, Any]: The dictionary representation of the Cluster object.
         """
         to_return = {'name': self.name, 'type': self.type, 'source': self.source,
-                     'authors': self.authors, 'description': self.description,
+                     'authors': sorted(list(self.authors)), 'description': self.description,
                      'uuid': self.uuid, 'version': self.version, 'category': self.category,
                      'values': []}
         to_return['values'] = [v for v in self.values()]
@@ -2,13 +2,15 @@
 # -*- coding: utf-8 -*-

 import unittest
-from pymispgalaxies import Galaxies, Clusters, UnableToRevertMachinetag
+from pymispgalaxies import Galaxies, Clusters, UnableToRevertMachinetag, Galaxy, Cluster
 from glob import glob
 import os
 import json
 from collections import Counter, defaultdict
 import warnings
 from uuid import UUID
+import filecmp
+import tempfile


 class TestPyMISPGalaxies(unittest.TestCase):
@@ -48,6 +50,16 @@ class TestPyMISPGalaxies(unittest.TestCase):
             out = g.to_dict()
             self.assertDictEqual(out, galaxies_from_files[g.type])

+    @unittest.skip("We don't want to enforce it.")
+    def test_save_galaxies(self):
+        for galaxy_file in glob(os.path.join(self.galaxies.root_dir_galaxies, '*.json')):
+            with open(galaxy_file, 'r') as f:
+                galaxy = Galaxy(json.load(f))
+            with tempfile.NamedTemporaryFile(suffix='.json') as temp_file:
+                temp_file_no_suffix = temp_file.name[:-5]
+                galaxy.save(temp_file_no_suffix)
+                self.assertTrue(filecmp.cmp(galaxy_file, temp_file.name), msg=f"{galaxy_file} different when saving using Galaxy.save(). Maybe a sorting issue?")
+
     def test_dump_clusters(self):
         clusters_from_files = {}
         for cluster_file in glob(os.path.join(self.clusters.root_dir_clusters, '*.json')):
@@ -59,6 +71,16 @@ class TestPyMISPGalaxies(unittest.TestCase):
             print(name, c.name)
             self.assertCountEqual(out, clusters_from_files[c.name])

+    @unittest.skip("We don't want to enforce it.")
+    def test_save_clusters(self):
+        for cluster_file in glob(os.path.join(self.clusters.root_dir_clusters, '*.json')):
+            with open(cluster_file, 'r') as f:
+                cluster = Cluster(json.load(f))
+            with tempfile.NamedTemporaryFile(suffix='.json') as temp_file:
+                temp_file_no_suffix = temp_file.name[:-5]
+                cluster.save(temp_file_no_suffix)
+                self.assertTrue(filecmp.cmp(cluster_file, temp_file.name), msg=f"{cluster_file} different when saving using Cluster.save(). Maybe a sorting issue?")
+
     def test_validate_schema_clusters(self):
         self.clusters.validate_with_schema()
@@ -2,7 +2,7 @@
 # -*- coding: utf-8 -*-

 import unittest
-from pymispgalaxies import Galaxies, Clusters, Cluster
+from pymispgalaxies import Galaxies, Clusters, Cluster, ClusterValue


 class TestPyMISPGalaxiesApi(unittest.TestCase):
@@ -21,3 +21,99 @@ class TestPyMISPGalaxiesApi(unittest.TestCase):

         with self.assertRaises(KeyError):
             cluster.get_by_external_id('XXXXXX')
+
+    def test_merge_cv(self):
+        cv_1 = ClusterValue({
+            'uuid': '1234',
+            'value': 'old value',
+            'description': 'old description',
+            'related': [
+                {
+                    'dest-uuid': '1',
+                    'type': 'subtechnique-of'
+                },
+                {
+                    'dest-uuid': '2',
+                    'type': 'old-type'
+                }
+            ]
+        })
+
+        cv_2 = ClusterValue({
+            'uuid': '1234',
+            'value': 'new value',
+            'description': 'new description',
+            'related': [
+                {
+                    'dest-uuid': '2',
+                    'type': 'new-type'
+                },
+                {
+                    'dest-uuid': '3',
+                    'type': 'similar-to'
+                }
+            ]
+        })
+
+        cv_1.merge(cv_2)
+        self.assertEqual(cv_1.value, 'new value')
+        self.assertEqual(cv_1.description, 'new description')
+        for rel in cv_1.related:
+            if rel['dest-uuid'] == '1':
+                self.assertEqual(rel['type'], 'subtechnique-of')
+            elif rel['dest-uuid'] == '2':
+                self.assertEqual(rel['type'], 'new-type')
+            elif rel['dest-uuid'] == '3':
+                self.assertEqual(rel['type'], 'similar-to')
+            else:
+                self.fail(f"Unexpected related: {rel}")
+
+    def test_cluster_has_changed(self):
+        cluster = Cluster(cluster='backdoor')
+        cv = cluster.get('WellMess')
+        self.assertFalse(cluster.has_changed())
+
+        cv.description = 'new description'
+        self.assertTrue(cluster.has_changed())
+
+    def test_galaxy_has_changed(self):
+        galaxy = self.galaxies.get('backdoor')
+        self.assertFalse(galaxy.has_changed())
+
+        galaxy.description = 'new description'
+        self.assertTrue(galaxy.has_changed())
+
+    def test_clustervalue_sort_related(self):
+        cv = ClusterValue({'value': 'test'})
+        item_1 = {
+            'dest-uuid': '1',
+            'type': 'subtechnique-of'
+        }
+        item_2 = {
+            'dest-uuid': '2',
+            'type': 'similar-to'
+        }
+        cv.related = []
+        cv.related.append(item_2)
+        cv.related.append(item_1)
+        self.assertListEqual(cv.related, [item_2, item_1])
+        d = cv.to_dict()
+        self.assertListEqual(d['related'], [item_1, item_2])
+
+    def test_cluster_sort_synonyms(self):
+        cv = ClusterValue({
+            'value': 'test',
+            'meta': {
+                'synonyms': ['b', 'a', 'c']
+            }})
+        d = cv.to_dict()
+        self.assertListEqual(d['meta']['synonyms'], ['a', 'b', 'c'])
+
+    def test_cluster_sort_additional_property(self):
+        cv = ClusterValue({
+            'value': 'test',
+            'meta': {
+                'hello_world': ['b', 'a', 'c']
+            }})
+        d = cv.to_dict()
+        self.assertListEqual(d['meta']['hello_world'], ['a', 'b', 'c'])