mirror of https://github.com/MISP/PyTaxonomies
chg: Improve typing,cleanup, bump deps
parent
dc4f445cf8
commit
397f39da17
|
@ -9,6 +9,8 @@ python:
|
|||
- "3.7-dev"
|
||||
- "3.8"
|
||||
- "3.8-dev"
|
||||
- "3.9"
|
||||
- "3.9-dev"
|
||||
|
||||
install:
|
||||
- pip install poetry
|
||||
|
@ -16,7 +18,7 @@ install:
|
|||
|
||||
script:
|
||||
- poetry run nosetests-3.4 --with-coverage --cover-package=pytaxonomies -d
|
||||
- poetry run mypy --no-strict-optional pytaxonomies
|
||||
- poetry run mypy pytaxonomies
|
||||
|
||||
after_success:
|
||||
- poetry run codecov
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
[mypy]
|
||||
disallow_untyped_calls=True
|
||||
disallow_untyped_defs=True
|
||||
disallow_incomplete_defs=True
|
||||
check_untyped_defs=True
|
||||
disallow_any_generics=True
|
||||
python_version = 3.8
|
||||
ignore_errors = False
|
||||
ignore_missing_imports = False
|
||||
strict_optional = True
|
||||
no_implicit_optional = True
|
||||
warn_unused_ignores = True
|
||||
warn_redundant_casts = True
|
||||
warn_unused_configs = True
|
||||
warn_unreachable = True
|
||||
|
||||
show_error_context = True
|
||||
pretty = True
|
|
@ -1,14 +1,14 @@
|
|||
[[package]]
|
||||
name = "attrs"
|
||||
version = "20.2.0"
|
||||
version = "20.3.0"
|
||||
description = "Classes Without Boilerplate"
|
||||
category = "dev"
|
||||
optional = false
|
||||
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
|
||||
|
||||
[package.extras]
|
||||
dev = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "zope.interface", "sphinx", "sphinx-rtd-theme", "pre-commit"]
|
||||
docs = ["sphinx", "sphinx-rtd-theme", "zope.interface"]
|
||||
dev = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "zope.interface", "furo", "sphinx", "pre-commit"]
|
||||
docs = ["furo", "sphinx", "zope.interface"]
|
||||
tests = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "zope.interface"]
|
||||
tests_no_zope = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six"]
|
||||
|
||||
|
@ -276,7 +276,7 @@ urllib3 = ">=1.21.1,<1.25.0 || >1.25.0,<1.25.1 || >1.25.1,<1.26"
|
|||
|
||||
[package.extras]
|
||||
security = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)"]
|
||||
socks = ["PySocks (>=1.5.6,<1.5.7 || >1.5.7)", "win-inet-pton"]
|
||||
socks = ["PySocks (>=1.5.6,!=1.5.7)", "win-inet-pton"]
|
||||
|
||||
[[package]]
|
||||
name = "six"
|
||||
|
@ -313,7 +313,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4"
|
|||
[package.extras]
|
||||
brotli = ["brotlipy (>=0.6.0)"]
|
||||
secure = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "certifi", "ipaddress"]
|
||||
socks = ["PySocks (>=1.5.6,<1.5.7 || >1.5.7,<2.0)"]
|
||||
socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "visitor"
|
||||
|
@ -361,7 +361,7 @@ python-versions = ">=3.6"
|
|||
|
||||
[package.extras]
|
||||
docs = ["sphinx", "jaraco.packaging (>=3.2)", "rst.linker (>=1.9)"]
|
||||
testing = ["pytest (>=3.5,<3.7.3 || >3.7.3)", "pytest-checkdocs (>=1.2.3)", "pytest-flake8", "pytest-cov", "jaraco.test (>=3.2.0)", "jaraco.itertools", "func-timeout", "pytest-black (>=0.3.7)", "pytest-mypy"]
|
||||
testing = ["pytest (>=3.5,!=3.7.3)", "pytest-checkdocs (>=1.2.3)", "pytest-flake8", "pytest-cov", "jaraco.test (>=3.2.0)", "jaraco.itertools", "func-timeout", "pytest-black (>=0.3.7)", "pytest-mypy"]
|
||||
|
||||
[extras]
|
||||
remote = ["requests"]
|
||||
|
@ -374,8 +374,8 @@ content-hash = "f764428a06aaf11bbc20b506ad51b18a8a5c3b0cb3a1f81d85bd01b9ee9b0d69
|
|||
|
||||
[metadata.files]
|
||||
attrs = [
|
||||
{file = "attrs-20.2.0-py2.py3-none-any.whl", hash = "sha256:fce7fc47dfc976152e82d53ff92fa0407700c21acd20886a13777a0d20e655dc"},
|
||||
{file = "attrs-20.2.0.tar.gz", hash = "sha256:26b54ddbbb9ee1d34d5d3668dd37d6cf74990ab23c828c2888dccdceee395594"},
|
||||
{file = "attrs-20.3.0-py2.py3-none-any.whl", hash = "sha256:31b2eced602aa8423c2aea9c76a724617ed67cf9513173fd3a4f03e3a929c7e6"},
|
||||
{file = "attrs-20.3.0.tar.gz", hash = "sha256:832aa3cde19744e49938b91fea06d69ecb9e649c93ba974535d08ad92164f700"},
|
||||
]
|
||||
certifi = [
|
||||
{file = "certifi-2020.6.20-py2.py3-none-any.whl", hash = "sha256:8fc0819f1f30ba15bdb34cceffb9ef04d99f420f68eb75d901e9560b8749fc41"},
|
||||
|
|
|
@ -1 +1 @@
|
|||
from .api import Taxonomies, EncodeTaxonomies, Taxonomy, Predicate, Entry
|
||||
from .api import Taxonomies, Taxonomy, Predicate, Entry
|
||||
|
|
|
@ -3,12 +3,11 @@
|
|||
|
||||
import json
|
||||
import os
|
||||
from collections.abc import Mapping
|
||||
from collections import abc
|
||||
import re
|
||||
import sys
|
||||
from json import JSONEncoder
|
||||
from pathlib import Path
|
||||
from typing import Union, Dict, Optional, List, Callable, Any
|
||||
from typing import Union, Dict, Optional, List, Callable, Any, ValuesView, Iterator, Tuple
|
||||
|
||||
try:
|
||||
import requests
|
||||
|
@ -23,11 +22,9 @@ except ImportError:
|
|||
HAS_JSONSCHEMA = False
|
||||
|
||||
|
||||
class EncodeTaxonomies(JSONEncoder):
|
||||
def default(self, obj):
|
||||
if isinstance(obj, (Taxonomy, Predicate, Entry)):
|
||||
return obj.to_dict()
|
||||
return JSONEncoder.default(self, obj)
|
||||
def taxonomies_json_default(obj: Union['Taxonomy', 'Predicate', 'Entry']) -> Dict[str, Any]:
|
||||
if isinstance(obj, (Taxonomy, Predicate, Entry)):
|
||||
return obj.to_dict()
|
||||
|
||||
|
||||
class Entry():
|
||||
|
@ -59,25 +56,28 @@ class Entry():
|
|||
return to_return
|
||||
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(self, cls=EncodeTaxonomies)
|
||||
return json.dumps(self, default=taxonomies_json_default)
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
return self.value
|
||||
|
||||
|
||||
class Predicate(Mapping):
|
||||
class Predicate(abc.Mapping): # type: ignore
|
||||
|
||||
def __init__(self, predicate: Optional[Dict[str, str]]=None,
|
||||
entries: Optional[List[Dict[str, str]]]=None):
|
||||
if not predicate and not entries:
|
||||
# We're creating a new one
|
||||
self.expanded = None
|
||||
self.description = None
|
||||
self.colour = None
|
||||
self.exclusive = None
|
||||
self.numerical_value = None
|
||||
self.entries: Dict[str, Entry] = {}
|
||||
return
|
||||
if not predicate:
|
||||
if entries:
|
||||
raise Exception('Need predicates if entries.')
|
||||
else:
|
||||
# We're creating a new one
|
||||
self.expanded = None
|
||||
self.description = None
|
||||
self.colour = None
|
||||
self.exclusive = None
|
||||
self.numerical_value = None
|
||||
self.entries: Dict[str, Entry] = {}
|
||||
return
|
||||
self.predicate = predicate['value']
|
||||
self.expanded = predicate.get('expanded')
|
||||
self.description = predicate.get('description')
|
||||
|
@ -86,14 +86,14 @@ class Predicate(Mapping):
|
|||
self.numerical_value = predicate.get('numerical_value')
|
||||
self.__init_entries(entries)
|
||||
|
||||
def __init_entries(self, entries: Optional[List[Dict[str, str]]]=None):
|
||||
def __init_entries(self, entries: Optional[List[Dict[str, str]]]=None) -> None:
|
||||
self.entries = {}
|
||||
if entries:
|
||||
for e in entries:
|
||||
self.entries[e['value']] = Entry(e)
|
||||
|
||||
def to_dict(self):
|
||||
to_return = {'value': self.predicate}
|
||||
def to_dict(self) -> Dict[str, Union[str, ValuesView[Entry]]]:
|
||||
to_return: Dict[str, Union[str, ValuesView[Entry]]] = {'value': self.predicate}
|
||||
if self.expanded:
|
||||
to_return['expanded'] = self.expanded
|
||||
if self.description:
|
||||
|
@ -109,31 +109,31 @@ class Predicate(Mapping):
|
|||
return to_return
|
||||
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(self, cls=EncodeTaxonomies)
|
||||
return json.dumps(self, default=taxonomies_json_default)
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
return self.predicate
|
||||
|
||||
def __getitem__(self, entry):
|
||||
def __getitem__(self, entry: str) -> Entry:
|
||||
return self.entries[entry]
|
||||
|
||||
def __iter__(self):
|
||||
def __iter__(self) -> Iterator[Any]:
|
||||
return iter(self.entries)
|
||||
|
||||
def __len__(self):
|
||||
def __len__(self) -> int:
|
||||
return len(self.entries)
|
||||
|
||||
|
||||
class Taxonomy(Mapping):
|
||||
class Taxonomy(abc.Mapping): # type: ignore
|
||||
|
||||
def __init__(self, taxonomy=None):
|
||||
def __init__(self, taxonomy: Optional[Dict[str, Union[str, List[Dict[str, Any]]]]]=None):
|
||||
self.predicates: Dict[str, Predicate] = {}
|
||||
if not taxonomy:
|
||||
# We're creating a new one
|
||||
self.expanded = None
|
||||
self.refs = None
|
||||
self.type = None
|
||||
self.exclusive = None
|
||||
self.predicates: Dict[str, Predicate] = {}
|
||||
return
|
||||
self.taxonomy = taxonomy
|
||||
self.name = self.taxonomy['namespace']
|
||||
|
@ -145,21 +145,22 @@ class Taxonomy(Mapping):
|
|||
self.exclusive = self.taxonomy.get('exclusive')
|
||||
self.__init_predicates()
|
||||
|
||||
def __init_predicates(self):
|
||||
self.predicates = {}
|
||||
entries = {}
|
||||
if self.taxonomy.get('values'):
|
||||
def __init_predicates(self) -> None:
|
||||
entries: Dict[str, List[Dict[str, str]]] = {}
|
||||
if self.taxonomy.get('values') and isinstance(self.taxonomy['values'], list):
|
||||
for v in self.taxonomy['values']:
|
||||
if not entries.get(v['predicate']):
|
||||
entries[v['predicate']] = []
|
||||
entries[v['predicate']] += v['entry']
|
||||
for p in self.taxonomy['predicates']:
|
||||
if isinstance(p, str):
|
||||
continue
|
||||
self.predicates[p['value']] = Predicate(p, entries.get(p['value']))
|
||||
|
||||
def to_json(self):
|
||||
return json.dumps(self, cls=EncodeTaxonomies)
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(self, default=taxonomies_json_default)
|
||||
|
||||
def to_dict(self):
|
||||
def to_dict(self) -> Dict[str, Union[str, List[Dict[str, Any]]]]:
|
||||
to_return = {'namespace': self.name, 'description': self.description,
|
||||
'version': self.version}
|
||||
if self.expanded:
|
||||
|
@ -181,48 +182,48 @@ class Taxonomy(Mapping):
|
|||
to_return['values'] = entries
|
||||
return to_return
|
||||
|
||||
def has_entries(self):
|
||||
def has_entries(self) -> bool:
|
||||
if self.values():
|
||||
for p in self.values():
|
||||
if p.entries:
|
||||
return True
|
||||
return False
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
return '\n'.join(self.machinetags())
|
||||
|
||||
def make_machinetag(self, predicate, entry=None):
|
||||
def make_machinetag(self, predicate: str, entry: Optional[Entry]=None) -> str:
|
||||
if entry:
|
||||
return '{}:{}="{}"'.format(self.name, predicate, entry)
|
||||
return f'{self.name}:{predicate}="{entry}"'
|
||||
else:
|
||||
return '{}:{}'.format(self.name, predicate)
|
||||
return f'{self.name}:{predicate}'
|
||||
|
||||
def machinetags(self):
|
||||
def machinetags(self) -> List[str]:
|
||||
to_return = []
|
||||
for p, content in self.items():
|
||||
if content:
|
||||
for k in content.keys():
|
||||
to_return.append('{}:{}="{}"'.format(self.name, p, k))
|
||||
to_return.append(f'{self.name}:{p}="{k}"')
|
||||
else:
|
||||
to_return.append('{}:{}'.format(self.name, p))
|
||||
to_return.append(f'{self.name}:{p}')
|
||||
return to_return
|
||||
|
||||
def __getitem__(self, predicate):
|
||||
def __getitem__(self, predicate: str) -> Predicate:
|
||||
return self.predicates[predicate]
|
||||
|
||||
def __iter__(self):
|
||||
def __iter__(self) -> Iterator[Any]:
|
||||
return iter(self.predicates)
|
||||
|
||||
def __len__(self):
|
||||
def __len__(self) -> int:
|
||||
return len(self.predicates)
|
||||
|
||||
def amount_entries(self):
|
||||
def amount_entries(self) -> int:
|
||||
if self.has_entries():
|
||||
return sum([len(e) for e in self.values()])
|
||||
else:
|
||||
return len(self.keys())
|
||||
|
||||
def machinetags_expanded(self):
|
||||
def machinetags_expanded(self) -> List[str]:
|
||||
to_return = []
|
||||
for p, content in self.items():
|
||||
if content:
|
||||
|
@ -233,7 +234,7 @@ class Taxonomy(Mapping):
|
|||
return to_return
|
||||
|
||||
|
||||
class Taxonomies(Mapping):
|
||||
class Taxonomies(abc.Mapping): # type: ignore
|
||||
|
||||
def __init__(self, manifest_url: str='https://raw.githubusercontent.com/MISP/misp-taxonomies/main/MANIFEST.json',
|
||||
manifest_path: Union[Path, str]=Path(os.path.abspath(os.path.dirname(sys.modules['pytaxonomies'].__file__))) / 'data' / 'misp-taxonomies' / 'MANIFEST.json'):
|
||||
|
@ -253,7 +254,7 @@ class Taxonomies(Mapping):
|
|||
self.description = self.manifest['description']
|
||||
self.__init_taxonomies()
|
||||
|
||||
def validate_with_schema(self):
|
||||
def validate_with_schema(self) -> None:
|
||||
if not HAS_JSONSCHEMA:
|
||||
raise ImportError('jsonschema is required: pip install jsonschema')
|
||||
schema = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pytaxonomies'].__file__)), 'data', 'misp-taxonomies', 'schema.json')
|
||||
|
@ -262,21 +263,21 @@ class Taxonomies(Mapping):
|
|||
for t in self.values():
|
||||
jsonschema.validate(t.taxonomy, loaded_schema)
|
||||
|
||||
def __load_path(self, path: Union[Path, str]) -> Dict:
|
||||
def __load_path(self, path: Union[Path, str]) -> Dict[str, Any]:
|
||||
if isinstance(path, str):
|
||||
path = Path(path)
|
||||
with path.open('r') as f:
|
||||
return json.load(f)
|
||||
|
||||
def __load_url(self, url: str) -> Dict:
|
||||
def __load_url(self, url: str) -> Dict[str, Any]:
|
||||
if not HAS_REQUESTS:
|
||||
raise Exception("Python module 'requests' isn't installed, unable to fetch the taxonomies.")
|
||||
return requests.get(url).json()
|
||||
|
||||
def __make_uri(self, taxonomy_name) -> str:
|
||||
def __make_uri(self, taxonomy_name: str) -> str:
|
||||
return f'{self.url}/{taxonomy_name}/{self.manifest["path"]}'
|
||||
|
||||
def __init_taxonomies(self):
|
||||
def __init_taxonomies(self) -> None:
|
||||
self.taxonomies = {}
|
||||
for t in self.manifest['taxonomies']:
|
||||
uri = self.__make_uri(t['name'])
|
||||
|
@ -285,22 +286,22 @@ class Taxonomies(Mapping):
|
|||
if t['name'] != self.taxonomies[t['name']].name:
|
||||
raise Exception("The name of the taxonomy in the manifest ({}) doesn't match with the name in the taxonomy ({})".format(t['name'], self.taxonomies[t['name']].name))
|
||||
|
||||
def __getitem__(self, name: str):
|
||||
def __getitem__(self, name: str) -> Taxonomy:
|
||||
return self.taxonomies[name]
|
||||
|
||||
def __iter__(self):
|
||||
def __iter__(self) -> Iterator[Any]:
|
||||
return iter(self.taxonomies)
|
||||
|
||||
def __len__(self):
|
||||
def __len__(self) -> int:
|
||||
return len(self.taxonomies)
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
to_print = ''
|
||||
for taxonomy in self.values():
|
||||
to_print += "{}\n\n".format(str(taxonomy))
|
||||
return to_print
|
||||
|
||||
def search(self, query: str, expanded: bool=False) -> List:
|
||||
def search(self, query: str, expanded: bool=False) -> List[str]:
|
||||
query = query.lower()
|
||||
to_return = []
|
||||
for taxonomy in self.values():
|
||||
|
@ -315,7 +316,7 @@ class Taxonomies(Mapping):
|
|||
to_return.append(mt)
|
||||
return to_return
|
||||
|
||||
def revert_machinetag(self, machinetag: str):
|
||||
def revert_machinetag(self, machinetag: str) -> Union[Tuple[Taxonomy, Predicate, Entry], Tuple[Taxonomy, Predicate]]:
|
||||
if '=' in machinetag:
|
||||
name, predicat, entry = re.findall('^([^:]*):([^=]*)="([^"]*)"$', machinetag)[0]
|
||||
else:
|
||||
|
@ -326,7 +327,7 @@ class Taxonomies(Mapping):
|
|||
else:
|
||||
return self.taxonomies[name], self.taxonomies[name][predicat]
|
||||
|
||||
def all_machinetags(self, expanded: bool=False):
|
||||
def all_machinetags(self, expanded: bool=False) -> List[str]:
|
||||
if expanded:
|
||||
return [taxonomy.machinetags_expanded() for taxonomy in self.values()]
|
||||
return [taxonomy.machinetags() for taxonomy in self.values()]
|
||||
|
|
|
@ -5,7 +5,7 @@ import argparse
|
|||
from pytaxonomies import Taxonomies
|
||||
|
||||
|
||||
def main():
|
||||
def main() -> None:
|
||||
argParser = argparse.ArgumentParser(description='Use MISP taxonomies')
|
||||
argParser.add_argument('-a', '--all', action='store_true', help='Print all taxonomies as machine tags')
|
||||
argParser.add_argument('-l', '--local', default=None, help='Use local manifest file.')
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
import json
|
||||
import unittest
|
||||
from pytaxonomies import Taxonomies, EncodeTaxonomies
|
||||
from pytaxonomies import Taxonomies
|
||||
import pytaxonomies.api
|
||||
|
||||
|
||||
|
@ -23,7 +23,6 @@ class TestPyTaxonomies(unittest.TestCase):
|
|||
self.assertEqual(str(t_online), str(t_offline))
|
||||
self.assertEqual(str(taxonomies_online), str(self.taxonomies_offline))
|
||||
|
||||
|
||||
def test_expanded_machinetags(self):
|
||||
self.taxonomies_offline.all_machinetags(expanded=True)
|
||||
|
||||
|
@ -75,7 +74,7 @@ class TestPyTaxonomies(unittest.TestCase):
|
|||
|
||||
def test_json(self):
|
||||
for key, t in self.taxonomies_offline.items():
|
||||
json.dumps(t, cls=EncodeTaxonomies)
|
||||
t.to_json()
|
||||
|
||||
def test_recreate_dump(self):
|
||||
self.maxDiff = None
|
||||
|
|
Loading…
Reference in New Issue