PyMISPGalaxies/pymispgalaxies/api.py

345 lines
12 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
from json import JSONEncoder
import os
import sys
from collections.abc import Mapping
from glob import glob
import re
from typing import List, Dict, Optional, Any, Tuple, Iterator, overload, Union
if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal
try:
import jsonschema # type: ignore
HAS_JSONSCHEMA = True
except ImportError:
HAS_JSONSCHEMA = False
class EncodeGalaxies(JSONEncoder):
def default(self, obj: Any) -> Dict[str, str]:
if isinstance(obj, Galaxy):
return obj.to_dict()
return JSONEncoder.default(self, obj)
class EncodeClusters(JSONEncoder):
def default(self, obj: Any) -> Dict[str, str]:
if isinstance(obj, (Cluster, ClusterValue, ClusterValueMeta)):
return obj.to_dict()
return JSONEncoder.default(self, obj)
class PyMISPGalaxiesError(Exception):
def __init__(self, message: str):
super(PyMISPGalaxiesError, self).__init__(message)
self.message = message
class UnableToRevertMachinetag(PyMISPGalaxiesError):
pass
class Galaxy():
def __init__(self, galaxy: Dict[str, str]):
self.galaxy = galaxy
self.type = self.galaxy['type']
self.name = self.galaxy['name']
self.icon = self.galaxy['icon']
self.description = self.galaxy['description']
self.version = self.galaxy['version']
self.uuid = self.galaxy['uuid']
self.namespace = self.galaxy.pop('namespace', None)
self.kill_chain_order = self.galaxy.pop('kill_chain_order', None)
def to_json(self) -> str:
return json.dumps(self, cls=EncodeGalaxies)
def to_dict(self) -> Dict[str, str]:
to_return = {'type': self.type, 'name': self.name, 'description': self.description,
'version': self.version, 'uuid': self.uuid, 'icon': self.icon}
if self.namespace:
to_return['namespace'] = self.namespace
if self.kill_chain_order:
to_return['kill_chain_order'] = self.kill_chain_order
return to_return
class Galaxies(Mapping): # type: ignore
def __init__(self, galaxies: List[Dict[str, str]]=[]):
if not galaxies:
galaxies = []
self.root_dir_galaxies = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
'data', 'misp-galaxy', 'galaxies')
for galaxy_file in glob(os.path.join(self.root_dir_galaxies, '*.json')):
with open(galaxy_file, 'r') as f:
galaxies.append(json.load(f))
self.galaxies = {}
for galaxy in galaxies:
self.galaxies[galaxy['name']] = Galaxy(galaxy)
def validate_with_schema(self) -> None:
if not HAS_JSONSCHEMA:
raise ImportError('jsonschema is required: pip install jsonschema')
schema = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
'data', 'misp-galaxy', 'schema_galaxies.json')
with open(schema, 'r') as f:
loaded_schema = json.load(f)
for g in self.galaxies.values():
jsonschema.validate(g.galaxy, loaded_schema)
def __getitem__(self, name: str) -> Galaxy:
return self.galaxies[name]
def __iter__(self) -> Iterator[str]:
return iter(self.galaxies)
def __len__(self) -> int:
return len(self.galaxies)
class ClusterValueMeta():
def __init__(self, m: Dict[str, str]):
self.type = m.pop('type', None)
self.complexity = m.pop('complexity', None)
self.effectiveness = m.pop('effectiveness', None)
self.country = m.pop('country', None)
self.possible_issues = m.pop('possible_issues', None)
self.colour = m.pop('colour', None)
self.motive = m.pop('motive', None)
self.impact = m.pop('impact', None)
self.refs = m.pop('refs', None)
self.synonyms = m.pop('synonyms', None)
self.derivated_from = m.pop('derivated_from', None)
self.status = m.pop('status', None)
self.date = m.pop('date', None)
self.encryption = m.pop('encryption', None)
self.extensions = m.pop('extensions', None)
self.ransomnotes = m.pop('ransomnotes', None)
# NOTE: meta can have aditional properties. We only load the ones
# defined on the schema
self.additional_properties = m
def to_json(self) -> str:
return json.dumps(self, cls=EncodeClusters)
def to_dict(self) -> Dict[str, str]:
to_return = {}
if self.type:
to_return['type'] = self.type
if self.complexity:
to_return['complexity'] = self.complexity
if self.effectiveness:
to_return['effectiveness'] = self.effectiveness
if self.country:
to_return['country'] = self.country
if self.possible_issues:
to_return['possible_issues'] = self.possible_issues
if self.colour:
to_return['colour'] = self.colour
if self.motive:
to_return['motive'] = self.motive
if self.impact:
to_return['impact'] = self.impact
if self.refs:
to_return['refs'] = self.refs
if self.synonyms:
to_return['synonyms'] = self.synonyms
if self.derivated_from:
to_return['derivated_from'] = self.derivated_from
if self.status:
to_return['status'] = self.status
if self.date:
to_return['date'] = self.date
if self.encryption:
to_return['encryption'] = self.encryption
if self.extensions:
to_return['extensions'] = self.extensions
if self.ransomnotes:
to_return['ransomnotes'] = self.ransomnotes
if self.additional_properties:
to_return.update(self.additional_properties)
return to_return
class ClusterValue():
def __init__(self, v: Dict[str, Any]):
if not v['value']:
raise PyMISPGalaxiesError("Invalid cluster (no value): {}".format(v))
self.uuid = v.get('uuid', None)
self.value = v['value']
self.description = v.get('description')
self.meta = self.__init_meta(v.get('meta'))
self.searchable = [self.value]
if self.uuid:
self.searchable.append(self.uuid)
if self.meta and self.meta.synonyms:
self.searchable += self.meta.synonyms
self.searchable = list(set(self.searchable))
def __init_meta(self, m: Optional[Dict[str, str]]) -> Optional[ClusterValueMeta]:
if not m:
return None
return ClusterValueMeta(m)
def to_json(self) -> str:
return json.dumps(self, cls=EncodeClusters)
def to_dict(self) -> Dict[str, Any]:
to_return = {'value': self.value}
if self.uuid:
to_return['uuid'] = self.uuid
if self.description:
to_return['description'] = self.description
if self.meta:
to_return['meta'] = self.meta
return to_return
class Cluster(Mapping): # type: ignore
def __init__(self, cluster: Dict[str, Any], skip_duplicates: bool=False):
self.cluster = cluster
self.name = self.cluster['name']
self.type = self.cluster['type']
self.source = self.cluster['source']
self.authors = self.cluster['authors']
self.description = self.cluster['description']
self.uuid = self.cluster['uuid']
self.version = self.cluster['version']
self.category = self.cluster['category']
self.cluster_values = {}
self.duplicates = []
for value in self.cluster['values']:
new_cluster_value = ClusterValue(value)
if self.get(new_cluster_value.value):
if skip_duplicates:
self.duplicates.append((self.name, new_cluster_value.value))
else:
raise PyMISPGalaxiesError("Duplicate value ({}) in cluster: {}".format(new_cluster_value.value, self.name))
self.cluster_values[new_cluster_value.value] = new_cluster_value
@overload
def search(self, query: str, return_tags: Literal[False]=False) -> List[ClusterValue]:
...
@overload
def search(self, query: str, return_tags: Literal[True]) -> List[str]:
...
@overload
def search(self, query: str, return_tags: bool) -> Union[List[ClusterValue], List[str]]:
...
def search(self, query: str, return_tags: bool=False) -> Union[List[ClusterValue], List[str]]:
matching = []
for v in self.values():
if [s for s in v.searchable if query.lower() in s.lower()]:
if return_tags:
matching.append('misp-galaxy:{}="{}"'.format(self.type, v.value))
pass
else:
matching.append(v)
return matching
def machinetags(self) -> List[str]:
to_return = []
for v in self.values():
to_return.append('misp-galaxy:{}="{}"'.format(self.type, v.value))
return to_return
def __str__(self) -> str:
return '\n'.join(self.machinetags())
def __getitem__(self, name: str) -> ClusterValue:
return self.cluster_values[name]
def __len__(self) -> int:
return len(self.cluster_values)
def __iter__(self) -> Iterator[str]:
return iter(self.cluster_values)
def to_json(self) -> str:
return json.dumps(self, cls=EncodeClusters)
def to_dict(self) -> Dict[str, Any]:
to_return = {'name': self.name, 'type': self.type, 'source': self.source,
'authors': self.authors, 'description': self.description,
'uuid': self.uuid, 'version': self.version, 'category': self.category,
'values': []}
to_return['values'] = [v for v in self.values()]
return to_return
class Clusters(Mapping): # type: ignore
def __init__(self, clusters: List[Dict[str, str]]=[], skip_duplicates: bool=False):
if not clusters:
clusters = []
self.root_dir_clusters = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
'data', 'misp-galaxy', 'clusters')
for cluster_file in glob(os.path.join(self.root_dir_clusters, '*.json')):
with open(cluster_file, 'r') as f:
clusters.append(json.load(f))
self.clusters = {}
for cluster in clusters:
self.clusters[cluster['type']] = Cluster(cluster, skip_duplicates=skip_duplicates)
def validate_with_schema(self) -> None:
if not HAS_JSONSCHEMA:
raise ImportError('jsonschema is required: pip install jsonschema')
schema = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
'data', 'misp-galaxy', 'schema_clusters.json')
with open(schema, 'r') as f:
loaded_schema = json.load(f)
for c in self.values():
jsonschema.validate(c.cluster, loaded_schema)
def all_machinetags(self) -> List[str]:
return [cluster.machinetags() for cluster in self.values()]
def revert_machinetag(self, machinetag: str) -> Tuple[Cluster, ClusterValue]:
try:
_, cluster_type, cluster_value = re.findall('^([^:]*):([^=]*)="([^"]*)"$', machinetag)[0]
cluster: Cluster = self[cluster_type]
value: ClusterValue = cluster[cluster_value]
return cluster, value
except Exception:
raise UnableToRevertMachinetag('The machinetag {} could not be found.'.format(machinetag))
def search(self, query: str, return_tags: bool=False) -> List[Tuple[Cluster, str]]:
to_return = []
for cluster in self.values():
values = cluster.search(query, return_tags)
if not values:
continue
to_return.append((cluster, values))
return to_return
def __getitem__(self, name: str) -> Cluster:
return self.clusters[name]
def __iter__(self) -> Iterator[str]:
return iter(self.clusters)
def __len__(self) -> int:
return len(self.clusters)
def __str__(self) -> str:
to_print = ''
for cluster in self.values():
to_print += '{}\n\n'.format(cluster)
return to_print