PyMISPGalaxies/pymispgalaxies/api.py

345 lines
12 KiB
Python
Raw Normal View History

2018-10-19 12:35:36 +02:00
#!/usr/bin/env python3
2017-07-25 18:04:15 +02:00
# -*- coding: utf-8 -*-
import json
2017-07-25 18:21:23 +02:00
from json import JSONEncoder
2017-07-25 18:04:15 +02:00
import os
import sys
from collections.abc import Mapping
2017-07-25 18:04:15 +02:00
from glob import glob
2017-07-25 18:57:18 +02:00
import re
from typing import List, Dict, Optional, Any, Tuple, Iterator, overload, Union
if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal
2017-07-25 18:04:15 +02:00
try:
import jsonschema # type: ignore
2017-07-25 18:04:15 +02:00
HAS_JSONSCHEMA = True
except ImportError:
HAS_JSONSCHEMA = False
2017-07-25 18:21:23 +02:00
class EncodeGalaxies(JSONEncoder):
    """JSON encoder that serialises :class:`Galaxy` objects via ``to_dict``."""

    def default(self, obj: Any) -> Dict[str, str]:
        """Return ``obj.to_dict()`` for a Galaxy; defer anything else to ``JSONEncoder``."""
        if not isinstance(obj, Galaxy):
            # Not one of ours: let the base encoder deal with it.
            return JSONEncoder.default(self, obj)
        return obj.to_dict()
2017-11-01 19:20:09 +01:00
class EncodeClusters(JSONEncoder):
    """JSON encoder that serialises cluster objects via their ``to_dict`` method."""

    def default(self, obj: Any) -> Dict[str, str]:
        """Return ``obj.to_dict()`` for Cluster, ClusterValue and ClusterValueMeta.

        Any other object is handed to ``JSONEncoder.default``.
        """
        serialisable = (Cluster, ClusterValue, ClusterValueMeta)
        if not isinstance(obj, serialisable):
            return JSONEncoder.default(self, obj)
        return obj.to_dict()
2017-07-25 18:21:23 +02:00
2017-07-25 20:27:58 +02:00
class PyMISPGalaxiesError(Exception):
    """Exception raised by this module; the text is also stored on ``message``."""

    def __init__(self, message: str):
        """Store *message* on the instance and pass it to ``Exception``."""
        self.message = message
        super().__init__(message)
class UnableToRevertMachinetag(PyMISPGalaxiesError):
    """Specialised PyMISPGalaxiesError with no behaviour of its own."""
2017-07-25 18:04:15 +02:00
class Galaxy():
    """One galaxy definition, built from its decoded JSON dictionary.

    The keys ``type``, ``name``, ``icon``, ``description``, ``version`` and
    ``uuid`` are mandatory (a missing one raises ``KeyError``). ``namespace``
    and ``kill_chain_order`` are optional and default to ``None``.
    """

    def __init__(self, galaxy: Dict[str, str]):
        """Populate the instance from *galaxy*.

        *galaxy* itself is left untouched: the optional keys are popped from a
        shallow copy. Popping from the caller's dictionary would strip those
        keys from it, so a second ``Galaxy`` built from the same dictionary
        would silently lose its namespace and kill chain order.
        """
        self.galaxy = dict(galaxy)
        self.type = self.galaxy['type']
        self.name = self.galaxy['name']
        self.icon = self.galaxy['icon']
        self.description = self.galaxy['description']
        self.version = self.galaxy['version']
        self.uuid = self.galaxy['uuid']
        # As before, self.galaxy does not keep these two keys once extracted.
        self.namespace = self.galaxy.pop('namespace', None)
        self.kill_chain_order = self.galaxy.pop('kill_chain_order', None)

    def to_json(self) -> str:
        """Serialise this galaxy to a JSON string using ``EncodeGalaxies``."""
        return json.dumps(self, cls=EncodeGalaxies)

    def to_dict(self) -> Dict[str, str]:
        """Return the galaxy as a plain dictionary.

        ``namespace`` and ``kill_chain_order`` are only included when truthy.
        """
        to_return = {'type': self.type, 'name': self.name, 'description': self.description,
                     'version': self.version, 'uuid': self.uuid, 'icon': self.icon}
        if self.namespace:
            to_return['namespace'] = self.namespace
        if self.kill_chain_order:
            to_return['kill_chain_order'] = self.kill_chain_order
        return to_return
2017-07-25 18:04:15 +02:00
2021-05-13 20:43:34 +02:00
class Galaxies(Mapping):  # type: ignore
    """Read-only mapping from galaxy name to :class:`Galaxy`."""

    def __init__(self, galaxies: Optional[List[Dict[str, str]]] = None):
        """Build the collection.

        :param galaxies: decoded galaxy dictionaries. When omitted or empty,
            every ``*.json`` file under ``data/misp-galaxy/galaxies`` inside
            the installed ``pymispgalaxies`` package is loaded instead.

        Galaxies are indexed by their ``name`` key; a later entry with the
        same name replaces an earlier one.
        """
        # ``None`` replaces the former mutable ``[]`` default; both are falsy,
        # so callers see no difference.
        if not galaxies:
            galaxies = []
            self.root_dir_galaxies = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)),  # type: ignore
                                                  'data', 'misp-galaxy', 'galaxies')
            for galaxy_file in glob(os.path.join(self.root_dir_galaxies, '*.json')):
                # Explicit UTF-8 so decoding does not depend on the platform's
                # locale encoding.
                with open(galaxy_file, 'r', encoding='utf-8') as f:
                    galaxies.append(json.load(f))
        self.galaxies = {}
        for galaxy in galaxies:
            self.galaxies[galaxy['name']] = Galaxy(galaxy)

    def validate_with_schema(self) -> None:
        """Validate every galaxy against ``schema_galaxies.json``.

        :raises ImportError: if ``jsonschema`` is not installed.

        Validation failures are reported by ``jsonschema.validate`` itself.
        """
        if not HAS_JSONSCHEMA:
            raise ImportError('jsonschema is required: pip install jsonschema')
        schema = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)),  # type: ignore
                              'data', 'misp-galaxy', 'schema_galaxies.json')
        with open(schema, 'r', encoding='utf-8') as f:
            loaded_schema = json.load(f)
        for g in self.galaxies.values():
            jsonschema.validate(g.galaxy, loaded_schema)

    def __getitem__(self, name: str) -> Galaxy:
        """Return the galaxy called *name* (``KeyError`` if unknown)."""
        return self.galaxies[name]

    def __iter__(self) -> Iterator[str]:
        """Iterate over galaxy names."""
        return iter(self.galaxies)

    def __len__(self) -> int:
        """Return the number of galaxies."""
        return len(self.galaxies)
class ClusterValueMeta():
    """Metadata attached to a single cluster value.

    Each key listed in ``_KNOWN_KEYS`` becomes an attribute of the same name
    (``None`` when absent). Any other key is kept in
    ``additional_properties``.
    """

    # Keys promoted to attributes, in the order ``to_dict`` emits them.
    _KNOWN_KEYS = ('type', 'complexity', 'effectiveness', 'country',
                   'possible_issues', 'colour', 'motive', 'impact', 'refs',
                   'synonyms', 'derivated_from', 'status', 'date',
                   'encryption', 'extensions', 'ransomnotes')

    def __init__(self, m: Dict[str, Any]):
        """Populate the instance from the meta dictionary *m*.

        *m* is not modified: keys are popped from a shallow copy. Popping from
        the caller's dictionary would empty the ``meta`` entry of the raw
        cluster data it belongs to, so anything reading that raw data later
        (schema validation, a second load) would see it without its known keys.
        """
        remaining = dict(m)
        self.type = remaining.pop('type', None)
        self.complexity = remaining.pop('complexity', None)
        self.effectiveness = remaining.pop('effectiveness', None)
        self.country = remaining.pop('country', None)
        self.possible_issues = remaining.pop('possible_issues', None)
        self.colour = remaining.pop('colour', None)
        self.motive = remaining.pop('motive', None)
        self.impact = remaining.pop('impact', None)
        self.refs = remaining.pop('refs', None)
        self.synonyms = remaining.pop('synonyms', None)
        self.derivated_from = remaining.pop('derivated_from', None)
        self.status = remaining.pop('status', None)
        self.date = remaining.pop('date', None)
        self.encryption = remaining.pop('encryption', None)
        self.extensions = remaining.pop('extensions', None)
        self.ransomnotes = remaining.pop('ransomnotes', None)
        # NOTE: meta can have additional properties. We only load the ones
        # defined on the schema
        self.additional_properties = remaining

    def to_json(self) -> str:
        """Serialise this meta object to a JSON string using ``EncodeClusters``."""
        return json.dumps(self, cls=EncodeClusters)

    def to_dict(self) -> Dict[str, Any]:
        """Return the meta as a dictionary.

        Known keys come first, each included only when its value is truthy,
        followed by the additional properties.
        """
        to_return: Dict[str, Any] = {}
        for key in self._KNOWN_KEYS:
            value = getattr(self, key)
            if value:
                to_return[key] = value
        if self.additional_properties:
            to_return.update(self.additional_properties)
        return to_return
class ClusterValue():
    """A single entry of a cluster's ``values`` list."""

    def __init__(self, v: Dict[str, Any]):
        """Build the value from its decoded dictionary *v*.

        ``value`` is mandatory; ``uuid``, ``description`` and ``meta`` are
        optional. ``searchable`` collects the value, its uuid and the meta
        synonyms, with duplicates removed.

        :raises PyMISPGalaxiesError: if ``v['value']`` is falsy.
        """
        if not v['value']:
            raise PyMISPGalaxiesError("Invalid cluster (no value): {}".format(v))
        self.uuid = v.get('uuid', None)
        self.value = v['value']
        self.description = v.get('description')
        self.meta = self.__init_meta(v.get('meta'))

        terms = [self.value]
        if self.uuid:
            terms.append(self.uuid)
        synonyms = self.meta.synonyms if self.meta else None
        if synonyms:
            terms.extend(synonyms)
        # Going through a set removes duplicates; ordering is not preserved.
        self.searchable = list(set(terms))

    def __init_meta(self, m: Optional[Dict[str, str]]) -> Optional[ClusterValueMeta]:
        """Wrap *m* in a ClusterValueMeta, or return ``None`` when *m* is empty or missing."""
        return ClusterValueMeta(m) if m else None

    def to_json(self) -> str:
        """Serialise this value to a JSON string using ``EncodeClusters``."""
        return json.dumps(self, cls=EncodeClusters)

    def to_dict(self) -> Dict[str, Any]:
        """Return the value as a dictionary, leaving out falsy optional fields.

        ``meta`` is stored as the ClusterValueMeta object itself, not as a dict.
        """
        to_return = {'value': self.value}
        for field in ('uuid', 'description', 'meta'):
            content = getattr(self, field)
            if content:
                to_return[field] = content
        return to_return
2021-05-13 20:43:34 +02:00
class Cluster(Mapping):  # type: ignore
    """Read-only mapping from value name to :class:`ClusterValue` for one cluster."""

    def __init__(self, cluster: Dict[str, Any], skip_duplicates: bool = False):
        """Build the cluster from its decoded dictionary.

        :param cluster: decoded cluster; must hold ``name``, ``type``,
            ``source``, ``authors``, ``description``, ``uuid``, ``version``,
            ``category`` and ``values``.
        :param skip_duplicates: when true, a repeated value name is recorded
            in ``duplicates`` instead of raising.
        :raises PyMISPGalaxiesError: on a repeated value name while
            *skip_duplicates* is false.

        In both modes the most recent entry is the one kept under a name.
        """
        self.cluster = cluster
        self.name = self.cluster['name']
        self.type = self.cluster['type']
        self.source = self.cluster['source']
        self.authors = self.cluster['authors']
        self.description = self.cluster['description']
        self.uuid = self.cluster['uuid']
        self.version = self.cluster['version']
        self.category = self.cluster['category']
        self.cluster_values = {}
        self.duplicates = []
        for raw_value in self.cluster['values']:
            candidate = ClusterValue(raw_value)
            if self.get(candidate.value):
                if not skip_duplicates:
                    raise PyMISPGalaxiesError("Duplicate value ({}) in cluster: {}".format(candidate.value, self.name))
                self.duplicates.append((self.name, candidate.value))
            self.cluster_values[candidate.value] = candidate

    @overload
    def search(self, query: str, return_tags: Literal[False] = False) -> List[ClusterValue]:
        ...

    @overload
    def search(self, query: str, return_tags: Literal[True]) -> List[str]:
        ...

    @overload
    def search(self, query: str, return_tags: bool) -> Union[List[ClusterValue], List[str]]:
        ...

    def search(self, query: str, return_tags: bool = False) -> Union[List[ClusterValue], List[str]]:
        """Return the values with a searchable term containing *query*, ignoring case.

        With *return_tags* set, machinetag strings are returned instead of
        the ClusterValue objects.
        """
        matching = []
        for v in self.values():
            found = any([query.lower() in term.lower() for term in v.searchable])
            if not found:
                continue
            if return_tags:
                matching.append('misp-galaxy:{}="{}"'.format(self.type, v.value))
            else:
                matching.append(v)
        return matching

    def machinetags(self) -> List[str]:
        """Return one ``misp-galaxy:<type>="<value>"`` tag per value."""
        return ['misp-galaxy:{}="{}"'.format(self.type, v.value) for v in self.values()]

    def __str__(self) -> str:
        """Return all machinetags, one per line."""
        return '\n'.join(self.machinetags())

    def __getitem__(self, name: str) -> ClusterValue:
        """Return the value called *name* (``KeyError`` if unknown)."""
        return self.cluster_values[name]

    def __len__(self) -> int:
        """Return the number of values."""
        return len(self.cluster_values)

    def __iter__(self) -> Iterator[str]:
        """Iterate over value names."""
        return iter(self.cluster_values)

    def to_json(self) -> str:
        """Serialise this cluster to a JSON string using ``EncodeClusters``."""
        return json.dumps(self, cls=EncodeClusters)

    def to_dict(self) -> Dict[str, Any]:
        """Return the cluster as a dictionary; ``values`` holds the ClusterValue objects."""
        return {
            'name': self.name,
            'type': self.type,
            'source': self.source,
            'authors': self.authors,
            'description': self.description,
            'uuid': self.uuid,
            'version': self.version,
            'category': self.category,
            'values': list(self.values()),
        }
2021-05-13 20:43:34 +02:00
class Clusters(Mapping):  # type: ignore
    """Read-only mapping from cluster type to :class:`Cluster`."""

    def __init__(self, clusters: Optional[List[Dict[str, str]]] = None, skip_duplicates: bool = False):
        """Build the collection.

        :param clusters: decoded cluster dictionaries. When omitted or empty,
            every ``*.json`` file under ``data/misp-galaxy/clusters`` inside
            the installed ``pymispgalaxies`` package is loaded instead.
        :param skip_duplicates: forwarded to every :class:`Cluster`.

        Clusters are indexed by their ``type`` key; a later entry with the
        same type replaces an earlier one.
        """
        # ``None`` replaces the former mutable ``[]`` default; both are falsy,
        # so callers see no difference.
        if not clusters:
            clusters = []
            self.root_dir_clusters = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)),  # type: ignore
                                                  'data', 'misp-galaxy', 'clusters')
            for cluster_file in glob(os.path.join(self.root_dir_clusters, '*.json')):
                # Explicit UTF-8 so decoding does not depend on the platform's
                # locale encoding.
                with open(cluster_file, 'r', encoding='utf-8') as f:
                    clusters.append(json.load(f))
        self.clusters = {}
        for cluster in clusters:
            self.clusters[cluster['type']] = Cluster(cluster, skip_duplicates=skip_duplicates)

    def validate_with_schema(self) -> None:
        """Validate every cluster's raw dictionary against ``schema_clusters.json``.

        :raises ImportError: if ``jsonschema`` is not installed.

        Validation failures are reported by ``jsonschema.validate`` itself.
        """
        if not HAS_JSONSCHEMA:
            raise ImportError('jsonschema is required: pip install jsonschema')
        schema = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)),  # type: ignore
                              'data', 'misp-galaxy', 'schema_clusters.json')
        with open(schema, 'r', encoding='utf-8') as f:
            loaded_schema = json.load(f)
        for c in self.values():
            jsonschema.validate(c.cluster, loaded_schema)

    def all_machinetags(self) -> List[List[str]]:
        """Return the machinetags of every cluster, as one list per cluster."""
        return [cluster.machinetags() for cluster in self.values()]

    def revert_machinetag(self, machinetag: str) -> Tuple[Cluster, ClusterValue]:
        """Resolve a ``namespace:type="value"`` machinetag to its cluster and value.

        :raises UnableToRevertMachinetag: if the tag does not have that shape
            or names an unknown cluster type or value. The underlying error is
            attached as ``__cause__``.
        """
        try:
            _, cluster_type, cluster_value = re.findall(r'^([^:]*):([^=]*)="([^"]*)"$', machinetag)[0]
            cluster: Cluster = self[cluster_type]
            value: ClusterValue = cluster[cluster_value]
            return cluster, value
        except Exception as e:
            # Same catch-all as before; chaining keeps the root cause visible.
            raise UnableToRevertMachinetag('The machinetag {} could not be found.'.format(machinetag)) from e

    def search(self, query: str, return_tags: bool = False) -> List[Tuple[Cluster, Union[List[ClusterValue], List[str]]]]:
        """Search every cluster for *query*.

        :returns: one ``(cluster, matches)`` tuple per cluster with at least
            one match, where ``matches`` is the result of
            ``cluster.search(query, return_tags)``.
        """
        to_return = []
        for cluster in self.values():
            values = cluster.search(query, return_tags)
            if not values:
                continue
            to_return.append((cluster, values))
        return to_return

    def __getitem__(self, name: str) -> Cluster:
        """Return the cluster whose type is *name* (``KeyError`` if unknown)."""
        return self.clusters[name]

    def __iter__(self) -> Iterator[str]:
        """Iterate over cluster types."""
        return iter(self.clusters)

    def __len__(self) -> int:
        """Return the number of clusters."""
        return len(self.clusters)

    def __str__(self) -> str:
        """Return each cluster's string form, each followed by a blank line."""
        return ''.join('{}\n\n'.format(cluster) for cluster in self.values())