Bug fixes, hackage removal, and some pre-commit stylistic
fixes. - Fixed bugged logic in _STIXBase._check_at_least_one_property(), and revamped the code to be simpler and clearer. - Changed custom extension registration to auto-create an "extension_type" property based on the attribute of that name on the custom class, if present. - The custom extension registration change above uncovered what seemed like a bug in a unit test: a custom extension was registered, but it was not given an extension type. The test used the extension as extension_type="property-extension"; this now causes a standard error about an extra property. I fixed the test to assign the custom extension the proper type.pull/1/head
parent
c7b4840232
commit
d87718c15c
|
@ -78,20 +78,34 @@ class _STIXBase(collections.abc.Mapping):
|
||||||
if count > 1 or (at_least_one and count == 0):
|
if count > 1 or (at_least_one and count == 0):
|
||||||
raise MutuallyExclusivePropertiesError(self.__class__, list_of_properties)
|
raise MutuallyExclusivePropertiesError(self.__class__, list_of_properties)
|
||||||
|
|
||||||
def _check_at_least_one_property(self, list_of_properties=None):
|
def _check_at_least_one_property(self, properties_checked=None):
|
||||||
if not list_of_properties:
|
"""
|
||||||
list_of_properties = sorted(self.__class__._properties.keys())
|
Check whether one or more of the given properties are present.
|
||||||
|
|
||||||
|
:param properties_checked: An iterable of the names of the properties
|
||||||
|
of interest, or None to check against a default list. The default
|
||||||
|
list includes all properties defined on the object, with some
|
||||||
|
hard-coded exceptions.
|
||||||
|
:raises AtLeastOnePropertyError: If none of the given properties are
|
||||||
|
present.
|
||||||
|
"""
|
||||||
|
if properties_checked is None:
|
||||||
|
property_exceptions = {"extensions", "type"}
|
||||||
if isinstance(self, _Observable):
|
if isinstance(self, _Observable):
|
||||||
props_to_remove = {"type", "id", "defanged", "spec_version"}
|
property_exceptions |= {"id", "defanged", "spec_version"}
|
||||||
else:
|
|
||||||
props_to_remove = {"type"}
|
|
||||||
|
|
||||||
list_of_properties = [prop for prop in list_of_properties if prop not in props_to_remove]
|
properties_checked = self._properties.keys() - property_exceptions
|
||||||
current_properties = self.properties_populated()
|
|
||||||
list_of_properties_populated = set(list_of_properties).intersection(current_properties)
|
|
||||||
|
|
||||||
if list_of_properties and (not list_of_properties_populated or list_of_properties_populated == {'extensions'}):
|
elif not isinstance(properties_checked, set):
|
||||||
raise AtLeastOnePropertyError(self.__class__, list_of_properties)
|
properties_checked = set(properties_checked)
|
||||||
|
|
||||||
|
if properties_checked:
|
||||||
|
properties_checked_assigned = properties_checked & self.keys()
|
||||||
|
|
||||||
|
if not properties_checked_assigned:
|
||||||
|
raise AtLeastOnePropertyError(
|
||||||
|
self.__class__, properties_checked
|
||||||
|
)
|
||||||
|
|
||||||
def _check_properties_dependency(self, list_of_properties, list_of_dependent_properties):
|
def _check_properties_dependency(self, list_of_properties, list_of_dependent_properties):
|
||||||
failed_dependency_pairs = []
|
failed_dependency_pairs = []
|
||||||
|
@ -118,8 +132,6 @@ class _STIXBase(collections.abc.Mapping):
|
||||||
raise ValueError("'custom_properties' must be a dictionary")
|
raise ValueError("'custom_properties' must be a dictionary")
|
||||||
|
|
||||||
extra_kwargs = kwargs.keys() - self._properties.keys()
|
extra_kwargs = kwargs.keys() - self._properties.keys()
|
||||||
if extra_kwargs and issubclass(cls, stix2.v21._Extension):
|
|
||||||
extra_kwargs = {prop for prop in extra_kwargs if prop != 'extension_type'}
|
|
||||||
|
|
||||||
if extra_kwargs and not allow_custom:
|
if extra_kwargs and not allow_custom:
|
||||||
ext_found = False
|
ext_found = False
|
||||||
|
@ -129,7 +141,7 @@ class _STIXBase(collections.abc.Mapping):
|
||||||
for key_id, ext_def in kwargs.get('extensions', {}).items():
|
for key_id, ext_def in kwargs.get('extensions', {}).items():
|
||||||
if (
|
if (
|
||||||
key_id.startswith('extension-definition--') and
|
key_id.startswith('extension-definition--') and
|
||||||
ext_def.get('extension_type', '') == 'toplevel-property-extension'
|
ext_def.get('extension_type') == 'toplevel-property-extension'
|
||||||
):
|
):
|
||||||
ext_found = True
|
ext_found = True
|
||||||
break
|
break
|
||||||
|
|
|
@ -793,8 +793,8 @@ class ExtensionsProperty(DictionaryProperty):
|
||||||
else:
|
else:
|
||||||
raise TypeError(
|
raise TypeError(
|
||||||
"Can't create extension '{}' from {}.".format(
|
"Can't create extension '{}' from {}.".format(
|
||||||
key, type(subvalue)
|
key, type(subvalue),
|
||||||
)
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
has_custom = has_custom or ext.has_custom
|
has_custom = has_custom or ext.has_custom
|
||||||
|
@ -818,7 +818,7 @@ class ExtensionsProperty(DictionaryProperty):
|
||||||
|
|
||||||
if key.startswith('extension-definition--'):
|
if key.startswith('extension-definition--'):
|
||||||
_validate_id(
|
_validate_id(
|
||||||
key, self.spec_version, 'extension-definition--'
|
key, self.spec_version, 'extension-definition--',
|
||||||
)
|
)
|
||||||
elif allow_custom:
|
elif allow_custom:
|
||||||
has_custom = True
|
has_custom = True
|
||||||
|
|
|
@ -1117,6 +1117,7 @@ def test_process_example_empty_error():
|
||||||
assert excinfo.value.cls == stix2.v20.Process
|
assert excinfo.value.cls == stix2.v20.Process
|
||||||
properties_of_process = list(stix2.v20.Process._properties.keys())
|
properties_of_process = list(stix2.v20.Process._properties.keys())
|
||||||
properties_of_process.remove("type")
|
properties_of_process.remove("type")
|
||||||
|
properties_of_process.remove("extensions")
|
||||||
assert excinfo.value.properties == sorted(properties_of_process)
|
assert excinfo.value.properties == sorted(properties_of_process)
|
||||||
msg = "At least one of the ({1}) properties for {0} must be populated."
|
msg = "At least one of the ({1}) properties for {0} must be populated."
|
||||||
msg = msg.format(
|
msg = msg.format(
|
||||||
|
|
|
@ -1237,9 +1237,9 @@ def test_unregistered_new_style_extension():
|
||||||
"extension-definition--31adb724-a9a4-44b6-8ec2-fd4b181c9507": {
|
"extension-definition--31adb724-a9a4-44b6-8ec2-fd4b181c9507": {
|
||||||
"extension-type": "property-extension",
|
"extension-type": "property-extension",
|
||||||
"a": 1,
|
"a": 1,
|
||||||
"b": True
|
"b": True,
|
||||||
}
|
},
|
||||||
}
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
f = stix2.parse(f_dict, allow_custom=False)
|
f = stix2.parse(f_dict, allow_custom=False)
|
||||||
|
@ -1508,7 +1508,7 @@ def test_registered_embedded_extension_passes_with_allow_custom_false():
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
class ExtensionFoo1:
|
class ExtensionFoo1:
|
||||||
pass
|
extension_type = "property-extension"
|
||||||
|
|
||||||
indicator = stix2.v21.Indicator(
|
indicator = stix2.v21.Indicator(
|
||||||
id='indicator--e97bfccf-8970-4a3c-9cd1-5b5b97ed5d0c',
|
id='indicator--e97bfccf-8970-4a3c-9cd1-5b5b97ed5d0c',
|
||||||
|
|
|
@ -1218,8 +1218,8 @@ def test_process_example_empty_error():
|
||||||
stix2.v21.Process()
|
stix2.v21.Process()
|
||||||
|
|
||||||
assert excinfo.value.cls == stix2.v21.Process
|
assert excinfo.value.cls == stix2.v21.Process
|
||||||
properties_of_process = list(stix2.v21.Process._properties.keys())
|
properties_of_process = stix2.v21.Process._properties.keys()
|
||||||
properties_of_process = [prop for prop in properties_of_process if prop not in ["type", "id", "defanged", "spec_version"]]
|
properties_of_process -= {"type", "id", "defanged", "spec_version", "extensions"}
|
||||||
assert excinfo.value.properties == sorted(properties_of_process)
|
assert excinfo.value.properties == sorted(properties_of_process)
|
||||||
msg = "At least one of the ({1}) properties for {0} must be populated."
|
msg = "At least one of the ({1}) properties for {0} must be populated."
|
||||||
msg = msg.format(
|
msg = msg.format(
|
||||||
|
|
|
@ -29,17 +29,6 @@ class _Observable(_Observable, _STIXBase21):
|
||||||
class _Extension(_Extension, _STIXBase21):
|
class _Extension(_Extension, _STIXBase21):
|
||||||
extension_type = None
|
extension_type = None
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
|
||||||
super(_Extension, self).__init__(**kwargs)
|
|
||||||
if getattr(self, "extension_type", None):
|
|
||||||
self._inner["extension_type"] = self.extension_type
|
|
||||||
|
|
||||||
def _check_at_least_one_property(self, list_of_properties=None):
|
|
||||||
new_ext_check = getattr(self, "extension_type", None)
|
|
||||||
|
|
||||||
if new_ext_check is None:
|
|
||||||
super(_Extension, self)._check_at_least_one_property(list_of_properties=list_of_properties)
|
|
||||||
|
|
||||||
|
|
||||||
class _DomainObject(_DomainObject, _STIXBase21):
|
class _DomainObject(_DomainObject, _STIXBase21):
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -14,7 +14,7 @@ from ..properties import (
|
||||||
)
|
)
|
||||||
from ..utils import NOW, _get_dict
|
from ..utils import NOW, _get_dict
|
||||||
from .base import _STIXBase21
|
from .base import _STIXBase21
|
||||||
from .vocab import HASHING_ALGORITHM, EXTENSION_TYPE
|
from .vocab import EXTENSION_TYPE, HASHING_ALGORITHM
|
||||||
|
|
||||||
|
|
||||||
class ExternalReference(_STIXBase21):
|
class ExternalReference(_STIXBase21):
|
||||||
|
|
|
@ -6,6 +6,7 @@ _Observable and do not have a ``_type`` attribute.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
from collections.abc import Mapping
|
||||||
import itertools
|
import itertools
|
||||||
|
|
||||||
from ..custom import _custom_extension_builder, _custom_observable_builder
|
from ..custom import _custom_extension_builder, _custom_observable_builder
|
||||||
|
@ -20,7 +21,7 @@ from ..properties import (
|
||||||
from .base import _Extension, _Observable, _STIXBase21
|
from .base import _Extension, _Observable, _STIXBase21
|
||||||
from .common import GranularMarking
|
from .common import GranularMarking
|
||||||
from .vocab import (
|
from .vocab import (
|
||||||
ACCOUNT_TYPE, ENCRYPTION_ALGORITHM, HASHING_ALGORITHM,
|
ACCOUNT_TYPE, ENCRYPTION_ALGORITHM, EXTENSION_TYPE, HASHING_ALGORITHM,
|
||||||
NETWORK_SOCKET_ADDRESS_FAMILY, NETWORK_SOCKET_TYPE,
|
NETWORK_SOCKET_ADDRESS_FAMILY, NETWORK_SOCKET_TYPE,
|
||||||
WINDOWS_INTEGRITY_LEVEL, WINDOWS_PEBINARY_TYPE, WINDOWS_REGISTRY_DATATYPE,
|
WINDOWS_INTEGRITY_LEVEL, WINDOWS_PEBINARY_TYPE, WINDOWS_REGISTRY_DATATYPE,
|
||||||
WINDOWS_SERVICE_START_TYPE, WINDOWS_SERVICE_STATUS, WINDOWS_SERVICE_TYPE,
|
WINDOWS_SERVICE_START_TYPE, WINDOWS_SERVICE_STATUS, WINDOWS_SERVICE_TYPE,
|
||||||
|
@ -901,5 +902,21 @@ def CustomExtension(type='x-custom-observable-ext', properties=None):
|
||||||
"""Custom STIX Object Extension decorator.
|
"""Custom STIX Object Extension decorator.
|
||||||
"""
|
"""
|
||||||
def wrapper(cls):
|
def wrapper(cls):
|
||||||
|
|
||||||
|
# Auto-create an "extension_type" property from the class attribute, if
|
||||||
|
# it exists.
|
||||||
|
extension_type = getattr(cls, "extension_type", None)
|
||||||
|
if extension_type:
|
||||||
|
extension_type_prop = EnumProperty(
|
||||||
|
EXTENSION_TYPE,
|
||||||
|
required=False,
|
||||||
|
fixed=extension_type,
|
||||||
|
)
|
||||||
|
|
||||||
|
if isinstance(properties, Mapping):
|
||||||
|
properties["extension_type"] = extension_type_prop
|
||||||
|
else:
|
||||||
|
properties.append(("extension_type", extension_type_prop))
|
||||||
|
|
||||||
return _custom_extension_builder(cls, type, properties, '2.1', _Extension)
|
return _custom_extension_builder(cls, type, properties, '2.1', _Extension)
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
|
@ -99,7 +99,7 @@ EXTENSION_TYPE = [
|
||||||
EXTENSION_TYPE_NEW_SCO,
|
EXTENSION_TYPE_NEW_SCO,
|
||||||
EXTENSION_TYPE_NEW_SRO,
|
EXTENSION_TYPE_NEW_SRO,
|
||||||
EXTENSION_TYPE_PROPERTY_EXTENSION,
|
EXTENSION_TYPE_PROPERTY_EXTENSION,
|
||||||
EXTENSION_TYPE_TOPLEVEL_PROPERTY_EXTENSION
|
EXTENSION_TYPE_TOPLEVEL_PROPERTY_EXTENSION,
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue