From bd897c9848c6a16237cd4db479328d9e368b3e1c Mon Sep 17 00:00:00 2001 From: maybe-sybr <58414429+maybe-sybr@users.noreply.github.com> Date: Fri, 25 Jun 2021 10:27:36 +1000 Subject: [PATCH 1/3] improv: Check reference prop types for all customs Previously the requirement that properties ending in `_ref(s)` were instances of an appropriate type capable of containing a STIX `identifier` (the current interpretation of section 3.1 of the STIX 2.1 spec) was only enforced for custom observables. This change refactors the property checks for custom objects to enforce this and the STIX 2.1 property name requirement (also from section 3.1) in a common helper, therefore extending the enforcement of both requirements to all custom object types created by downstream projects. There is a special case carved out for STIX 2.0 observables which are required to contain "object references" rather than identifiers. This special logic is encoded in the reference property validation helper and enabled by a kwarg which is passed exclusively by the custom observable registration helper. --- stix2/registration.py | 161 ++++++++++++++++++---------------- stix2/test/v20/test_custom.py | 126 +++++++++++++++++++++++++- stix2/test/v21/test_custom.py | 126 +++++++++++++++++++++++++- 3 files changed, 333 insertions(+), 80 deletions(-) diff --git a/stix2/registration.py b/stix2/registration.py index 9fb2f49..85c6e9a 100644 --- a/stix2/registration.py +++ b/stix2/registration.py @@ -1,11 +1,71 @@ -import itertools import re from . import registry, version from .base import _DomainObject from .exceptions import DuplicateRegistrationError -from .properties import _validate_type -from .utils import PREFIX_21_REGEX, get_class_hierarchy_names +from .properties import ( + ListProperty, ObjectReferenceProperty, ReferenceProperty, _validate_type, +) +from .utils import PREFIX_21_REGEX + + +def _validate_ref_props(props_map, is_observable20=False): + """ + Validate that reference properties contain an expected type. + + Properties ending in "_ref/s" need to be instances of specific types to + meet the specification requirements. For 2.0 and 2.1 conformance, these + properties are expected to be implemented with `ReferenceProperty` (or a + subclass thereof), except for the special case of STIX 2.0 observables + which must be implemented with `ObjectReferenceProperty`. + + Args: + props_map (mapping): A mapping of STIX object properties to be checked. + is_observable20 (bool): Flag for the STIX 2.0 observables special case. + + Raises: + ValueError: If the properties do not conform. + """ + if is_observable20: + ref_prop_type = ObjectReferenceProperty + else: + ref_prop_type = ReferenceProperty + for prop_name, prop_obj in props_map.items(): + tail = prop_name.rsplit("_", 1)[-1] + if tail == "ref" and not isinstance(prop_obj, ref_prop_type): + raise ValueError( + f"{prop_name!r} is named like a reference property but is not " + f"a subclass of {ref_prop_type.__name__!r}.", + ) + elif tail == "refs" and not ( + isinstance(prop_obj, ListProperty) + and isinstance(prop_obj.contained, ref_prop_type) + ): + raise ValueError( + f"{prop_name!r} is named like a reference list property but is not " + f"a 'ListProperty' containing a subclass of {ref_prop_type.__name__!r}.", + ) + + +def _validate_props(props_map, version, **kwargs): + """ + Validate that a map of properties is conformant for this STIX `version`. + + Args: + props_map (mapping): A mapping of STIX object properties to be checked. + version (str): Which STIX2 version the properties must confirm to. + kwargs (mapping): Arguments to pass on to specific property validators. + + Raises: + ValueError: If the properties do not conform. + """ + # Confirm conformance with STIX 2.1+ requirements for property names + if version != "2.0": + for prop_name, prop_value in props_map.items(): + if not re.match(PREFIX_21_REGEX, prop_name): + raise ValueError("Property name '%s' must begin with an alpha character." % prop_name) + # Confirm conformance of reference properties + _validate_ref_props(props_map, **kwargs) def _register_object(new_type, version=version.DEFAULT_VERSION): @@ -29,15 +89,10 @@ def _register_object(new_type, version=version.DEFAULT_VERSION): new_type.__name__, ) - properties = new_type._properties - if not version: version = version.DEFAULT_VERSION - if version == "2.1": - for prop_name, prop in properties.items(): - if not re.match(PREFIX_21_REGEX, prop_name): - raise ValueError("Property name '%s' must begin with an alpha character" % prop_name) + _validate_props(new_type._properties, version) OBJ_MAP = registry.STIX2_OBJ_MAPS[version]['objects'] if new_type._type in OBJ_MAP.keys(): @@ -54,19 +109,12 @@ def _register_marking(new_marking, version=version.DEFAULT_VERSION): None, use latest version. """ - - mark_type = new_marking._type - properties = new_marking._properties - if not version: version = version.DEFAULT_VERSION + mark_type = new_marking._type _validate_type(mark_type, version) - - if version == "2.1": - for prop_name, prop_value in properties.items(): - if not re.match(PREFIX_21_REGEX, prop_name): - raise ValueError("Property name '%s' must begin with an alpha character." % prop_name) + _validate_props(new_marking._properties, version) OBJ_MAP_MARKING = registry.STIX2_OBJ_MAPS[version]['markings'] if mark_type in OBJ_MAP_MARKING.keys(): @@ -83,49 +131,13 @@ def _register_observable(new_observable, version=version.DEFAULT_VERSION): None, use latest version. """ - properties = new_observable._properties - if not version: version = version.DEFAULT_VERSION - if version == "2.0": - # If using STIX2.0, check properties ending in "_ref/s" are ObjectReferenceProperties - for prop_name, prop in properties.items(): - if prop_name.endswith('_ref') and ('ObjectReferenceProperty' not in get_class_hierarchy_names(prop)): - raise ValueError( - "'%s' is named like an object reference property but " - "is not an ObjectReferenceProperty." % prop_name, - ) - elif ( - prop_name.endswith('_refs') and ( - 'ListProperty' not in get_class_hierarchy_names(prop) or - 'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained) - ) - ): - raise ValueError( - "'%s' is named like an object reference list property but " - "is not a ListProperty containing ObjectReferenceProperty." % prop_name, - ) - else: - # If using STIX2.1 (or newer...), check properties ending in "_ref/s" are ReferenceProperties - for prop_name, prop in properties.items(): - if not re.match(PREFIX_21_REGEX, prop_name): - raise ValueError("Property name '%s' must begin with an alpha character." % prop_name) - elif prop_name.endswith('_ref') and ('ReferenceProperty' not in get_class_hierarchy_names(prop)): - raise ValueError( - "'%s' is named like a reference property but " - "is not a ReferenceProperty." % prop_name, - ) - elif ( - prop_name.endswith('_refs') and ( - 'ListProperty' not in get_class_hierarchy_names(prop) or - 'ReferenceProperty' not in get_class_hierarchy_names(prop.contained) - ) - ): - raise ValueError( - "'%s' is named like a reference list property but " - "is not a ListProperty containing ReferenceProperty." % prop_name, - ) + _validate_props( + new_observable._properties, version, + is_observable20=(version == "2.0"), + ) OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[version]['observables'] if new_observable._type in OBJ_MAP_OBSERVABLE.keys(): @@ -146,20 +158,7 @@ def _register_extension( """ ext_type = new_extension._type - # Need to check both toplevel and nested properties - prop_groups = [new_extension._properties] - if hasattr(new_extension, "_toplevel_properties"): - prop_groups.append(new_extension._toplevel_properties) - prop_names = itertools.chain.from_iterable(prop_groups) - _validate_type(ext_type, version) - - if not new_extension._properties: - raise ValueError( - "Invalid extension: must define at least one property: " + - ext_type, - ) - if version == "2.1": if not (ext_type.endswith('-ext') or ext_type.startswith('extension-definition--')): raise ValueError( @@ -167,9 +166,23 @@ def _register_extension( ext_type, ) - for prop_name in prop_names: - if not re.match(PREFIX_21_REGEX, prop_name): - raise ValueError("Property name '%s' must begin with an alpha character." % prop_name) + tl_props = getattr(new_extension, "_toplevel_properties", None) + if any(( + # There must always be at least one property in an extension. This + # holds for instances of both custom object extensions which must + # contain one or more custom properties, and `extension-definition`s + # which must contain an `extension_type` property. + not new_extension._properties, + # If a top-level properties mapping is provided, it cannot be empty + tl_props is not None and not tl_props, + )): + raise ValueError( + "Invalid extension: must define at least one property: " + + ext_type, + ) + # We need to validate all properties related to this extension + combined_props = dict(new_extension._properties, **(tl_props or dict())) + _validate_props(combined_props, version) EXT_MAP = registry.STIX2_OBJ_MAPS[version]['extensions'] diff --git a/stix2/test/v20/test_custom.py b/stix2/test/v20/test_custom.py index 4647167..3264af9 100644 --- a/stix2/test/v20/test_custom.py +++ b/stix2/test/v20/test_custom.py @@ -385,6 +385,84 @@ def test_custom_object_invalid_type_name(): assert "Invalid type name 'x_new_object':" in str(excinfo.value) +def test_custom_object_ref_property_as_identifier(): + @stix2.v20.CustomObject( + 'x-new-obj-with-ref', [ + ('property_ref', stix2.properties.ReferenceProperty(invalid_types=[])), + ], + ) + class NewObs(): + pass + + +def test_custom_object_refs_property_containing_identifiers(): + @stix2.v20.CustomObject( + 'x-new-obj-with-refs', [ + ('property_refs', stix2.properties.ListProperty(stix2.properties.ReferenceProperty(invalid_types=[]))), + ], + ) + class NewObs(): + pass + + +def test_custom_object_ref_property_as_objectref(): + with pytest.raises(ValueError, match=r"not a subclass of 'ReferenceProperty"): + @stix2.v20.CustomObject( + 'x-new-obj-with-objref', [ + ('property_ref', stix2.properties.ObjectReferenceProperty()), + ], + ) + class NewObs(): + pass + + +def test_custom_object_refs_property_containing_objectrefs(): + with pytest.raises(ValueError, match=r"not a 'ListProperty' containing a subclass of 'ReferenceProperty"): + @stix2.v20.CustomObject( + 'x-new-obj-with-objrefs', [ + ('property_refs', stix2.properties.ListProperty(stix2.properties.ObjectReferenceProperty())), + ], + ) + class NewObs(): + pass + + +def test_custom_object_invalid_ref_property(): + with pytest.raises(ValueError) as excinfo: + @stix2.v20.CustomObject( + 'x-new-obj', [ + ('property_ref', stix2.properties.StringProperty()), + ], + ) + class NewObs(): + pass + assert "is named like a reference property but is not" in str(excinfo.value) + + +def test_custom_object_invalid_refs_property(): + with pytest.raises(ValueError) as excinfo: + @stix2.v20.CustomObject( + 'x-new-obj', [ + ('property_refs', stix2.properties.StringProperty()), + ], + ) + class NewObs(): + pass + assert "is named like a reference list property but is not" in str(excinfo.value) + + +def test_custom_object_invalid_refs_list_property(): + with pytest.raises(ValueError) as excinfo: + @stix2.v20.CustomObject( + 'x-new-obj', [ + ('property_refs', stix2.properties.ListProperty(stix2.properties.StringProperty)), + ], + ) + class NewObs(): + pass + assert "is named like a reference list property but is not" in str(excinfo.value) + + def test_custom_subobject_dict(): obj_dict = { "type": "bundle", @@ -562,6 +640,48 @@ def test_custom_observable_object_invalid_type_name(): assert "Invalid type name 'x_new_obs':" in str(excinfo.value) +def test_custom_observable_object_ref_property_as_identifier(): + with pytest.raises(ValueError, match=r"not a subclass of 'ObjectReferenceProperty"): + @stix2.v20.CustomObservable( + 'x-new-obs-with-ref', [ + ('property_ref', stix2.properties.ReferenceProperty(invalid_types=[])), + ], + ) + class NewObs(): + pass + + +def test_custom_observable_object_refs_property_containing_identifiers(): + with pytest.raises(ValueError, match=r"not a 'ListProperty' containing a subclass of 'ObjectReferenceProperty"): + @stix2.v20.CustomObservable( + 'x-new-obs-with-refs', [ + ('property_refs', stix2.properties.ListProperty(stix2.properties.ReferenceProperty(invalid_types=[]))), + ], + ) + class NewObs(): + pass + + +def test_custom_observable_object_ref_property_as_objectref(): + @stix2.v20.CustomObservable( + 'x-new-obs-with-objref', [ + ('property_ref', stix2.properties.ObjectReferenceProperty()), + ], + ) + class NewObs(): + pass + + +def test_custom_observable_object_refs_property_containing_objectrefs(): + @stix2.v20.CustomObservable( + 'x-new-obs-with-objrefs', [ + ('property_refs', stix2.properties.ListProperty(stix2.properties.ObjectReferenceProperty())), + ], + ) + class NewObs(): + pass + + def test_custom_observable_object_invalid_ref_property(): with pytest.raises(ValueError) as excinfo: @stix2.v20.CustomObservable( @@ -571,7 +691,7 @@ def test_custom_observable_object_invalid_ref_property(): ) class NewObs(): pass - assert "is named like an object reference property but is not an ObjectReferenceProperty" in str(excinfo.value) + assert "is named like a reference property but is not" in str(excinfo.value) def test_custom_observable_object_invalid_refs_property(): @@ -583,7 +703,7 @@ def test_custom_observable_object_invalid_refs_property(): ) class NewObs(): pass - assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value) + assert "is named like a reference list property but is not" in str(excinfo.value) def test_custom_observable_object_invalid_refs_list_property(): @@ -595,7 +715,7 @@ def test_custom_observable_object_invalid_refs_list_property(): ) class NewObs(): pass - assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value) + assert "is named like a reference list property but is not" in str(excinfo.value) def test_custom_observable_object_invalid_valid_refs(): diff --git a/stix2/test/v21/test_custom.py b/stix2/test/v21/test_custom.py index 13eeba9..03a8357 100644 --- a/stix2/test/v21/test_custom.py +++ b/stix2/test/v21/test_custom.py @@ -538,6 +538,84 @@ def test_custom_object_invalid_type_name(): assert "Invalid type name '7x-new-object':" in str(excinfo.value) +def test_custom_object_ref_property_containing_identifier(): + @stix2.v21.CustomObject( + 'x-new-obj-with-ref', [ + ('property_ref', stix2.properties.ReferenceProperty(invalid_types=[])), + ], + ) + class NewObs(): + pass + + +def test_custom_object_refs_property_containing_identifiers(): + @stix2.v21.CustomObject( + 'x-new-obj-with-refs', [ + ('property_refs', stix2.properties.ListProperty(stix2.properties.ReferenceProperty(invalid_types=[]))), + ], + ) + class NewObs(): + pass + + +def test_custom_object_ref_property_containing_objectref(): + with pytest.raises(ValueError, match=r"not a subclass of 'ReferenceProperty"): + @stix2.v21.CustomObject( + 'x-new-obj-with-objref', [ + ('property_ref', stix2.properties.ObjectReferenceProperty()), + ], + ) + class NewObs(): + pass + + +def test_custom_object_refs_property_containing_objectrefs(): + with pytest.raises(ValueError, match=r"not a 'ListProperty' containing a subclass of 'ReferenceProperty"): + @stix2.v21.CustomObject( + 'x-new-obj-with-objrefs', [ + ('property_refs', stix2.properties.ListProperty(stix2.properties.ObjectReferenceProperty())), + ], + ) + class NewObs(): + pass + + +def test_custom_object_invalid_ref_property(): + with pytest.raises(ValueError) as excinfo: + @stix2.v21.CustomObject( + 'x-new-obj', [ + ('property_ref', stix2.properties.StringProperty()), + ], + ) + class NewObs(): + pass + assert "is named like a reference property but is not" in str(excinfo.value) + + +def test_custom_object_invalid_refs_property(): + with pytest.raises(ValueError) as excinfo: + @stix2.v21.CustomObject( + 'x-new-obj', [ + ('property_refs', stix2.properties.StringProperty()), + ], + ) + class NewObs(): + pass + assert "is named like a reference list property but is not" in str(excinfo.value) + + +def test_custom_object_invalid_refs_list_property(): + with pytest.raises(ValueError) as excinfo: + @stix2.v21.CustomObject( + 'x-new-obj', [ + ('property_refs', stix2.properties.ListProperty(stix2.properties.StringProperty)), + ], + ) + class NewObs(): + pass + assert "is named like a reference list property but is not" in str(excinfo.value) + + def test_custom_subobject_dict(): obj_dict = { "type": "bundle", @@ -730,6 +808,48 @@ def test_custom_observable_object_invalid_type_name(): assert "Invalid type name '7x-new-obs':" in str(excinfo.value) +def test_custom_observable_object_ref_property_as_identifier(): + @stix2.v21.CustomObservable( + 'x-new-obs-with-ref', [ + ('property_ref', stix2.properties.ReferenceProperty(invalid_types=[])), + ], + ) + class NewObs(): + pass + + +def test_custom_observable_object_refs_property_containing_identifiers(): + @stix2.v21.CustomObservable( + 'x-new-obs-with-refs', [ + ('property_refs', stix2.properties.ListProperty(stix2.properties.ReferenceProperty(invalid_types=[]))), + ], + ) + class NewObs(): + pass + + +def test_custom_observable_object_ref_property_as_objectref(): + with pytest.raises(ValueError, match=r"not a subclass of 'ReferenceProperty"): + @stix2.v21.CustomObservable( + 'x-new-obs-with-objref', [ + ('property_ref', stix2.properties.ObjectReferenceProperty()), + ], + ) + class NewObs(): + pass + + +def test_custom_observable_object_refs_property_containing_objectrefs(): + with pytest.raises(ValueError, match=r"not a 'ListProperty' containing a subclass of 'ReferenceProperty"): + @stix2.v21.CustomObservable( + 'x-new-obs-with-objrefs', [ + ('property_refs', stix2.properties.ListProperty(stix2.properties.ObjectReferenceProperty())), + ], + ) + class NewObs(): + pass + + def test_custom_observable_object_invalid_ref_property(): with pytest.raises(ValueError) as excinfo: @stix2.v21.CustomObservable( @@ -739,7 +859,7 @@ def test_custom_observable_object_invalid_ref_property(): ) class NewObs(): pass - assert "is named like a reference property but is not a ReferenceProperty" in str(excinfo.value) + assert "is named like a reference property but is not" in str(excinfo.value) def test_custom_observable_object_invalid_refs_property(): @@ -751,7 +871,7 @@ def test_custom_observable_object_invalid_refs_property(): ) class NewObs(): pass - assert "is named like a reference list property but is not a ListProperty containing ReferenceProperty" in str(excinfo.value) + assert "is named like a reference list property but is not" in str(excinfo.value) def test_custom_observable_object_invalid_refs_list_property(): @@ -763,7 +883,7 @@ def test_custom_observable_object_invalid_refs_list_property(): ) class NewObs(): pass - assert "is named like a reference list property but is not a ListProperty containing ReferenceProperty" in str(excinfo.value) + assert "is named like a reference list property but is not" in str(excinfo.value) def test_custom_no_properties_raises_exception(): From a3d10561220e1faa79e50ef6078d24c43a652a73 Mon Sep 17 00:00:00 2001 From: Michael Chisholm Date: Tue, 24 Aug 2021 00:15:50 -0400 Subject: [PATCH 2/3] Fix TAXIICollectionSource.query() to propagate HTTPErrors instead of ignoring them. Special-cased 416, since that can occur naturally while paging with TAXII 2.0. --- stix2/datastore/taxii.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/stix2/datastore/taxii.py b/stix2/datastore/taxii.py index 9ad6df9..84e8c35 100644 --- a/stix2/datastore/taxii.py +++ b/stix2/datastore/taxii.py @@ -296,19 +296,10 @@ class TAXIICollectionSource(DataSource): # query TAXII collection all_data = [] + paged_request = tcv21.as_pages if isinstance(self.collection, tcv21.Collection) else tcv20.as_pages try: - paged_request = tcv21.as_pages if isinstance(self.collection, tcv21.Collection) else tcv20.as_pages - for resource in paged_request(self.collection.get_objects, per_request=self.items_per_page, **taxii_filters_dict): all_data.extend(resource.get("objects", [])) - - # deduplicate data (before filtering as reduces wasted filtering) - all_data = deduplicate(all_data) - - # apply local (CompositeDataSource, TAXIICollectionSource and query) filters - query.remove(taxii_filters) - all_data = list(apply_common_filters(all_data, query)) - except HTTPError as e: # if resources not found or access is denied from TAXII server, return empty list if e.response.status_code == 404: @@ -317,6 +308,19 @@ class TAXIICollectionSource(DataSource): " the supplied TAXII Collection object are either not found or access is" " denied. Received error: ", e, ) + elif e.response.status_code != 416: + # TAXII 2.0 paging can result in a 416 (Range Not Satisfiable) + # if the server isn't sending Content-Range headers, so the + # pager just goes until it runs out of pages. So 416 can't be + # treated as a real error, just an end-of-pages condition. + raise + + # deduplicate data (before filtering as reduces wasted filtering) + all_data = deduplicate(all_data) + + # apply local (CompositeDataSource, TAXIICollectionSource and query) filters + query.remove(taxii_filters) + all_data = list(apply_common_filters(all_data, query)) # parse python STIX objects from the STIX object dicts stix_objs = [parse(stix_obj_dict, allow_custom=self.allow_custom, version=version) for stix_obj_dict in all_data] From a4ce0222bf8ef6bb04c70f9958fb7cdd7ca8f4aa Mon Sep 17 00:00:00 2001 From: Michael Chisholm Date: Tue, 24 Aug 2021 15:04:05 -0400 Subject: [PATCH 3/3] Move and edit a comment about HTTP status code 416 to reduce confusion. --- stix2/datastore/taxii.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/stix2/datastore/taxii.py b/stix2/datastore/taxii.py index 84e8c35..04b3c83 100644 --- a/stix2/datastore/taxii.py +++ b/stix2/datastore/taxii.py @@ -308,11 +308,13 @@ class TAXIICollectionSource(DataSource): " the supplied TAXII Collection object are either not found or access is" " denied. Received error: ", e, ) + + # TAXII 2.0 paging can result in a 416 (Range Not Satisfiable) if + # the server isn't sending Content-Range headers, so the pager just + # goes until it runs out of pages. So 416 can't be treated as a + # real error, just an end-of-pages condition. For other codes, + # propagate the exception. elif e.response.status_code != 416: - # TAXII 2.0 paging can result in a 416 (Range Not Satisfiable) - # if the server isn't sending Content-Range headers, so the - # pager just goes until it runs out of pages. So 416 can't be - # treated as a real error, just an end-of-pages condition. raise # deduplicate data (before filtering as reduces wasted filtering)