From f51e309775776ed123f4305f81c3b2b288522a4b Mon Sep 17 00:00:00 2001 From: Michael Chisholm Date: Fri, 8 Jan 2021 22:08:33 -0500 Subject: [PATCH 1/4] Refactor stix2.parsing into more focused modules: - stix2.registry, which contains the class mapping structure and code for scanning stix2 modules for its initial population - stix2.registration, which contains code used to register custom STIX types with the registry - stix2.parsing, which contains code for creating instances of registered stix2 classes from raw dicts. This is intended to reduce circular import problems, by giving dependent code the ability to import a module which has exactly the functionality it needs, without pulling a lot of other stuff it doesn't need. Fewer imports means less chance of an import cycle. --- stix2/__init__.py | 5 +- stix2/custom.py | 2 +- stix2/parsing.py | 237 ++------------------------------- stix2/properties.py | 3 +- stix2/registration.py | 201 ++++++++++++++++++++++++++++ stix2/registry.py | 29 ++++ stix2/test/v20/test_custom.py | 16 ++- stix2/test/v20/test_parsing.py | 8 +- stix2/test/v21/test_custom.py | 12 +- stix2/test/v21/test_parsing.py | 14 +- stix2/versioning.py | 3 +- 11 files changed, 274 insertions(+), 256 deletions(-) create mode 100644 stix2/registration.py create mode 100644 stix2/registry.py diff --git a/stix2/__init__.py b/stix2/__init__.py index 72fb29b..3cd992c 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -41,7 +41,7 @@ from .markings import ( add_markings, clear_markings, get_markings, is_marked, remove_markings, set_markings, ) -from .parsing import _collect_stix2_mappings, parse, parse_observable +from .parsing import parse, parse_observable from .patterns import ( AndBooleanExpression, AndObservationExpression, BasicObjectPathComponent, BinaryConstant, BooleanConstant, EqualityComparisonExpression, @@ -61,4 +61,5 @@ from .v21 import * # This import will always be the latest STIX 2.X version from .version import __version__ from .versioning import new_version, revoke -_collect_stix2_mappings() +import stix2.registry +stix2.registry._collect_stix2_mappings() diff --git a/stix2/custom.py b/stix2/custom.py index 08574ef..b012f37 100644 --- a/stix2/custom.py +++ b/stix2/custom.py @@ -3,7 +3,7 @@ from collections import OrderedDict import six from .base import _cls_init -from .parsing import ( +from .registration import ( _register_marking, _register_object, _register_observable, _register_observable_extension, ) diff --git a/stix2/parsing.py b/stix2/parsing.py index c0c7bf8..3b2882f 100644 --- a/stix2/parsing.py +++ b/stix2/parsing.py @@ -1,17 +1,10 @@ """STIX2 Core parsing methods.""" import copy -import importlib -import pkgutil -import re -import stix2 - -from .base import _DomainObject, _Observable -from .exceptions import DuplicateRegistrationError, ParseError -from .utils import PREFIX_21_REGEX, _get_dict, get_class_hierarchy_names - -STIX2_OBJ_MAPS = {} +import stix2.registry +from .exceptions import ParseError +from .utils import _get_dict def parse(data, allow_custom=False, version=None): @@ -79,7 +72,7 @@ def _detect_spec_version(stix_dict): _detect_spec_version(obj) for obj in stix_dict["objects"] ), ) - elif obj_type in STIX2_OBJ_MAPS["v21"]["observables"]: + elif obj_type in stix2.registry.STIX2_OBJ_MAPS["v21"]["observables"]: # Non-bundle object with an ID and without spec_version. Could be a # 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO... v = "v21" @@ -128,7 +121,10 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None): else: v = _detect_spec_version(stix_dict) - OBJ_MAP = dict(STIX2_OBJ_MAPS[v]['objects'], **STIX2_OBJ_MAPS[v]['observables']) + OBJ_MAP = dict( + stix2.registry.STIX2_OBJ_MAPS[v]['objects'], + **stix2.registry.STIX2_OBJ_MAPS[v]['observables'] + ) try: obj_class = OBJ_MAP[stix_dict['type']] @@ -179,7 +175,7 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None): v = _detect_spec_version(obj) try: - OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables'] + OBJ_MAP_OBSERVABLE = stix2.registry.STIX2_OBJ_MAPS[v]['observables'] obj_class = OBJ_MAP_OBSERVABLE[obj['type']] except KeyError: if allow_custom: @@ -190,218 +186,3 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None): "use the CustomObservable decorator." % obj['type']) return obj_class(allow_custom=allow_custom, **obj) - - -def _register_object(new_type, version=stix2.DEFAULT_VERSION): - """Register a custom STIX Object type. - - Args: - new_type (class): A class to register in the Object map. - version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If - None, use latest version. - - Raises: - ValueError: If the class being registered wasn't created with the - @CustomObject decorator. - DuplicateRegistrationError: If the class has already been registered. - - """ - - if not issubclass(new_type, _DomainObject): - raise ValueError( - "'%s' must be created with the @CustomObject decorator." % - new_type.__name__, - ) - - properties = new_type._properties - - if version == "2.1": - for prop_name, prop in properties.items(): - if not re.match(PREFIX_21_REGEX, prop_name): - raise ValueError("Property name '%s' must begin with an alpha character" % prop_name) - - if version: - v = 'v' + version.replace('.', '') - else: - # Use default version (latest) if no version was provided. - v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') - - OBJ_MAP = STIX2_OBJ_MAPS[v]['objects'] - if new_type._type in OBJ_MAP.keys(): - raise DuplicateRegistrationError("STIX Object", new_type._type) - OBJ_MAP[new_type._type] = new_type - - -def _register_marking(new_marking, version=stix2.DEFAULT_VERSION): - """Register a custom STIX Marking Definition type. - - Args: - new_marking (class): A class to register in the Marking map. - version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If - None, use latest version. - - """ - - mark_type = new_marking._type - properties = new_marking._properties - - stix2.properties._validate_type(mark_type, version) - - if version == "2.1": - for prop_name, prop_value in properties.items(): - if not re.match(PREFIX_21_REGEX, prop_name): - raise ValueError("Property name '%s' must begin with an alpha character." % prop_name) - - if version: - v = 'v' + version.replace('.', '') - else: - # Use default version (latest) if no version was provided. - v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') - - OBJ_MAP_MARKING = STIX2_OBJ_MAPS[v]['markings'] - if mark_type in OBJ_MAP_MARKING.keys(): - raise DuplicateRegistrationError("STIX Marking", mark_type) - OBJ_MAP_MARKING[mark_type] = new_marking - - -def _register_observable(new_observable, version=stix2.DEFAULT_VERSION): - """Register a custom STIX Cyber Observable type. - - Args: - new_observable (class): A class to register in the Observables map. - version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If - None, use latest version. - - """ - properties = new_observable._properties - - if version == "2.0": - # If using STIX2.0, check properties ending in "_ref/s" are ObjectReferenceProperties - for prop_name, prop in properties.items(): - if prop_name.endswith('_ref') and ('ObjectReferenceProperty' not in get_class_hierarchy_names(prop)): - raise ValueError( - "'%s' is named like an object reference property but " - "is not an ObjectReferenceProperty." % prop_name, - ) - elif (prop_name.endswith('_refs') and ('ListProperty' not in get_class_hierarchy_names(prop) or - 'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained))): - raise ValueError( - "'%s' is named like an object reference list property but " - "is not a ListProperty containing ObjectReferenceProperty." % prop_name, - ) - else: - # If using STIX2.1 (or newer...), check properties ending in "_ref/s" are ReferenceProperties - for prop_name, prop in properties.items(): - if not re.match(PREFIX_21_REGEX, prop_name): - raise ValueError("Property name '%s' must begin with an alpha character." % prop_name) - elif prop_name.endswith('_ref') and ('ReferenceProperty' not in get_class_hierarchy_names(prop)): - raise ValueError( - "'%s' is named like a reference property but " - "is not a ReferenceProperty." % prop_name, - ) - elif (prop_name.endswith('_refs') and ('ListProperty' not in get_class_hierarchy_names(prop) or - 'ReferenceProperty' not in get_class_hierarchy_names(prop.contained))): - raise ValueError( - "'%s' is named like a reference list property but " - "is not a ListProperty containing ReferenceProperty." % prop_name, - ) - - if version: - v = 'v' + version.replace('.', '') - else: - # Use default version (latest) if no version was provided. - v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') - - OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables'] - if new_observable._type in OBJ_MAP_OBSERVABLE.keys(): - raise DuplicateRegistrationError("Cyber Observable", new_observable._type) - OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable - - -def _register_observable_extension( - observable, new_extension, version=stix2.DEFAULT_VERSION, -): - """Register a custom extension to a STIX Cyber Observable type. - - Args: - observable: An observable class or instance - new_extension (class): A class to register in the Observables - Extensions map. - version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). - Defaults to the latest supported version. - - """ - obs_class = observable if isinstance(observable, type) else \ - type(observable) - ext_type = new_extension._type - properties = new_extension._properties - - if not issubclass(obs_class, _Observable): - raise ValueError("'observable' must be a valid Observable class!") - - stix2.properties._validate_type(ext_type, version) - - if not new_extension._properties: - raise ValueError( - "Invalid extension: must define at least one property: " + - ext_type, - ) - - if version == "2.1": - if not ext_type.endswith('-ext'): - raise ValueError( - "Invalid extension type name '%s': must end with '-ext'." % - ext_type, - ) - - for prop_name, prop_value in properties.items(): - if not re.match(PREFIX_21_REGEX, prop_name): - raise ValueError("Property name '%s' must begin with an alpha character." % prop_name) - - v = 'v' + version.replace('.', '') - - try: - observable_type = observable._type - except AttributeError: - raise ValueError( - "Unknown observable type. Custom observables must be " - "created with the @CustomObservable decorator.", - ) - - OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables'] - EXT_MAP = STIX2_OBJ_MAPS[v]['observable-extensions'] - - try: - if ext_type in EXT_MAP[observable_type].keys(): - raise DuplicateRegistrationError("Observable Extension", ext_type) - EXT_MAP[observable_type][ext_type] = new_extension - except KeyError: - if observable_type not in OBJ_MAP_OBSERVABLE: - raise ValueError( - "Unknown observable type '%s'. Custom observables " - "must be created with the @CustomObservable decorator." - % observable_type, - ) - else: - EXT_MAP[observable_type] = {ext_type: new_extension} - - -def _collect_stix2_mappings(): - """Navigate the package once and retrieve all object mapping dicts for each - v2X package. Includes OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP.""" - if not STIX2_OBJ_MAPS: - top_level_module = importlib.import_module('stix2') - path = top_level_module.__path__ - prefix = str(top_level_module.__name__) + '.' - - for module_loader, name, is_pkg in pkgutil.walk_packages(path=path, prefix=prefix): - ver = name.split('.')[1] - if re.match(r'^stix2\.v2[0-9]$', name) and is_pkg: - mod = importlib.import_module(name, str(top_level_module.__name__)) - STIX2_OBJ_MAPS[ver] = {} - STIX2_OBJ_MAPS[ver]['objects'] = mod.OBJ_MAP - STIX2_OBJ_MAPS[ver]['observables'] = mod.OBJ_MAP_OBSERVABLE - STIX2_OBJ_MAPS[ver]['observable-extensions'] = mod.EXT_MAP - elif re.match(r'^stix2\.v2[0-9]\.common$', name) and is_pkg is False: - mod = importlib.import_module(name, str(top_level_module.__name__)) - STIX2_OBJ_MAPS[ver]['markings'] = mod.OBJ_MAP_MARKING diff --git a/stix2/properties.py b/stix2/properties.py index 1ca2dbe..36c027d 100644 --- a/stix2/properties.py +++ b/stix2/properties.py @@ -16,7 +16,8 @@ from .exceptions import ( CustomContentError, DictionaryKeyError, MissingPropertiesError, MutuallyExclusivePropertiesError, STIXError, ) -from .parsing import STIX2_OBJ_MAPS, parse, parse_observable +from .parsing import parse, parse_observable +from .registry import STIX2_OBJ_MAPS from .utils import _get_dict, get_class_hierarchy_names, parse_into_datetime try: diff --git a/stix2/registration.py b/stix2/registration.py new file mode 100644 index 0000000..248929a --- /dev/null +++ b/stix2/registration.py @@ -0,0 +1,201 @@ +import re + +import stix2 +import stix2.registry +from .base import _DomainObject, _Observable +from .exceptions import DuplicateRegistrationError +from .utils import PREFIX_21_REGEX, get_class_hierarchy_names + + +def _register_object(new_type, version=stix2.DEFAULT_VERSION): + """Register a custom STIX Object type. + + Args: + new_type (class): A class to register in the Object map. + version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If + None, use latest version. + + Raises: + ValueError: If the class being registered wasn't created with the + @CustomObject decorator. + DuplicateRegistrationError: If the class has already been registered. + + """ + + if not issubclass(new_type, _DomainObject): + raise ValueError( + "'%s' must be created with the @CustomObject decorator." % + new_type.__name__, + ) + + properties = new_type._properties + + if version == "2.1": + for prop_name, prop in properties.items(): + if not re.match(PREFIX_21_REGEX, prop_name): + raise ValueError("Property name '%s' must begin with an alpha character" % prop_name) + + if version: + v = 'v' + version.replace('.', '') + else: + # Use default version (latest) if no version was provided. + v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') + + OBJ_MAP = stix2.registry.STIX2_OBJ_MAPS[v]['objects'] + if new_type._type in OBJ_MAP.keys(): + raise DuplicateRegistrationError("STIX Object", new_type._type) + OBJ_MAP[new_type._type] = new_type + + +def _register_marking(new_marking, version=stix2.DEFAULT_VERSION): + """Register a custom STIX Marking Definition type. + + Args: + new_marking (class): A class to register in the Marking map. + version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If + None, use latest version. + + """ + + mark_type = new_marking._type + properties = new_marking._properties + + stix2.properties._validate_type(mark_type, version) + + if version == "2.1": + for prop_name, prop_value in properties.items(): + if not re.match(PREFIX_21_REGEX, prop_name): + raise ValueError("Property name '%s' must begin with an alpha character." % prop_name) + + if version: + v = 'v' + version.replace('.', '') + else: + # Use default version (latest) if no version was provided. + v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') + + OBJ_MAP_MARKING = stix2.registry.STIX2_OBJ_MAPS[v]['markings'] + if mark_type in OBJ_MAP_MARKING.keys(): + raise DuplicateRegistrationError("STIX Marking", mark_type) + OBJ_MAP_MARKING[mark_type] = new_marking + + +def _register_observable(new_observable, version=stix2.DEFAULT_VERSION): + """Register a custom STIX Cyber Observable type. + + Args: + new_observable (class): A class to register in the Observables map. + version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If + None, use latest version. + + """ + properties = new_observable._properties + + if version == "2.0": + # If using STIX2.0, check properties ending in "_ref/s" are ObjectReferenceProperties + for prop_name, prop in properties.items(): + if prop_name.endswith('_ref') and ('ObjectReferenceProperty' not in get_class_hierarchy_names(prop)): + raise ValueError( + "'%s' is named like an object reference property but " + "is not an ObjectReferenceProperty." % prop_name, + ) + elif (prop_name.endswith('_refs') and ('ListProperty' not in get_class_hierarchy_names(prop) or + 'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained))): + raise ValueError( + "'%s' is named like an object reference list property but " + "is not a ListProperty containing ObjectReferenceProperty." % prop_name, + ) + else: + # If using STIX2.1 (or newer...), check properties ending in "_ref/s" are ReferenceProperties + for prop_name, prop in properties.items(): + if not re.match(PREFIX_21_REGEX, prop_name): + raise ValueError("Property name '%s' must begin with an alpha character." % prop_name) + elif prop_name.endswith('_ref') and ('ReferenceProperty' not in get_class_hierarchy_names(prop)): + raise ValueError( + "'%s' is named like a reference property but " + "is not a ReferenceProperty." % prop_name, + ) + elif (prop_name.endswith('_refs') and ('ListProperty' not in get_class_hierarchy_names(prop) or + 'ReferenceProperty' not in get_class_hierarchy_names(prop.contained))): + raise ValueError( + "'%s' is named like a reference list property but " + "is not a ListProperty containing ReferenceProperty." % prop_name, + ) + + if version: + v = 'v' + version.replace('.', '') + else: + # Use default version (latest) if no version was provided. + v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') + + OBJ_MAP_OBSERVABLE = stix2.registry.STIX2_OBJ_MAPS[v]['observables'] + if new_observable._type in OBJ_MAP_OBSERVABLE.keys(): + raise DuplicateRegistrationError("Cyber Observable", new_observable._type) + OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable + + +def _register_observable_extension( + observable, new_extension, version=stix2.DEFAULT_VERSION, +): + """Register a custom extension to a STIX Cyber Observable type. + + Args: + observable: An observable class or instance + new_extension (class): A class to register in the Observables + Extensions map. + version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). + Defaults to the latest supported version. + + """ + obs_class = observable if isinstance(observable, type) else \ + type(observable) + ext_type = new_extension._type + properties = new_extension._properties + + if not issubclass(obs_class, _Observable): + raise ValueError("'observable' must be a valid Observable class!") + + stix2.properties._validate_type(ext_type, version) + + if not new_extension._properties: + raise ValueError( + "Invalid extension: must define at least one property: " + + ext_type, + ) + + if version == "2.1": + if not ext_type.endswith('-ext'): + raise ValueError( + "Invalid extension type name '%s': must end with '-ext'." % + ext_type, + ) + + for prop_name, prop_value in properties.items(): + if not re.match(PREFIX_21_REGEX, prop_name): + raise ValueError("Property name '%s' must begin with an alpha character." % prop_name) + + v = 'v' + version.replace('.', '') + + try: + observable_type = observable._type + except AttributeError: + raise ValueError( + "Unknown observable type. Custom observables must be " + "created with the @CustomObservable decorator.", + ) + + OBJ_MAP_OBSERVABLE = stix2.registry.STIX2_OBJ_MAPS[v]['observables'] + EXT_MAP = stix2.registry.STIX2_OBJ_MAPS[v]['observable-extensions'] + + try: + if ext_type in EXT_MAP[observable_type].keys(): + raise DuplicateRegistrationError("Observable Extension", ext_type) + EXT_MAP[observable_type][ext_type] = new_extension + except KeyError: + if observable_type not in OBJ_MAP_OBSERVABLE: + raise ValueError( + "Unknown observable type '%s'. Custom observables " + "must be created with the @CustomObservable decorator." + % observable_type, + ) + else: + EXT_MAP[observable_type] = {ext_type: new_extension} diff --git a/stix2/registry.py b/stix2/registry.py new file mode 100644 index 0000000..80d7cdc --- /dev/null +++ b/stix2/registry.py @@ -0,0 +1,29 @@ +import importlib +import pkgutil +import re + + +# Collects information on which classes implement which STIX types, for the +# various STIX spec versions. +STIX2_OBJ_MAPS = {} + + +def _collect_stix2_mappings(): + """Navigate the package once and retrieve all object mapping dicts for each + v2X package. Includes OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP.""" + if not STIX2_OBJ_MAPS: + top_level_module = importlib.import_module('stix2') + path = top_level_module.__path__ + prefix = str(top_level_module.__name__) + '.' + + for module_loader, name, is_pkg in pkgutil.walk_packages(path=path, prefix=prefix): + ver = name.split('.')[1] + if re.match(r'^stix2\.v2[0-9]$', name) and is_pkg: + mod = importlib.import_module(name, str(top_level_module.__name__)) + STIX2_OBJ_MAPS[ver] = {} + STIX2_OBJ_MAPS[ver]['objects'] = mod.OBJ_MAP + STIX2_OBJ_MAPS[ver]['observables'] = mod.OBJ_MAP_OBSERVABLE + STIX2_OBJ_MAPS[ver]['observable-extensions'] = mod.EXT_MAP + elif re.match(r'^stix2\.v2[0-9]\.common$', name) and is_pkg is False: + mod = importlib.import_module(name, str(top_level_module.__name__)) + STIX2_OBJ_MAPS[ver]['markings'] = mod.OBJ_MAP_MARKING diff --git a/stix2/test/v20/test_custom.py b/stix2/test/v20/test_custom.py index 70835c1..77ee13d 100644 --- a/stix2/test/v20/test_custom.py +++ b/stix2/test/v20/test_custom.py @@ -1,7 +1,9 @@ import pytest import stix2 -from stix2 import parsing +import stix2.parsing +import stix2.registry +import stix2.registration import stix2.v20 from ...exceptions import DuplicateRegistrationError, InvalidValueError @@ -981,7 +983,7 @@ def test_register_custom_object(): _type = 'awesome-object' with pytest.raises(ValueError): - stix2.parsing._register_object(CustomObject2, version="2.0") + stix2.registration._register_object(CustomObject2, version="2.0") def test_extension_property_location(): @@ -1041,10 +1043,10 @@ def test_register_custom_object_with_version(): "id": "x-new-type-2--00000000-0000-4000-8000-000000000007", } - cust_obj_1 = parsing.dict_to_stix2(custom_obj_1, version='2.0') + cust_obj_1 = stix2.parsing.dict_to_stix2(custom_obj_1, version='2.0') v = 'v20' - assert cust_obj_1.type in parsing.STIX2_OBJ_MAPS[v]['objects'] + assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS[v]['objects'] # spec_version is not in STIX 2.0, and is required in 2.1, so this # suffices as a test for a STIX 2.0 object. assert "spec_version" not in cust_obj_1 @@ -1076,7 +1078,7 @@ def test_register_observable_with_version(): custom_obs = NewObservable2(property1="Test Observable") v = 'v20' - assert custom_obs.type in parsing.STIX2_OBJ_MAPS[v]['observables'] + assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS[v]['observables'] def test_register_duplicate_observable_with_version(): @@ -1102,7 +1104,7 @@ def test_register_marking_with_version(): v = 'v20' no = NewObj2(property1='something') - assert no._type in parsing.STIX2_OBJ_MAPS[v]['markings'] + assert no._type in stix2.registry.STIX2_OBJ_MAPS[v]['markings'] def test_register_observable_extension_with_version(): @@ -1117,7 +1119,7 @@ def test_register_observable_extension_with_version(): v = 'v20' example = SomeCustomExtension2(keys='test123') - assert example._type in parsing.STIX2_OBJ_MAPS[v]['observable-extensions']['user-account'] + assert example._type in stix2.registry.STIX2_OBJ_MAPS[v]['observable-extensions']['user-account'] def test_register_duplicate_observable_extension(): diff --git a/stix2/test/v20/test_parsing.py b/stix2/test/v20/test_parsing.py index 6eb554e..12c6e33 100644 --- a/stix2/test/v20/test_parsing.py +++ b/stix2/test/v20/test_parsing.py @@ -3,7 +3,7 @@ from collections import OrderedDict import pytest import stix2 -from stix2 import exceptions, parsing +from stix2 import exceptions, parsing, registration, registry BUNDLE = { "type": "bundle", @@ -73,8 +73,8 @@ def test_register_marking_with_version(): _type = 'x-new-marking1' _properties = OrderedDict() - parsing._register_marking(NewMarking1, version='2.0') + registration._register_marking(NewMarking1, version='2.0') v = 'v20' - assert NewMarking1._type in parsing.STIX2_OBJ_MAPS[v]['markings'] - assert v in str(parsing.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type]) + assert NewMarking1._type in registry.STIX2_OBJ_MAPS[v]['markings'] + assert v in str(registry.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type]) diff --git a/stix2/test/v21/test_custom.py b/stix2/test/v21/test_custom.py index ea6d3a8..f9cb574 100644 --- a/stix2/test/v21/test_custom.py +++ b/stix2/test/v21/test_custom.py @@ -4,6 +4,8 @@ import pytest import stix2 import stix2.base +import stix2.registration +import stix2.registry import stix2.v21 from ...exceptions import DuplicateRegistrationError, InvalidValueError @@ -1199,7 +1201,7 @@ def test_register_custom_object(): _type = 'awesome-object' with pytest.raises(ValueError) as excinfo: - stix2.parsing._register_object(CustomObject2, version="2.1") + stix2.registration._register_object(CustomObject2, version="2.1") assert '@CustomObject decorator' in str(excinfo) @@ -1265,7 +1267,7 @@ def test_register_custom_object_with_version(): cust_obj_1 = stix2.parsing.dict_to_stix2(custom_obj_1, version='2.1') v = 'v21' - assert cust_obj_1.type in stix2.parsing.STIX2_OBJ_MAPS[v]['objects'] + assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS[v]['objects'] assert cust_obj_1.spec_version == "2.1" @@ -1295,7 +1297,7 @@ def test_register_observable(): custom_obs = NewObservable3(property1="Test Observable") v = 'v21' - assert custom_obs.type in stix2.parsing.STIX2_OBJ_MAPS[v]['observables'] + assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS[v]['observables'] def test_register_duplicate_observable(): @@ -1323,8 +1325,8 @@ def test_register_observable_custom_extension(): example = NewExtension2(property1="Hi there") v = 'v21' - assert 'domain-name' in stix2.parsing.STIX2_OBJ_MAPS[v]['observables'] - assert example._type in stix2.parsing.STIX2_OBJ_MAPS[v]['observable-extensions']['domain-name'] + assert 'domain-name' in stix2.registry.STIX2_OBJ_MAPS[v]['observables'] + assert example._type in stix2.registry.STIX2_OBJ_MAPS[v]['observable-extensions']['domain-name'] def test_register_duplicate_observable_extension(): diff --git a/stix2/test/v21/test_parsing.py b/stix2/test/v21/test_parsing.py index 53b53b2..b448941 100644 --- a/stix2/test/v21/test_parsing.py +++ b/stix2/test/v21/test_parsing.py @@ -3,7 +3,7 @@ from collections import OrderedDict import pytest import stix2 -from stix2 import exceptions, parsing +from stix2 import exceptions, parsing, registration, registry BUNDLE = { "type": "bundle", @@ -78,11 +78,11 @@ def test_register_marking_with_version(): _type = 'x-new-marking1' _properties = OrderedDict() - parsing._register_marking(NewMarking1, version='2.1') + registration._register_marking(NewMarking1, version='2.1') v = 'v21' - assert NewMarking1._type in parsing.STIX2_OBJ_MAPS[v]['markings'] - assert v in str(parsing.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type]) + assert NewMarking1._type in registry.STIX2_OBJ_MAPS[v]['markings'] + assert v in str(registry.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type]) @pytest.mark.xfail(reason="The default version is not 2.1", condition=stix2.DEFAULT_VERSION != "2.1") @@ -92,8 +92,8 @@ def test_register_marking_with_no_version(): _type = 'x-new-marking2' _properties = OrderedDict() - parsing._register_marking(NewMarking2) + registration._register_marking(NewMarking2) v = 'v21' - assert NewMarking2._type in parsing.STIX2_OBJ_MAPS[v]['markings'] - assert v in str(parsing.STIX2_OBJ_MAPS[v]['markings'][NewMarking2._type]) + assert NewMarking2._type in registry.STIX2_OBJ_MAPS[v]['markings'] + assert v in str(registry.STIX2_OBJ_MAPS[v]['markings'][NewMarking2._type]) diff --git a/stix2/versioning.py b/stix2/versioning.py index 90d1ac9..e66f394 100644 --- a/stix2/versioning.py +++ b/stix2/versioning.py @@ -9,6 +9,7 @@ import six from six.moves.collections_abc import Mapping import stix2.base +import stix2.registry from stix2.utils import get_timestamp, parse_into_datetime import stix2.v20 @@ -109,7 +110,7 @@ def _is_versionable(data): # registered class, and from that get a more complete picture of its # properties. elif isinstance(data, dict): - class_maps = stix2.parsing.STIX2_OBJ_MAPS[stix_vid] + class_maps = stix2.registry.STIX2_OBJ_MAPS[stix_vid] obj_type = data["type"] if obj_type in class_maps["objects"]: From 5d016142cfc086d61d266302495b9d35f355bd13 Mon Sep 17 00:00:00 2001 From: Michael Chisholm Date: Wed, 13 Jan 2021 11:22:34 -0500 Subject: [PATCH 2/4] Small tweaks: move the definition of DEFAULT_VERSION from the top-level stix2 package to stix2.version but import it into stix2. This makes it possible for someone to get the symbol without needing to import all of stix2. Change an "import X" style import to "from X import Y" in stix2/__init__.py to be consistent with the other imports in that file. --- stix2/__init__.py | 6 +++--- stix2/version.py | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/stix2/__init__.py b/stix2/__init__.py index 3cd992c..1747c9b 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -24,7 +24,7 @@ # flake8: noqa -DEFAULT_VERSION = '2.1' # Default version will always be the latest STIX 2.X version +from .version import DEFAULT_VERSION from .confidence import scales from .datastore import CompositeDataSource @@ -61,5 +61,5 @@ from .v21 import * # This import will always be the latest STIX 2.X version from .version import __version__ from .versioning import new_version, revoke -import stix2.registry -stix2.registry._collect_stix2_mappings() +from .registry import _collect_stix2_mappings +_collect_stix2_mappings() diff --git a/stix2/version.py b/stix2/version.py index 9aa3f90..9ddc874 100644 --- a/stix2/version.py +++ b/stix2/version.py @@ -1 +1,3 @@ __version__ = "2.1.0" + +DEFAULT_VERSION = '2.1' # Default version will always be the latest STIX 2.X version From a3f20dde7a225da4ad8e3c008aa63ce916bc9c97 Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Fri, 15 Jan 2021 10:27:23 -0500 Subject: [PATCH 3/4] Use consistent import style in parsing.py --- stix2/parsing.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/stix2/parsing.py b/stix2/parsing.py index 3b2882f..c7c9ff7 100644 --- a/stix2/parsing.py +++ b/stix2/parsing.py @@ -2,7 +2,7 @@ import copy -import stix2.registry +from . import registry from .exceptions import ParseError from .utils import _get_dict @@ -72,7 +72,7 @@ def _detect_spec_version(stix_dict): _detect_spec_version(obj) for obj in stix_dict["objects"] ), ) - elif obj_type in stix2.registry.STIX2_OBJ_MAPS["v21"]["observables"]: + elif obj_type in registry.STIX2_OBJ_MAPS["v21"]["observables"]: # Non-bundle object with an ID and without spec_version. Could be a # 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO... v = "v21" @@ -122,8 +122,8 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None): v = _detect_spec_version(stix_dict) OBJ_MAP = dict( - stix2.registry.STIX2_OBJ_MAPS[v]['objects'], - **stix2.registry.STIX2_OBJ_MAPS[v]['observables'] + registry.STIX2_OBJ_MAPS[v]['objects'], + **registry.STIX2_OBJ_MAPS[v]['observables'] ) try: @@ -175,7 +175,7 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None): v = _detect_spec_version(obj) try: - OBJ_MAP_OBSERVABLE = stix2.registry.STIX2_OBJ_MAPS[v]['observables'] + OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[v]['observables'] obj_class = OBJ_MAP_OBSERVABLE[obj['type']] except KeyError: if allow_custom: From 7de5c458bb155bc72ffb2f245c1c4f0777033465 Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Fri, 15 Jan 2021 10:27:39 -0500 Subject: [PATCH 4/4] Fix import sort order --- stix2/__init__.py | 6 ++---- stix2/registration.py | 1 + stix2/registry.py | 1 - stix2/test/v20/test_custom.py | 2 +- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/stix2/__init__.py b/stix2/__init__.py index 1747c9b..31821cf 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -24,8 +24,6 @@ # flake8: noqa -from .version import DEFAULT_VERSION - from .confidence import scales from .datastore import CompositeDataSource from .datastore.filesystem import ( @@ -57,9 +55,9 @@ from .patterns import ( RepeatQualifier, StartStopQualifier, StringConstant, TimestampConstant, WithinQualifier, ) +from .registry import _collect_stix2_mappings from .v21 import * # This import will always be the latest STIX 2.X version -from .version import __version__ +from .version import DEFAULT_VERSION, __version__ from .versioning import new_version, revoke -from .registry import _collect_stix2_mappings _collect_stix2_mappings() diff --git a/stix2/registration.py b/stix2/registration.py index 248929a..13bca74 100644 --- a/stix2/registration.py +++ b/stix2/registration.py @@ -2,6 +2,7 @@ import re import stix2 import stix2.registry + from .base import _DomainObject, _Observable from .exceptions import DuplicateRegistrationError from .utils import PREFIX_21_REGEX, get_class_hierarchy_names diff --git a/stix2/registry.py b/stix2/registry.py index 80d7cdc..6cb6cd8 100644 --- a/stix2/registry.py +++ b/stix2/registry.py @@ -2,7 +2,6 @@ import importlib import pkgutil import re - # Collects information on which classes implement which STIX types, for the # various STIX spec versions. STIX2_OBJ_MAPS = {} diff --git a/stix2/test/v20/test_custom.py b/stix2/test/v20/test_custom.py index 77ee13d..6ce4a62 100644 --- a/stix2/test/v20/test_custom.py +++ b/stix2/test/v20/test_custom.py @@ -2,8 +2,8 @@ import pytest import stix2 import stix2.parsing -import stix2.registry import stix2.registration +import stix2.registry import stix2.v20 from ...exceptions import DuplicateRegistrationError, InvalidValueError