chg: [Galaxy, Cluster]: load, save, get_kill_chain_tactics

pull/25/head
Christophe Vandeplas 2024-06-18 14:32:09 +02:00
parent 9d5c6a1b5d
commit 1ab1976343
No known key found for this signature in database
GPG Key ID: BDC48619FFDC5A5B
1 changed files with 91 additions and 13 deletions

View File

@ -62,6 +62,17 @@ class Galaxy():
kill_chain_order (str, optional): The kill chain order of the galaxy. kill_chain_order (str, optional): The kill chain order of the galaxy.
""" """
@overload
def __init__(self, galaxy: str):
"""
Initializes a Galaxy object from an existing galaxy.
Args:
galaxy (str): The name of the existing galaxy to load from the data folder.
"""
...
@overload
def __init__(self, galaxy: Dict[str, str]): def __init__(self, galaxy: Dict[str, str]):
""" """
Initializes a new instance of the Galaxy class. Initializes a new instance of the Galaxy class.
@ -69,15 +80,37 @@ class Galaxy():
Args: Args:
galaxy (Dict[str, str]): The dictionary containing the galaxy data. galaxy (Dict[str, str]): The dictionary containing the galaxy data.
""" """
self.galaxy = galaxy ...
self.type = self.galaxy['type']
self.name = self.galaxy['name'] def __init__(self, galaxy):
self.icon = self.galaxy['icon'] if isinstance(galaxy, str):
self.description = self.galaxy['description'] root_dir_galaxies = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'galaxies')
self.version = self.galaxy['version'] galaxy_file = os.path.join(root_dir_galaxies, f"{galaxy}.json")
self.uuid = self.galaxy['uuid'] with open(galaxy_file, 'r') as f:
self.namespace = self.galaxy.pop('namespace', None) self.__init__(json.load(f))
self.kill_chain_order = self.galaxy.pop('kill_chain_order', None) else:
self.galaxy = galaxy
self.type = self.galaxy['type']
self.name = self.galaxy['name']
self.icon = self.galaxy['icon']
self.description = self.galaxy['description']
self.version = self.galaxy['version']
self.uuid = self.galaxy['uuid']
self.namespace = self.galaxy.pop('namespace', None)
self.kill_chain_order = self.galaxy.pop('kill_chain_order', None)
def save(self, name: str) -> None:
"""
Saves the galaxy to a file <name>.json
Args:
name (str): The name of the file to save the galaxy to.
"""
root_dir_galaxies = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'galaxies')
galaxy_file = os.path.join(root_dir_galaxies, f"{name}.json")
with open(galaxy_file, 'w') as f:
json.dump(self, f, cls=EncodeGalaxies, indent=2, sort_keys=True, ensure_ascii=False)
f.write('\n') # needed for the beauty and to be compliant with jq_all_the_things
def to_json(self) -> str: def to_json(self) -> str:
""" """
@ -292,6 +325,12 @@ class ClusterValue():
self.value = v['value'] self.value = v['value']
self.description = v.get('description') self.description = v.get('description')
self.meta = self.__init_meta(v.get('meta')) self.meta = self.__init_meta(v.get('meta'))
self.related = []
try:
# LATER convert related to a class?
self.related = v['related']
except KeyError:
pass
self.searchable = [self.value] self.searchable = [self.value]
if self.uuid: if self.uuid:
self.searchable.append(self.uuid) self.searchable.append(self.uuid)
@ -336,6 +375,8 @@ class ClusterValue():
to_return['description'] = self.description to_return['description'] = self.description
if self.meta: if self.meta:
to_return['meta'] = self.meta to_return['meta'] = self.meta
if self.related:
to_return['related'] = self.related
return to_return return to_return
@ -362,6 +403,7 @@ class Cluster(Mapping): # type: ignore
search(self, query: str, return_tags: bool = False) -> Union[List[ClusterValue], List[str]]: Searches for values in the cluster that match the given query. search(self, query: str, return_tags: bool = False) -> Union[List[ClusterValue], List[str]]: Searches for values in the cluster that match the given query.
machinetags(self) -> List[str]: Returns a list of machine tags for the cluster. machinetags(self) -> List[str]: Returns a list of machine tags for the cluster.
get_by_external_id(self, external_id: str) -> ClusterValue: Returns the cluster value with the specified external ID. get_by_external_id(self, external_id: str) -> ClusterValue: Returns the cluster value with the specified external ID.
save(self, name:str) -> None: Saves the cluster to a file <name>.json
__str__(self) -> str: Returns a string representation of the cluster. __str__(self) -> str: Returns a string representation of the cluster.
__getitem__(self, name: str) -> ClusterValue: Returns the cluster value with the specified name. __getitem__(self, name: str) -> ClusterValue: Returns the cluster value with the specified name.
__len__(self) -> int: Returns the number of cluster values in the cluster. __len__(self) -> int: Returns the number of cluster values in the cluster.
@ -412,7 +454,7 @@ class Cluster(Mapping): # type: ignore
try: try:
for value in self.cluster['values']: for value in self.cluster['values']:
new_cluster_value = ClusterValue(value) new_cluster_value = ClusterValue(value)
self.add(new_cluster_value, skip_duplicates) self.append(new_cluster_value, skip_duplicates)
except KeyError: except KeyError:
pass pass
@ -478,15 +520,38 @@ class Cluster(Mapping): # type: ignore
return value return value
raise KeyError('No value with external_id: {}'.format(external_id)) raise KeyError('No value with external_id: {}'.format(external_id))
def get_kill_chain_tactics(self) -> dict:
"""
Returns the sorted kill chain tactics associated with the cluster.
Returns:
List[str]: A list of kill chain tactics.
"""
items = set()
for v in self.cluster_values.values():
if v.meta and v.meta.additional_properties and v.meta.additional_properties.get('kill_chain'):
for item in v.meta.additional_properties.get('kill_chain'):
items.add(item)
result = {}
for item in items:
key, value = item.split(':')
if key not in result:
result[key] = []
result[key].append(value)
for key in result.keys():
result[key] = sorted(result[key])
return result
@overload @overload
def add(self, cv: dict, skip_duplicates: bool = False) -> None: def append(self, cv: dict, skip_duplicates: bool = False) -> None:
... ...
@overload @overload
def add(self, cv: ClusterValue, skip_duplicates: bool = False) -> None: def append(self, cv: ClusterValue, skip_duplicates: bool = False) -> None:
... ...
def add(self, cv, skip_duplicates: bool = False) -> None: def append(self, cv, skip_duplicates: bool = False) -> None:
""" """
Adds a cluster value to the cluster. Adds a cluster value to the cluster.
""" """
@ -499,6 +564,19 @@ class Cluster(Mapping): # type: ignore
raise PyMISPGalaxiesError("Duplicate value ({}) in cluster: {}".format(cv.value, self.name)) raise PyMISPGalaxiesError("Duplicate value ({}) in cluster: {}".format(cv.value, self.name))
self.cluster_values[cv.value] = cv self.cluster_values[cv.value] = cv
def save(self, name: str) -> None:
"""
Saves the cluster to a file <name>.json
Args:
name (str): The name of the file to save the cluster to.
"""
root_dir_clusters = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'clusters')
cluster_file = os.path.join(root_dir_clusters, f"{name}.json")
with open(cluster_file, 'w') as f:
json.dump(self, f, cls=EncodeClusters, indent=2, sort_keys=True, ensure_ascii=False)
f.write('\n') # needed for the beauty and to be compliant with jq_all_the_things
def __str__(self) -> str: def __str__(self) -> str:
""" """
Returns a string representation of the cluster. Returns a string representation of the cluster.