PyTaxonomies/pytaxonomies/api.py

346 lines
12 KiB
Python
Raw Permalink Normal View History

2016-07-25 18:48:08 +02:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import os
2020-11-05 19:20:53 +01:00
from collections import abc
2016-07-28 11:54:14 +02:00
import re
2017-07-25 15:06:37 +02:00
import sys
2020-02-17 19:15:39 +01:00
from pathlib import Path
2020-11-05 19:20:53 +01:00
from typing import Union, Dict, Optional, List, Callable, Any, ValuesView, Iterator, Tuple
2016-07-25 18:48:08 +02:00
try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
2017-07-25 16:19:34 +02:00
try:
2020-02-17 19:15:39 +01:00
import jsonschema # type: ignore
2017-07-25 16:19:34 +02:00
HAS_JSONSCHEMA = True
except ImportError:
HAS_JSONSCHEMA = False
2016-07-25 18:48:08 +02:00
2020-11-05 19:20:53 +01:00
def taxonomies_json_default(obj: Any) -> Any:
    """Fallback serializer passed as ``default=`` to :func:`json.dumps`.

    :param obj: an object the JSON encoder could not serialize natively.
    :returns: a list for mapping value views, or the ``to_dict()`` result
        for :class:`Taxonomy`, :class:`Predicate` and :class:`Entry`.
    :raises TypeError: for any other object, as the ``json`` module expects
        from a ``default`` hook (returning ``None`` would silently be
        encoded as ``null``).
    """
    if isinstance(obj, abc.ValuesView):
        # A values view (e.g. the result of Mapping.values()) is not a list,
        # so the encoder hands it to this hook; materialize it.
        return list(obj)
    if isinstance(obj, (Taxonomy, Predicate, Entry)):
        return obj.to_dict()
    raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')
2016-10-13 17:23:10 +02:00
2016-07-25 18:48:08 +02:00
class Entry():
    """One entry of a predicate.

    Attributes: ``value``, ``expanded``, ``colour``, ``description`` and
    ``numerical_value``.
    """

    def __init__(self, entry: Optional[Dict[str, Any]]=None):
        """Build an entry from its dictionary form, or an empty one.

        :param entry: dictionary with a mandatory ``value`` key and the
            optional keys ``expanded``, ``colour``, ``description`` and
            ``numerical_value``. When omitted (or empty), the optional
            attributes are set to ``None`` and ``value`` is left unset: the
            caller has to assign it before using ``to_dict()`` or ``str()``.
        :raises KeyError: if ``entry`` is given without a ``value`` key.
        """
        if not entry:
            # We're creating a new one
            self.expanded = None
            self.colour = None
            self.description = None
            self.numerical_value = None
            return
        self.value = entry['value']
        self.expanded = entry.get('expanded')
        self.colour = entry.get('colour')
        self.description = entry.get('description')
        # Stored as-is; presumably a number -- TODO confirm against the schema.
        self.numerical_value = entry.get('numerical_value')

    def to_dict(self) -> Dict[str, Any]:
        """Return the dictionary form of the entry.

        ``value`` is always present. ``expanded``, ``colour`` and
        ``description`` are only included when truthy; ``numerical_value`` is
        included whenever it is not ``None`` (so ``0`` is kept).
        """
        to_return = {'value': self.value}
        if self.expanded:
            to_return['expanded'] = self.expanded
        if self.colour:
            to_return['colour'] = self.colour
        if self.description:
            to_return['description'] = self.description
        if self.numerical_value is not None:
            to_return['numerical_value'] = self.numerical_value
        return to_return

    def to_json(self) -> str:
        """Return the entry serialized as a JSON string."""
        return json.dumps(self, default=taxonomies_json_default)

    def __str__(self) -> str:
        """Return the value of the entry."""
        return self.value
2016-07-25 18:48:08 +02:00
2020-11-05 19:20:53 +01:00
class Predicate(abc.Mapping):  # type: ignore
    """A predicate of a taxonomy.

    Behaves as a read-only mapping from entry value to :class:`Entry`.
    """

    def __init__(self, predicate: Optional[Dict[str, Any]]=None,
                 entries: Optional[List[Dict[str, Any]]]=None):
        """Build a predicate from its dictionary form, or an empty one.

        :param predicate: dictionary with a mandatory ``value`` key and the
            optional keys ``expanded``, ``description``, ``colour``,
            ``exclusive`` and ``numerical_value``. When omitted (or empty),
            the optional attributes are set to ``None``, ``entries`` to an
            empty dict, and ``predicate`` (the name) is left unset.
        :param entries: dictionary forms of the entries of this predicate.
        :raises ValueError: if ``entries`` are given without a ``predicate``.
        :raises KeyError: if ``predicate`` is given without a ``value`` key.
        """
        if not predicate:
            if entries:
                raise ValueError('Need predicates if entries.')
            # We're creating a new one
            self.expanded = None
            self.description = None
            self.colour = None
            self.exclusive = None
            self.numerical_value = None
            self.entries: Dict[str, 'Entry'] = {}
            return
        self.predicate = predicate['value']
        self.expanded = predicate.get('expanded')
        self.description = predicate.get('description')
        self.colour = predicate.get('colour')
        self.exclusive = predicate.get('exclusive')
        self.numerical_value = predicate.get('numerical_value')
        self.__init_entries(entries)

    def __init_entries(self, entries: Optional[List[Dict[str, Any]]]=None) -> None:
        """(Re)build ``self.entries``, keyed by the ``value`` of each entry."""
        self.entries = {}
        if entries:
            for e in entries:
                self.entries[e['value']] = Entry(e)

    def to_dict(self) -> Dict[str, Any]:
        """Return the dictionary form of the predicate.

        ``value`` is always present. ``expanded``, ``description`` and
        ``colour`` are only included when truthy; ``exclusive`` and
        ``numerical_value`` whenever they are not ``None``. When the predicate
        has entries, they are returned under ``entries`` as a list of
        :class:`Entry` objects (not converted to dictionaries).
        """
        to_return: Dict[str, Any] = {'value': self.predicate}
        if self.expanded:
            to_return['expanded'] = self.expanded
        if self.description:
            to_return['description'] = self.description
        if self.colour:
            to_return['colour'] = self.colour
        if self.exclusive is not None:
            to_return['exclusive'] = self.exclusive
        if self.numerical_value is not None:
            to_return['numerical_value'] = self.numerical_value
        if self.entries:
            # A list rather than the values view: the JSON encoder can walk a
            # list, whereas a view is an opaque object to it.
            to_return['entries'] = list(self.values())
        return to_return

    def to_json(self) -> str:
        """Return the predicate serialized as a JSON string."""
        return json.dumps(self, default=taxonomies_json_default)

    def __str__(self) -> str:
        """Return the name of the predicate."""
        return self.predicate

    def __getitem__(self, entry: str) -> 'Entry':
        return self.entries[entry]

    def __iter__(self) -> Iterator[Any]:
        return iter(self.entries)

    def __len__(self) -> int:
        return len(self.entries)
2016-07-25 18:48:08 +02:00
2020-11-05 19:20:53 +01:00
class Taxonomy(abc.Mapping):  # type: ignore
    """A taxonomy: a namespace with its predicates and their entries.

    Behaves as a read-only mapping from predicate value to
    :class:`Predicate`.
    """

    def __init__(self, taxonomy: Optional[Dict[str, Any]]=None):
        """Build a taxonomy from its dictionary form, or an empty one.

        :param taxonomy: dictionary with the mandatory keys ``namespace``,
            ``description``, ``version`` and ``predicates``, and the optional
            keys ``expanded``, ``refs``, ``type``, ``exclusive`` and
            ``values``. When omitted (or empty), only ``predicates`` (empty)
            and the optional attributes (``None``) are initialized; ``name``,
            ``description`` and ``version`` are left unset.
        :raises KeyError: if a mandatory key is missing.
        """
        self.predicates: Dict[str, 'Predicate'] = {}
        if not taxonomy:
            # We're creating a new one
            self.expanded = None
            self.refs = None
            self.type = None
            self.exclusive = None
            return
        self.taxonomy = taxonomy
        self.name = self.taxonomy['namespace']
        self.description = self.taxonomy['description']
        self.version = self.taxonomy['version']
        self.expanded = self.taxonomy.get('expanded')
        self.refs = self.taxonomy.get('refs')
        self.type = self.taxonomy.get('type')
        self.exclusive = self.taxonomy.get('exclusive')
        self.__init_predicates()

    def __init_predicates(self) -> None:
        """Populate ``self.predicates`` from the raw taxonomy dictionary."""
        # Group the raw entries by the predicate they belong to.
        entries: Dict[str, List[Dict[str, Any]]] = {}
        values = self.taxonomy.get('values')
        if values and isinstance(values, list):
            for v in values:
                entries.setdefault(v['predicate'], []).extend(v['entry'])
        for p in self.taxonomy['predicates']:
            if isinstance(p, str):
                # Only dictionary-style predicates can be loaded.
                continue
            self.predicates[p['value']] = Predicate(p, entries.get(p['value']))

    def to_json(self) -> str:
        """Return the taxonomy serialized as a JSON string."""
        return json.dumps(self, default=taxonomies_json_default)

    def to_dict(self) -> Dict[str, Any]:
        """Return the dictionary form of the taxonomy.

        ``namespace``, ``description``, ``version`` and ``predicates`` are
        always present. ``expanded``, ``refs`` and ``type`` are only included
        when truthy, ``exclusive`` whenever it is not ``None``. The entries
        are moved out of the predicates into a separate ``values`` list of
        ``{'predicate': ..., 'entry': [...]}`` dictionaries, which is only
        included if at least one predicate has entries.
        """
        to_return: Dict[str, Any] = {'namespace': self.name,
                                     'description': self.description,
                                     'version': self.version}
        if self.expanded:
            to_return['expanded'] = self.expanded
        if self.refs:
            to_return['refs'] = self.refs
        if self.type:
            to_return['type'] = self.type
        if self.exclusive is not None:
            to_return['exclusive'] = self.exclusive
        predicates = [p.to_dict() for p in self.values()]
        entries = []
        for p in predicates:
            if p.get('entries') is None:
                continue
            # pop: the entries live under 'values', not inside the predicate.
            entries.append({'predicate': p['value'],
                            'entry': [e.to_dict() for e in p.pop('entries')]})
        to_return['predicates'] = predicates
        if entries:
            to_return['values'] = entries
        return to_return

    def has_entries(self) -> bool:
        """Return True if at least one predicate has entries."""
        return any(p.entries for p in self.values())

    def __str__(self) -> str:
        """Return all the machinetags of the taxonomy, one per line."""
        return '\n'.join(self.machinetags())

    def make_machinetag(self, predicate: str, entry: Optional['Entry']=None) -> str:
        """Return the machinetag for ``predicate`` (and ``entry``, if truthy)."""
        if entry:
            return f'{self.name}:{predicate}="{entry}"'
        return f'{self.name}:{predicate}'

    def machinetags(self) -> List[str]:
        """Return the machinetags of the taxonomy.

        One ``namespace:predicate="entry"`` tag per entry; a predicate without
        entries yields a single ``namespace:predicate`` tag.
        """
        to_return = []
        for p, content in self.items():
            if content:
                to_return.extend(f'{self.name}:{p}="{k}"' for k in content)
            else:
                to_return.append(f'{self.name}:{p}')
        return to_return

    def __getitem__(self, predicate: str) -> 'Predicate':
        return self.predicates[predicate]

    def __iter__(self) -> Iterator[Any]:
        return iter(self.predicates)

    def __len__(self) -> int:
        return len(self.predicates)

    def amount_entries(self) -> int:
        """Return the number of entries, or of predicates if there are none.

        If at least one predicate has entries, the result is the sum of the
        number of entries of every predicate (a predicate without entries
        contributes 0). Otherwise, it is the number of predicates.
        """
        if self.has_entries():
            return sum(len(e) for e in self.values())
        return len(self.keys())

    def machinetags_expanded(self) -> List[str]:
        """Return the machinetags, using the expanded form of the entries.

        Same layout as :meth:`machinetags`, but the quoted part is the
        ``expanded`` attribute of each entry; the namespace and the predicate
        are not expanded.
        """
        to_return = []
        for p, content in self.items():
            if content:
                to_return.extend(f'{self.name}:{p}="{entry.expanded}"'
                                 for entry in content.values())
            else:
                to_return.append(f'{self.name}:{p}')
        return to_return
2020-11-05 19:20:53 +01:00
class Taxonomies(abc.Mapping):  # type: ignore
    """The set of taxonomies listed in a manifest.

    Behaves as a read-only mapping from taxonomy name to :class:`Taxonomy`.
    """

    def __init__(self, manifest_url: Optional[str]=None,
                 manifest_path: Optional[Union[Path, str]]=None):
        """Load the manifest and every taxonomy it lists.

        :param manifest_url: URL of the manifest. Takes precedence over
            ``manifest_path`` when both are given.
        :param manifest_path: path to a manifest on disk.

        When neither is given, the ``data/misp-taxonomies/MANIFEST.json`` file
        located next to the ``pytaxonomies`` module is used if it exists, and
        the manifest of the MISP/misp-taxonomies GitHub repository otherwise.
        """
        self.loader: Callable[..., Dict[Any, Any]]
        if not manifest_url and not manifest_path:
            # try path:
            module_file = sys.modules['pytaxonomies'].__file__
            if module_file:
                root_path = Path(os.path.abspath(os.path.dirname(module_file))) / 'data' / 'misp-taxonomies' / 'MANIFEST.json'
                if root_path.exists():
                    manifest_path = root_path
            if not manifest_path:
                manifest_url = 'https://raw.githubusercontent.com/MISP/misp-taxonomies/main/MANIFEST.json'
        # The base location must match the loader: a URL loader with a local
        # directory (or the other way round) cannot fetch the taxonomies.
        if manifest_url:
            self.loader = self.__load_url
            self.manifest = self.loader(manifest_url)
            self.url = self.manifest['url']
        else:
            self.loader = self.__load_path
            self.manifest = self.loader(manifest_path)
            self.url = os.path.dirname(os.path.realpath(manifest_path))
        self.version = self.manifest['version']
        self.license = self.manifest['license']
        self.description = self.manifest['description']
        self.__init_taxonomies()

    def validate_with_schema(self) -> None:
        """Validate every loaded taxonomy against the bundled JSON schema.

        The schema is read from ``data/misp-taxonomies/schema.json`` next to
        the ``pytaxonomies`` module. Nothing is validated when the module has
        no ``__file__``.

        :raises ImportError: if ``jsonschema`` is not installed.
        """
        if not HAS_JSONSCHEMA:
            raise ImportError('jsonschema is required: pip install jsonschema')
        module_file = sys.modules['pytaxonomies'].__file__
        if module_file:
            schema = os.path.join(os.path.abspath(os.path.dirname(module_file)), 'data', 'misp-taxonomies', 'schema.json')
            with open(schema, 'r', encoding="utf-8") as f:
                loaded_schema = json.load(f)
            for t in self.values():
                jsonschema.validate(t.taxonomy, loaded_schema)

    def __load_path(self, path: Union[Path, str]) -> Dict[str, Any]:
        """Load a JSON document from a file on disk."""
        if isinstance(path, str):
            path = Path(path)
        with path.open('r', encoding="utf-8") as f:
            return json.load(f)

    def __load_url(self, url: str) -> Dict[str, Any]:
        """Fetch a JSON document over HTTP(S).

        :raises Exception: if ``requests`` is not installed.
        :raises requests.HTTPError: if the server answers with an error status.
        """
        if not HAS_REQUESTS:
            raise Exception("Python module 'requests' isn't installed, unable to fetch the taxonomies.")
        response = requests.get(url)
        # Fail on the HTTP error itself instead of trying to parse an error
        # page as JSON.
        response.raise_for_status()
        return response.json()

    def __make_uri(self, taxonomy_name: str) -> str:
        """Return the location of the definition of ``taxonomy_name``."""
        return f'{self.url}/{taxonomy_name}/{self.manifest["path"]}'

    def __init_taxonomies(self) -> None:
        """Load every taxonomy listed in the manifest into ``self.taxonomies``.

        :raises Exception: if the namespace of a loaded taxonomy differs from
            its name in the manifest.
        """
        self.taxonomies: Dict[str, 'Taxonomy'] = {}
        for t in self.manifest['taxonomies']:
            name = t['name']
            taxonomy = Taxonomy(self.loader(self.__make_uri(name)))
            self.taxonomies[name] = taxonomy
            if name != taxonomy.name:
                raise Exception(f"The name of the taxonomy in the manifest ({name}) doesn't match with the name in the taxonomy ({taxonomy.name})")

    def __getitem__(self, name: str) -> 'Taxonomy':
        return self.taxonomies[name]

    def __iter__(self) -> Iterator[Any]:
        return iter(self.taxonomies)

    def __len__(self) -> int:
        return len(self.taxonomies)

    def __str__(self) -> str:
        """Return the machinetags of every taxonomy, one taxonomy per block."""
        return ''.join(f'{taxonomy}\n\n' for taxonomy in self.values())

    def search(self, query: str, expanded: bool=False) -> List[str]:
        """Return the machinetags with a component matching ``query``.

        A machinetag is split on ``:``, ``=`` and ``"``; it matches when one
        of the resulting components starts or ends with ``query``
        (case-insensitive). Each matching machinetag is returned once.

        :param query: the text to look for.
        :param expanded: search (and return) the expanded machinetags.
        """
        query = query.lower()
        to_return = []
        for taxonomy in self.values():
            if expanded:
                machinetags = taxonomy.machinetags_expanded()
            else:
                machinetags = taxonomy.machinetags()
            for mt in machinetags:
                components = (c.lower() for c in re.findall(r'[^:="]+', mt))
                # any(): several components may match, the tag is reported once.
                if any(c.startswith(query) or c.endswith(query) for c in components):
                    to_return.append(mt)
        return to_return

    def revert_machinetag(self, machinetag: str) -> Union[Tuple['Taxonomy', 'Predicate', 'Entry'], Tuple['Taxonomy', 'Predicate']]:
        """Return the objects a machinetag refers to.

        :param machinetag: ``namespace:predicate="entry"`` or
            ``namespace:predicate``.
        :returns: ``(taxonomy, predicate, entry)`` when the machinetag has a
            non-empty entry, ``(taxonomy, predicate)`` otherwise.
        :raises IndexError: if the machinetag doesn't have the expected format.
        :raises KeyError: if the taxonomy, predicate or entry is unknown.
        """
        if '=' in machinetag:
            name, predicat, entry = re.findall(r'^([^:]*):([^=]*)="([^"]*)"$', machinetag)[0]
        else:
            name, predicat = re.findall(r'^([^:]*):([^=]*)$', machinetag)[0]
            entry = None
        taxonomy = self.taxonomies[name]
        predicate = taxonomy[predicat]
        if entry:
            return taxonomy, predicate, predicate[entry]
        return taxonomy, predicate

    def all_machinetags(self, expanded: bool=False) -> List[List[str]]:
        """Return the machinetags of every taxonomy, one list per taxonomy.

        :param expanded: return the expanded machinetags.
        """
        if expanded:
            return [taxonomy.machinetags_expanded() for taxonomy in self.values()]
        return [taxonomy.machinetags() for taxonomy in self.values()]