Merge branch 'master' of github.com:oasis-open/cti-python-stix2 into main

pull/1/head
chrisr3d 2021-08-31 19:02:36 +02:00
commit 8ab39f75f7
4 changed files with 349 additions and 90 deletions

View File

@ -296,19 +296,10 @@ class TAXIICollectionSource(DataSource):
# query TAXII collection
all_data = []
paged_request = tcv21.as_pages if isinstance(self.collection, tcv21.Collection) else tcv20.as_pages
try:
paged_request = tcv21.as_pages if isinstance(self.collection, tcv21.Collection) else tcv20.as_pages
for resource in paged_request(self.collection.get_objects, per_request=self.items_per_page, **taxii_filters_dict):
all_data.extend(resource.get("objects", []))
# deduplicate data (before filtering as reduces wasted filtering)
all_data = deduplicate(all_data)
# apply local (CompositeDataSource, TAXIICollectionSource and query) filters
query.remove(taxii_filters)
all_data = list(apply_common_filters(all_data, query))
except HTTPError as e:
# if resources not found or access is denied from TAXII server, return empty list
if e.response.status_code == 404:
@ -318,6 +309,21 @@ class TAXIICollectionSource(DataSource):
" denied. Received error: ", e,
)
# TAXII 2.0 paging can result in a 416 (Range Not Satisfiable) if
# the server isn't sending Content-Range headers, so the pager just
# goes until it runs out of pages. So 416 can't be treated as a
# real error, just an end-of-pages condition. For other codes,
# propagate the exception.
elif e.response.status_code != 416:
raise
# deduplicate data (before filtering as reduces wasted filtering)
all_data = deduplicate(all_data)
# apply local (CompositeDataSource, TAXIICollectionSource and query) filters
query.remove(taxii_filters)
all_data = list(apply_common_filters(all_data, query))
# parse python STIX objects from the STIX object dicts
stix_objs = [parse(stix_obj_dict, allow_custom=self.allow_custom, version=version) for stix_obj_dict in all_data]

View File

@ -1,11 +1,71 @@
import itertools
import re
from . import registry, version
from .base import _DomainObject
from .exceptions import DuplicateRegistrationError
from .properties import _validate_type
from .utils import PREFIX_21_REGEX, get_class_hierarchy_names
from .properties import (
ListProperty, ObjectReferenceProperty, ReferenceProperty, _validate_type,
)
from .utils import PREFIX_21_REGEX
def _validate_ref_props(props_map, is_observable20=False):
"""
Validate that reference properties contain an expected type.
Properties ending in "_ref/s" need to be instances of specific types to
meet the specification requirements. For 2.0 and 2.1 conformance, these
properties are expected to be implemented with `ReferenceProperty` (or a
subclass thereof), except for the special case of STIX 2.0 observables
which must be implemented with `ObjectReferenceProperty`.
Args:
props_map (mapping): A mapping of STIX object properties to be checked.
is_observable20 (bool): Flag for the STIX 2.0 observables special case.
Raises:
ValueError: If the properties do not conform.
"""
if is_observable20:
ref_prop_type = ObjectReferenceProperty
else:
ref_prop_type = ReferenceProperty
for prop_name, prop_obj in props_map.items():
tail = prop_name.rsplit("_", 1)[-1]
if tail == "ref" and not isinstance(prop_obj, ref_prop_type):
raise ValueError(
f"{prop_name!r} is named like a reference property but is not "
f"a subclass of {ref_prop_type.__name__!r}.",
)
elif tail == "refs" and not (
isinstance(prop_obj, ListProperty)
and isinstance(prop_obj.contained, ref_prop_type)
):
raise ValueError(
f"{prop_name!r} is named like a reference list property but is not "
f"a 'ListProperty' containing a subclass of {ref_prop_type.__name__!r}.",
)
def _validate_props(props_map, version, **kwargs):
"""
Validate that a map of properties is conformant for this STIX `version`.
Args:
props_map (mapping): A mapping of STIX object properties to be checked.
version (str): Which STIX2 version the properties must confirm to.
kwargs (mapping): Arguments to pass on to specific property validators.
Raises:
ValueError: If the properties do not conform.
"""
# Confirm conformance with STIX 2.1+ requirements for property names
if version != "2.0":
for prop_name, prop_value in props_map.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
# Confirm conformance of reference properties
_validate_ref_props(props_map, **kwargs)
def _register_object(new_type, version=version.DEFAULT_VERSION):
@ -29,15 +89,10 @@ def _register_object(new_type, version=version.DEFAULT_VERSION):
new_type.__name__,
)
properties = new_type._properties
if not version:
version = version.DEFAULT_VERSION
if version == "2.1":
for prop_name, prop in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character" % prop_name)
_validate_props(new_type._properties, version)
OBJ_MAP = registry.STIX2_OBJ_MAPS[version]['objects']
if new_type._type in OBJ_MAP.keys():
@ -54,19 +109,12 @@ def _register_marking(new_marking, version=version.DEFAULT_VERSION):
None, use latest version.
"""
mark_type = new_marking._type
properties = new_marking._properties
if not version:
version = version.DEFAULT_VERSION
mark_type = new_marking._type
_validate_type(mark_type, version)
if version == "2.1":
for prop_name, prop_value in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
_validate_props(new_marking._properties, version)
OBJ_MAP_MARKING = registry.STIX2_OBJ_MAPS[version]['markings']
if mark_type in OBJ_MAP_MARKING.keys():
@ -83,49 +131,13 @@ def _register_observable(new_observable, version=version.DEFAULT_VERSION):
None, use latest version.
"""
properties = new_observable._properties
if not version:
version = version.DEFAULT_VERSION
if version == "2.0":
# If using STIX2.0, check properties ending in "_ref/s" are ObjectReferenceProperties
for prop_name, prop in properties.items():
if prop_name.endswith('_ref') and ('ObjectReferenceProperty' not in get_class_hierarchy_names(prop)):
raise ValueError(
"'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty." % prop_name,
)
elif (
prop_name.endswith('_refs') and (
'ListProperty' not in get_class_hierarchy_names(prop) or
'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained)
)
):
raise ValueError(
"'%s' is named like an object reference list property but "
"is not a ListProperty containing ObjectReferenceProperty." % prop_name,
)
else:
# If using STIX2.1 (or newer...), check properties ending in "_ref/s" are ReferenceProperties
for prop_name, prop in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
elif prop_name.endswith('_ref') and ('ReferenceProperty' not in get_class_hierarchy_names(prop)):
raise ValueError(
"'%s' is named like a reference property but "
"is not a ReferenceProperty." % prop_name,
)
elif (
prop_name.endswith('_refs') and (
'ListProperty' not in get_class_hierarchy_names(prop) or
'ReferenceProperty' not in get_class_hierarchy_names(prop.contained)
)
):
raise ValueError(
"'%s' is named like a reference list property but "
"is not a ListProperty containing ReferenceProperty." % prop_name,
)
_validate_props(
new_observable._properties, version,
is_observable20=(version == "2.0"),
)
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[version]['observables']
if new_observable._type in OBJ_MAP_OBSERVABLE.keys():
@ -146,20 +158,7 @@ def _register_extension(
"""
ext_type = new_extension._type
# Need to check both toplevel and nested properties
prop_groups = [new_extension._properties]
if hasattr(new_extension, "_toplevel_properties"):
prop_groups.append(new_extension._toplevel_properties)
prop_names = itertools.chain.from_iterable(prop_groups)
_validate_type(ext_type, version)
if not new_extension._properties:
raise ValueError(
"Invalid extension: must define at least one property: " +
ext_type,
)
if version == "2.1":
if not (ext_type.endswith('-ext') or ext_type.startswith('extension-definition--')):
raise ValueError(
@ -167,9 +166,23 @@ def _register_extension(
ext_type,
)
for prop_name in prop_names:
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
tl_props = getattr(new_extension, "_toplevel_properties", None)
if any((
# There must always be at least one property in an extension. This
# holds for instances of both custom object extensions which must
# contain one or more custom properties, and `extension-definition`s
# which must contain an `extension_type` property.
not new_extension._properties,
# If a top-level properties mapping is provided, it cannot be empty
tl_props is not None and not tl_props,
)):
raise ValueError(
"Invalid extension: must define at least one property: " +
ext_type,
)
# We need to validate all properties related to this extension
combined_props = dict(new_extension._properties, **(tl_props or dict()))
_validate_props(combined_props, version)
EXT_MAP = registry.STIX2_OBJ_MAPS[version]['extensions']

View File

@ -385,6 +385,84 @@ def test_custom_object_invalid_type_name():
assert "Invalid type name 'x_new_object':" in str(excinfo.value)
def test_custom_object_ref_property_as_identifier():
@stix2.v20.CustomObject(
'x-new-obj-with-ref', [
('property_ref', stix2.properties.ReferenceProperty(invalid_types=[])),
],
)
class NewObs():
pass
def test_custom_object_refs_property_containing_identifiers():
@stix2.v20.CustomObject(
'x-new-obj-with-refs', [
('property_refs', stix2.properties.ListProperty(stix2.properties.ReferenceProperty(invalid_types=[]))),
],
)
class NewObs():
pass
def test_custom_object_ref_property_as_objectref():
with pytest.raises(ValueError, match=r"not a subclass of 'ReferenceProperty"):
@stix2.v20.CustomObject(
'x-new-obj-with-objref', [
('property_ref', stix2.properties.ObjectReferenceProperty()),
],
)
class NewObs():
pass
def test_custom_object_refs_property_containing_objectrefs():
with pytest.raises(ValueError, match=r"not a 'ListProperty' containing a subclass of 'ReferenceProperty"):
@stix2.v20.CustomObject(
'x-new-obj-with-objrefs', [
('property_refs', stix2.properties.ListProperty(stix2.properties.ObjectReferenceProperty())),
],
)
class NewObs():
pass
def test_custom_object_invalid_ref_property():
with pytest.raises(ValueError) as excinfo:
@stix2.v20.CustomObject(
'x-new-obj', [
('property_ref', stix2.properties.StringProperty()),
],
)
class NewObs():
pass
assert "is named like a reference property but is not" in str(excinfo.value)
def test_custom_object_invalid_refs_property():
with pytest.raises(ValueError) as excinfo:
@stix2.v20.CustomObject(
'x-new-obj', [
('property_refs', stix2.properties.StringProperty()),
],
)
class NewObs():
pass
assert "is named like a reference list property but is not" in str(excinfo.value)
def test_custom_object_invalid_refs_list_property():
with pytest.raises(ValueError) as excinfo:
@stix2.v20.CustomObject(
'x-new-obj', [
('property_refs', stix2.properties.ListProperty(stix2.properties.StringProperty)),
],
)
class NewObs():
pass
assert "is named like a reference list property but is not" in str(excinfo.value)
def test_custom_subobject_dict():
obj_dict = {
"type": "bundle",
@ -562,6 +640,48 @@ def test_custom_observable_object_invalid_type_name():
assert "Invalid type name 'x_new_obs':" in str(excinfo.value)
def test_custom_observable_object_ref_property_as_identifier():
with pytest.raises(ValueError, match=r"not a subclass of 'ObjectReferenceProperty"):
@stix2.v20.CustomObservable(
'x-new-obs-with-ref', [
('property_ref', stix2.properties.ReferenceProperty(invalid_types=[])),
],
)
class NewObs():
pass
def test_custom_observable_object_refs_property_containing_identifiers():
with pytest.raises(ValueError, match=r"not a 'ListProperty' containing a subclass of 'ObjectReferenceProperty"):
@stix2.v20.CustomObservable(
'x-new-obs-with-refs', [
('property_refs', stix2.properties.ListProperty(stix2.properties.ReferenceProperty(invalid_types=[]))),
],
)
class NewObs():
pass
def test_custom_observable_object_ref_property_as_objectref():
@stix2.v20.CustomObservable(
'x-new-obs-with-objref', [
('property_ref', stix2.properties.ObjectReferenceProperty()),
],
)
class NewObs():
pass
def test_custom_observable_object_refs_property_containing_objectrefs():
@stix2.v20.CustomObservable(
'x-new-obs-with-objrefs', [
('property_refs', stix2.properties.ListProperty(stix2.properties.ObjectReferenceProperty())),
],
)
class NewObs():
pass
def test_custom_observable_object_invalid_ref_property():
with pytest.raises(ValueError) as excinfo:
@stix2.v20.CustomObservable(
@ -571,7 +691,7 @@ def test_custom_observable_object_invalid_ref_property():
)
class NewObs():
pass
assert "is named like an object reference property but is not an ObjectReferenceProperty" in str(excinfo.value)
assert "is named like a reference property but is not" in str(excinfo.value)
def test_custom_observable_object_invalid_refs_property():
@ -583,7 +703,7 @@ def test_custom_observable_object_invalid_refs_property():
)
class NewObs():
pass
assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value)
assert "is named like a reference list property but is not" in str(excinfo.value)
def test_custom_observable_object_invalid_refs_list_property():
@ -595,7 +715,7 @@ def test_custom_observable_object_invalid_refs_list_property():
)
class NewObs():
pass
assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value)
assert "is named like a reference list property but is not" in str(excinfo.value)
def test_custom_observable_object_invalid_valid_refs():

View File

@ -538,6 +538,84 @@ def test_custom_object_invalid_type_name():
assert "Invalid type name '7x-new-object':" in str(excinfo.value)
def test_custom_object_ref_property_containing_identifier():
@stix2.v21.CustomObject(
'x-new-obj-with-ref', [
('property_ref', stix2.properties.ReferenceProperty(invalid_types=[])),
],
)
class NewObs():
pass
def test_custom_object_refs_property_containing_identifiers():
@stix2.v21.CustomObject(
'x-new-obj-with-refs', [
('property_refs', stix2.properties.ListProperty(stix2.properties.ReferenceProperty(invalid_types=[]))),
],
)
class NewObs():
pass
def test_custom_object_ref_property_containing_objectref():
with pytest.raises(ValueError, match=r"not a subclass of 'ReferenceProperty"):
@stix2.v21.CustomObject(
'x-new-obj-with-objref', [
('property_ref', stix2.properties.ObjectReferenceProperty()),
],
)
class NewObs():
pass
def test_custom_object_refs_property_containing_objectrefs():
with pytest.raises(ValueError, match=r"not a 'ListProperty' containing a subclass of 'ReferenceProperty"):
@stix2.v21.CustomObject(
'x-new-obj-with-objrefs', [
('property_refs', stix2.properties.ListProperty(stix2.properties.ObjectReferenceProperty())),
],
)
class NewObs():
pass
def test_custom_object_invalid_ref_property():
with pytest.raises(ValueError) as excinfo:
@stix2.v21.CustomObject(
'x-new-obj', [
('property_ref', stix2.properties.StringProperty()),
],
)
class NewObs():
pass
assert "is named like a reference property but is not" in str(excinfo.value)
def test_custom_object_invalid_refs_property():
with pytest.raises(ValueError) as excinfo:
@stix2.v21.CustomObject(
'x-new-obj', [
('property_refs', stix2.properties.StringProperty()),
],
)
class NewObs():
pass
assert "is named like a reference list property but is not" in str(excinfo.value)
def test_custom_object_invalid_refs_list_property():
with pytest.raises(ValueError) as excinfo:
@stix2.v21.CustomObject(
'x-new-obj', [
('property_refs', stix2.properties.ListProperty(stix2.properties.StringProperty)),
],
)
class NewObs():
pass
assert "is named like a reference list property but is not" in str(excinfo.value)
def test_custom_subobject_dict():
obj_dict = {
"type": "bundle",
@ -730,6 +808,48 @@ def test_custom_observable_object_invalid_type_name():
assert "Invalid type name '7x-new-obs':" in str(excinfo.value)
def test_custom_observable_object_ref_property_as_identifier():
@stix2.v21.CustomObservable(
'x-new-obs-with-ref', [
('property_ref', stix2.properties.ReferenceProperty(invalid_types=[])),
],
)
class NewObs():
pass
def test_custom_observable_object_refs_property_containing_identifiers():
@stix2.v21.CustomObservable(
'x-new-obs-with-refs', [
('property_refs', stix2.properties.ListProperty(stix2.properties.ReferenceProperty(invalid_types=[]))),
],
)
class NewObs():
pass
def test_custom_observable_object_ref_property_as_objectref():
with pytest.raises(ValueError, match=r"not a subclass of 'ReferenceProperty"):
@stix2.v21.CustomObservable(
'x-new-obs-with-objref', [
('property_ref', stix2.properties.ObjectReferenceProperty()),
],
)
class NewObs():
pass
def test_custom_observable_object_refs_property_containing_objectrefs():
with pytest.raises(ValueError, match=r"not a 'ListProperty' containing a subclass of 'ReferenceProperty"):
@stix2.v21.CustomObservable(
'x-new-obs-with-objrefs', [
('property_refs', stix2.properties.ListProperty(stix2.properties.ObjectReferenceProperty())),
],
)
class NewObs():
pass
def test_custom_observable_object_invalid_ref_property():
with pytest.raises(ValueError) as excinfo:
@stix2.v21.CustomObservable(
@ -739,7 +859,7 @@ def test_custom_observable_object_invalid_ref_property():
)
class NewObs():
pass
assert "is named like a reference property but is not a ReferenceProperty" in str(excinfo.value)
assert "is named like a reference property but is not" in str(excinfo.value)
def test_custom_observable_object_invalid_refs_property():
@ -751,7 +871,7 @@ def test_custom_observable_object_invalid_refs_property():
)
class NewObs():
pass
assert "is named like a reference list property but is not a ListProperty containing ReferenceProperty" in str(excinfo.value)
assert "is named like a reference list property but is not" in str(excinfo.value)
def test_custom_observable_object_invalid_refs_list_property():
@ -763,7 +883,7 @@ def test_custom_observable_object_invalid_refs_list_property():
)
class NewObs():
pass
assert "is named like a reference list property but is not a ListProperty containing ReferenceProperty" in str(excinfo.value)
assert "is named like a reference list property but is not" in str(excinfo.value)
def test_custom_no_properties_raises_exception():