diff --git a/stix2/base.py b/stix2/base.py index 2c48ef6..c0e52e6 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -1,6 +1,7 @@ """Base classes for type definitions in the STIX2 library.""" import copy +import itertools import re import uuid @@ -12,7 +13,7 @@ from stix2.canonicalization.Canonicalize import canonicalize from .exceptions import ( AtLeastOnePropertyError, DependentPropertiesError, ExtraPropertiesError, ImmutableError, InvalidObjRefError, InvalidValueError, - MissingPropertiesError, MutuallyExclusivePropertiesError, + MissingPropertiesError, MutuallyExclusivePropertiesError, STIXError, ) from .markings import _MarkingsMixin from .markings.utils import validate @@ -54,7 +55,7 @@ class _STIXBase(Mapping): return all_properties - def _check_property(self, prop_name, prop, kwargs): + def _check_property(self, prop_name, prop, kwargs, allow_custom): if prop_name not in kwargs: if hasattr(prop, 'default'): value = prop.default() @@ -62,9 +63,12 @@ class _STIXBase(Mapping): value = self.__now kwargs[prop_name] = value + has_custom = False if prop_name in kwargs: try: - kwargs[prop_name] = prop.clean(kwargs[prop_name]) + kwargs[prop_name], has_custom = prop.clean( + kwargs[prop_name], allow_custom, + ) except InvalidValueError: # No point in wrapping InvalidValueError in another # InvalidValueError... so let those propagate. @@ -74,6 +78,8 @@ class _STIXBase(Mapping): self.__class__, prop_name, reason=str(exc), ) from exc + return has_custom + # interproperty constraint methods def _check_mutually_exclusive_properties(self, list_of_properties, at_least_one=True): @@ -113,7 +119,6 @@ class _STIXBase(Mapping): def __init__(self, allow_custom=False, **kwargs): cls = self.__class__ - self._allow_custom = allow_custom # Use the same timestamp for any auto-generated datetimes self.__now = get_timestamp() @@ -123,16 +128,18 @@ class _STIXBase(Mapping): if custom_props and not isinstance(custom_props, dict): raise ValueError("'custom_properties' must be a dictionary") - extra_kwargs = list(set(kwargs) - set(self._properties)) - if extra_kwargs and not self._allow_custom: + extra_kwargs = kwargs.keys() - self._properties.keys() + if extra_kwargs and not allow_custom: raise ExtraPropertiesError(cls, extra_kwargs) - # because allow_custom is true, any extra kwargs are custom - if custom_props or extra_kwargs: - self._allow_custom = True - if isinstance(self, stix2.v21._STIXBase21): - all_custom_prop_names = extra_kwargs - all_custom_prop_names.extend(list(custom_props.keys())) + if custom_props: + # loophole for custom_properties... + allow_custom = True + + all_custom_prop_names = extra_kwargs | custom_props.keys() - \ + self._properties.keys() + if all_custom_prop_names: + if not isinstance(self, stix2.v20._STIXBase20): for prop_name in all_custom_prop_names: if not re.match(PREFIX_21_REGEX, prop_name): raise InvalidValueError( @@ -141,12 +148,11 @@ class _STIXBase(Mapping): ) # Remove any keyword arguments whose value is None or [] (i.e. empty list) - setting_kwargs = {} - props = kwargs.copy() - props.update(custom_props) - for prop_name, prop_value in props.items(): - if prop_value is not None and prop_value != []: - setting_kwargs[prop_name] = prop_value + setting_kwargs = { + k: v + for k, v in itertools.chain(kwargs.items(), custom_props.items()) + if v is not None and v != [] + } # Detect any missing required properties required_properties = set(get_required_properties(self._properties)) @@ -154,8 +160,13 @@ class _STIXBase(Mapping): if missing_kwargs: raise MissingPropertiesError(cls, missing_kwargs) + has_custom = bool(all_custom_prop_names) for prop_name, prop_metadata in self._properties.items(): - self._check_property(prop_name, prop_metadata, setting_kwargs) + temp_custom = self._check_property( + prop_name, prop_metadata, setting_kwargs, allow_custom, + ) + + has_custom = has_custom or temp_custom # Cache defaulted optional properties for serialization defaulted = [] @@ -174,6 +185,22 @@ class _STIXBase(Mapping): self._check_object_constraints() + if allow_custom: + self.__has_custom = has_custom + + else: + # The simple case: our property cleaners are supposed to do their + # job and prevent customizations, so we just set to False. But + # this sanity check is helpful for finding bugs in those clean() + # methods. + if has_custom: + raise STIXError( + "Internal error: a clean() method did not properly enforce " + "allow_custom=False!", + ) + + self.__has_custom = False + def __getitem__(self, key): return self._inner[key] @@ -220,13 +247,16 @@ class _STIXBase(Mapping): if isinstance(self, _Observable): # Assume: valid references in the original object are still valid in the new version new_inner['_valid_refs'] = {'*': '*'} - new_inner['allow_custom'] = self._allow_custom - return cls(**new_inner) + return cls(allow_custom=True, **new_inner) def properties_populated(self): return list(self._inner.keys()) -# Versioning API + @property + def has_custom(self): + return self.__has_custom + + # Versioning API def new_version(self, **kwargs): return _new_version(self, **kwargs) @@ -348,20 +378,21 @@ class _Observable(_STIXBase): if ref_type not in allowed_types: raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type)) - def _check_property(self, prop_name, prop, kwargs): - super(_Observable, self)._check_property(prop_name, prop, kwargs) - if prop_name not in kwargs: - return + def _check_property(self, prop_name, prop, kwargs, allow_custom): + has_custom = super(_Observable, self)._check_property(prop_name, prop, kwargs, allow_custom) - from .properties import ObjectReferenceProperty - if prop_name.endswith('_ref'): - if isinstance(prop, ObjectReferenceProperty): - ref = kwargs[prop_name] - self._check_ref(ref, prop, prop_name) - elif prop_name.endswith('_refs'): - if isinstance(prop.contained, ObjectReferenceProperty): - for ref in kwargs[prop_name]: + if prop_name in kwargs: + from .properties import ObjectReferenceProperty + if prop_name.endswith('_ref'): + if isinstance(prop, ObjectReferenceProperty): + ref = kwargs[prop_name] self._check_ref(ref, prop, prop_name) + elif prop_name.endswith('_refs'): + if isinstance(prop.contained, ObjectReferenceProperty): + for ref in kwargs[prop_name]: + self._check_ref(ref, prop, prop_name) + + return has_custom def _generate_id(self): """ diff --git a/stix2/properties.py b/stix2/properties.py index dbbe667..85d4e08 100644 --- a/stix2/properties.py +++ b/stix2/properties.py @@ -134,11 +134,23 @@ class Property(object): Subclasses can also define the following functions: - - ``def clean(self, value) -> any:`` - - Return a value that is valid for this property. If ``value`` is not - valid for this property, this will attempt to transform it first. If - ``value`` is not valid and no such transformation is possible, it - should raise an exception. + - ``def clean(self, value, allow_custom) -> (any, has_custom):`` + - Return a value that is valid for this property, and enforce and + detect value customization. If ``value`` is not valid for this + property, you may attempt to transform it first. If ``value`` is not + valid and no such transformation is possible, it must raise an + exception. The method is also responsible for enforcing and + detecting customizations. If allow_custom is False, no customizations + must be allowed. If any are encountered, an exception must be raised + (e.g. CustomContentError). If none are encountered, False must be + returned for has_custom. If allow_custom is True, then the clean() + method is responsible for detecting any customizations in the value + (just because the user has elected to allow customizations doesn't + mean there actually are any). The method must return an appropriate + value for has_custom. Customization may not be applicable/possible + for a property. In that case, allow_custom can be ignored, and + has_custom must be returned as False. + - ``def default(self):`` - provide a default value for this property. - ``default()`` can return the special value ``NOW`` to use the current @@ -159,10 +171,10 @@ class Property(object): """ - def _default_clean(self, value): + def _default_clean(self, value, allow_custom=False): if value != self._fixed_value: raise ValueError("must equal '{}'.".format(self._fixed_value)) - return value + return value, False def __init__(self, required=False, fixed=None, default=None): self.required = required @@ -180,14 +192,8 @@ class Property(object): if default: self.default = default - def clean(self, value): - return value - - def __call__(self, value=None): - """Used by ListProperty to handle lists that have been defined with - either a class or an instance. - """ - return value + def clean(self, value, allow_custom=False): + return value, False class ListProperty(Property): @@ -219,7 +225,7 @@ class ListProperty(Property): super(ListProperty, self).__init__(**kwargs) - def clean(self, value): + def clean(self, value, allow_custom): try: iter(value) except TypeError: @@ -228,21 +234,22 @@ class ListProperty(Property): if isinstance(value, (_STIXBase, str)): value = [value] + result = [] + has_custom = False if isinstance(self.contained, Property): - result = [ - self.contained.clean(item) - for item in value - ] + for item in value: + valid, temp_custom = self.contained.clean(item, allow_custom) + result.append(valid) + has_custom = has_custom or temp_custom else: # self.contained must be a _STIXBase subclass - result = [] for item in value: if isinstance(item, self.contained): valid = item elif isinstance(item, Mapping): # attempt a mapping-like usage... - valid = self.contained(**item) + valid = self.contained(allow_custom=allow_custom, **item) else: raise ValueError( @@ -252,12 +259,16 @@ class ListProperty(Property): ) result.append(valid) + has_custom = has_custom or valid.has_custom + + if not allow_custom and has_custom: + raise CustomContentError("custom content encountered") # STIX spec forbids empty lists if len(result) < 1: raise ValueError("must not be empty.") - return result + return result, has_custom class StringProperty(Property): @@ -265,10 +276,10 @@ class StringProperty(Property): def __init__(self, **kwargs): super(StringProperty, self).__init__(**kwargs) - def clean(self, value): + def clean(self, value, allow_custom=False): if not isinstance(value, str): - return str(value) - return value + value = str(value) + return value, False class TypeProperty(Property): @@ -286,9 +297,9 @@ class IDProperty(Property): self.spec_version = spec_version super(IDProperty, self).__init__() - def clean(self, value): + def clean(self, value, allow_custom=False): _validate_id(value, self.spec_version, self.required_prefix) - return value + return value, False def default(self): return self.required_prefix + str(uuid.uuid4()) @@ -301,7 +312,7 @@ class IntegerProperty(Property): self.max = max super(IntegerProperty, self).__init__(**kwargs) - def clean(self, value): + def clean(self, value, allow_custom=False): try: value = int(value) except Exception: @@ -315,7 +326,7 @@ class IntegerProperty(Property): msg = "maximum value is {}. received {}".format(self.max, value) raise ValueError(msg) - return value + return value, False class FloatProperty(Property): @@ -325,7 +336,7 @@ class FloatProperty(Property): self.max = max super(FloatProperty, self).__init__(**kwargs) - def clean(self, value): + def clean(self, value, allow_custom=False): try: value = float(value) except Exception: @@ -339,29 +350,26 @@ class FloatProperty(Property): msg = "maximum value is {}. received {}".format(self.max, value) raise ValueError(msg) - return value + return value, False class BooleanProperty(Property): + _trues = ['true', 't', '1', 1, True] + _falses = ['false', 'f', '0', 0, False] - def clean(self, value): - if isinstance(value, bool): - return value + def clean(self, value, allow_custom=False): - trues = ['true', 't', '1'] - falses = ['false', 'f', '0'] - try: - if value.lower() in trues: - return True - if value.lower() in falses: - return False - except AttributeError: - if value == 1: - return True - if value == 0: - return False + if isinstance(value, str): + value = value.lower() - raise ValueError("must be a boolean value.") + if value in self._trues: + result = True + elif value in self._falses: + result = False + else: + raise ValueError("must be a boolean value.") + + return result, False class TimestampProperty(Property): @@ -372,10 +380,10 @@ class TimestampProperty(Property): super(TimestampProperty, self).__init__(**kwargs) - def clean(self, value): + def clean(self, value, allow_custom=False): return parse_into_datetime( value, self.precision, self.precision_constraint, - ) + ), False class DictionaryProperty(Property): @@ -384,7 +392,7 @@ class DictionaryProperty(Property): self.spec_version = spec_version super(DictionaryProperty, self).__init__(**kwargs) - def clean(self, value): + def clean(self, value, allow_custom=False): try: dictified = _get_dict(value) except ValueError: @@ -409,7 +417,7 @@ class DictionaryProperty(Property): if len(dictified) < 1: raise ValueError("must not be empty.") - return dictified + return dictified, False HASHES_REGEX = { @@ -433,8 +441,14 @@ HASHES_REGEX = { class HashesProperty(DictionaryProperty): - def clean(self, value): - clean_dict = super(HashesProperty, self).clean(value) + def clean(self, value, allow_custom): + # ignore the has_custom return value here; there is no customization + # of DictionaryProperties. + clean_dict, _ = super(HashesProperty, self).clean( + value, allow_custom, + ) + + has_custom = False for k, v in copy.deepcopy(clean_dict).items(): key = k.upper().replace('-', '') if key in HASHES_REGEX: @@ -446,25 +460,32 @@ class HashesProperty(DictionaryProperty): if k != vocab_key: clean_dict[vocab_key] = clean_dict[k] del clean_dict[k] - return clean_dict + + else: + has_custom = True + + if not allow_custom and has_custom: + raise CustomContentError("custom hash found: " + k) + + return clean_dict, has_custom class BinaryProperty(Property): - def clean(self, value): + def clean(self, value, allow_custom=False): try: base64.b64decode(value) except (binascii.Error, TypeError): raise ValueError("must contain a base64 encoded string") - return value + return value, False class HexProperty(Property): - def clean(self, value): + def clean(self, value, allow_custom=False): if not re.match(r"^([a-fA-F0-9]{2})+$", value): raise ValueError("must contain an even number of hexadecimal characters") - return value + return value, False class ReferenceProperty(Property): @@ -493,31 +514,52 @@ class ReferenceProperty(Property): super(ReferenceProperty, self).__init__(**kwargs) - def clean(self, value): + def clean(self, value, allow_custom): if isinstance(value, _STIXBase): value = value.id value = str(value) - possible_prefix = value[:value.index('--')] + _validate_id(value, self.spec_version, None) + + obj_type = value[:value.index('--')] if self.valid_types: + # allow_custom is not applicable to "whitelist" style object type + # constraints, so we ignore it. + has_custom = False + ref_valid_types = enumerate_types(self.valid_types, self.spec_version) - if possible_prefix in ref_valid_types: - required_prefix = possible_prefix - else: - raise ValueError("The type-specifying prefix '%s' for this property is not valid" % (possible_prefix)) - elif self.invalid_types: + if obj_type not in ref_valid_types: + raise ValueError("The type-specifying prefix '%s' for this property is not valid" % (obj_type)) + + else: + # A type "blacklist" was used to describe legal object types. + # We must enforce the type blacklist regardless of allow_custom. ref_invalid_types = enumerate_types(self.invalid_types, self.spec_version) - if possible_prefix not in ref_invalid_types: - required_prefix = possible_prefix - else: - raise ValueError("An invalid type-specifying prefix '%s' was specified for this property" % (possible_prefix)) + if obj_type in ref_invalid_types: + raise ValueError("An invalid type-specifying prefix '%s' was specified for this property" % (obj_type)) - _validate_id(value, self.spec_version, required_prefix) + # allow_custom=True only allows references to custom objects which + # are not otherwise blacklisted. So we need to figure out whether + # the referenced object is custom or not. No good way to do that + # at present... just check if unregistered and for the "x-" type + # prefix, for now? + type_maps = STIX2_OBJ_MAPS[self.spec_version] - return value + has_custom = obj_type not in type_maps["objects"] \ + and obj_type not in type_maps["observables"] \ + and obj_type not in ["relationship", "sighting"] + + has_custom = has_custom or obj_type.startswith("x-") + + if not allow_custom and has_custom: + raise CustomContentError( + "reference to custom object type: " + obj_type, + ) + + return value, has_custom def enumerate_types(types, spec_version): @@ -529,8 +571,7 @@ def enumerate_types(types, spec_version): once each of those words is being processed, that word will be removed from `return_types`, so as not to mistakenly allow objects to be created of types "SCO", "SDO", or "SRO" """ - return_types = [] - return_types += types + return_types = types[:] if "SDO" in types: return_types.remove("SDO") @@ -550,10 +591,10 @@ SELECTOR_REGEX = re.compile(r"^([a-z0-9_-]{3,250}(\.(\[\d+\]|[a-z0-9_-]{1,250})) class SelectorProperty(Property): - def clean(self, value): + def clean(self, value, allow_custom=False): if not SELECTOR_REGEX.match(value): raise ValueError("must adhere to selector syntax.") - return value + return value, False class ObjectReferenceProperty(StringProperty): @@ -571,12 +612,20 @@ class EmbeddedObjectProperty(Property): self.type = type super(EmbeddedObjectProperty, self).__init__(**kwargs) - def clean(self, value): - if type(value) is dict: - value = self.type(**value) + def clean(self, value, allow_custom): + if isinstance(value, dict): + value = self.type(allow_custom=allow_custom, **value) elif not isinstance(value, self.type): raise ValueError("must be of type {}.".format(self.type.__name__)) - return value + + has_custom = False + if isinstance(value, _STIXBase): + has_custom = value.has_custom + + if not allow_custom and has_custom: + raise CustomContentError("custom content encountered") + + return value, has_custom class EnumProperty(StringProperty): @@ -587,12 +636,14 @@ class EnumProperty(StringProperty): self.allowed = allowed super(EnumProperty, self).__init__(**kwargs) - def clean(self, value): - cleaned_value = super(EnumProperty, self).clean(value) - if cleaned_value not in self.allowed: + def clean(self, value, allow_custom): + cleaned_value, _ = super(EnumProperty, self).clean(value, allow_custom) + has_custom = cleaned_value not in self.allowed + + if not allow_custom and has_custom: raise ValueError("value '{}' is not valid for this enumeration.".format(cleaned_value)) - return cleaned_value + return cleaned_value, has_custom class PatternProperty(StringProperty): @@ -603,12 +654,11 @@ class ObservableProperty(Property): """Property for holding Cyber Observable Objects. """ - def __init__(self, spec_version=DEFAULT_VERSION, allow_custom=False, *args, **kwargs): - self.allow_custom = allow_custom + def __init__(self, spec_version=DEFAULT_VERSION, *args, **kwargs): self.spec_version = spec_version super(ObservableProperty, self).__init__(*args, **kwargs) - def clean(self, value): + def clean(self, value, allow_custom): try: dictified = _get_dict(value) # get deep copy since we are going modify the dict and might @@ -622,28 +672,43 @@ class ObservableProperty(Property): valid_refs = dict((k, v['type']) for (k, v) in dictified.items()) + has_custom = False for key, obj in dictified.items(): parsed_obj = parse_observable( obj, valid_refs, - allow_custom=self.allow_custom, + allow_custom=allow_custom, version=self.spec_version, ) + + if isinstance(parsed_obj, _STIXBase): + has_custom = has_custom or parsed_obj.has_custom + else: + # we get dicts for unregistered custom objects + has_custom = True + + if not allow_custom and has_custom: + if parsed_obj.has_custom: + raise CustomContentError( + "customized {} observable found".format( + parsed_obj["type"], + ), + ) + dictified[key] = parsed_obj - return dictified + return dictified, has_custom class ExtensionsProperty(DictionaryProperty): """Property for representing extensions on Observable objects. """ - def __init__(self, spec_version=DEFAULT_VERSION, allow_custom=False, enclosing_type=None, required=False): - self.allow_custom = allow_custom + def __init__(self, spec_version=DEFAULT_VERSION, enclosing_type=None, required=False): self.enclosing_type = enclosing_type super(ExtensionsProperty, self).__init__(spec_version=spec_version, required=required) - def clean(self, value): + def clean(self, value, allow_custom): try: dictified = _get_dict(value) # get deep copy since we are going modify the dict and might @@ -653,37 +718,47 @@ class ExtensionsProperty(DictionaryProperty): except ValueError: raise ValueError("The extensions property must contain a dictionary") + has_custom = False specific_type_map = STIX2_OBJ_MAPS[self.spec_version]['observable-extensions'].get(self.enclosing_type, {}) for key, subvalue in dictified.items(): if key in specific_type_map: cls = specific_type_map[key] if type(subvalue) is dict: - if self.allow_custom: - subvalue['allow_custom'] = True - dictified[key] = cls(**subvalue) - else: - dictified[key] = cls(**subvalue) + ext = cls(allow_custom=allow_custom, **subvalue) elif type(subvalue) is cls: # If already an instance of an _Extension class, assume it's valid - dictified[key] = subvalue + ext = subvalue else: raise ValueError("Cannot determine extension type.") + + has_custom = has_custom or ext.has_custom + + if not allow_custom and has_custom: + raise CustomContentError( + "custom content found in {} extension".format( + key, + ), + ) + + dictified[key] = ext + else: - if self.allow_custom: + if allow_custom: + has_custom = True dictified[key] = subvalue else: raise CustomContentError("Can't parse unknown extension type: {}".format(key)) - return dictified + + return dictified, has_custom class STIXObjectProperty(Property): - def __init__(self, spec_version=DEFAULT_VERSION, allow_custom=False, *args, **kwargs): - self.allow_custom = allow_custom + def __init__(self, spec_version=DEFAULT_VERSION, *args, **kwargs): self.spec_version = spec_version super(STIXObjectProperty, self).__init__(*args, **kwargs) - def clean(self, value): + def clean(self, value, allow_custom): # Any STIX Object (SDO, SRO, or Marking Definition) can be added to # a bundle with no further checks. if any( @@ -702,7 +777,11 @@ class STIXObjectProperty(Property): "containing objects of a different spec " "version.", ) - return value + + if not allow_custom and value.has_custom: + raise CustomContentError("custom content encountered") + + return value, value.has_custom try: dictified = _get_dict(value) except ValueError: @@ -718,6 +797,22 @@ class STIXObjectProperty(Property): "containing objects of a different spec version.", ) - parsed_obj = parse(dictified, allow_custom=self.allow_custom) + parsed_obj = parse(dictified, allow_custom=allow_custom) - return parsed_obj + if isinstance(parsed_obj, _STIXBase): + has_custom = parsed_obj.has_custom + else: + # we get dicts for unregistered custom objects + has_custom = True + + if not allow_custom and has_custom: + # parse() will ignore the caller's allow_custom=False request if + # the object type is registered and dictified has a + # "custom_properties" key. So we have to do another check here. + raise CustomContentError( + "customized {} object found".format( + parsed_obj["type"], + ), + ) + + return parsed_obj, has_custom diff --git a/stix2/test/test_properties.py b/stix2/test/test_properties.py index dab713e..b8a589c 100644 --- a/stix2/test/test_properties.py +++ b/stix2/test/test_properties.py @@ -4,7 +4,10 @@ import pytest import pytz import stix2 -from stix2.exceptions import ExtraPropertiesError, STIXError +from stix2.base import _STIXBase +from stix2.exceptions import ( + ExtraPropertiesError, STIXError, CustomContentError, +) from stix2.properties import ( BinaryProperty, BooleanProperty, EmbeddedObjectProperty, EnumProperty, FloatProperty, HexProperty, IntegerProperty, ListProperty, Property, @@ -16,8 +19,8 @@ def test_property(): p = Property() assert p.required is False - assert p.clean('foo') == 'foo' - assert p.clean(3) == 3 + assert p.clean('foo') == ('foo', False) + assert p.clean(3) == (3, False) def test_basic_clean(): @@ -47,7 +50,7 @@ def test_property_default(): assert p.default() == 77 -def test_property_fixed(): +def test_fixed_property(): p = Property(fixed="2.0") assert p.clean("2.0") @@ -65,16 +68,16 @@ def test_property_fixed_and_required(): Property(default=lambda: 3, required=True) -def test_list_property(): +def test_list_property_property_type(): p = ListProperty(StringProperty) - assert p.clean(['abc', 'xyz']) + assert p.clean(['abc', 'xyz'], False) with pytest.raises(ValueError): - p.clean([]) + p.clean([], False) def test_list_property_property_type_custom(): - class TestObj(stix2.base._STIXBase): + class TestObj(_STIXBase): _type = "test" _properties = { "foo": StringProperty(), @@ -86,20 +89,24 @@ def test_list_property_property_type_custom(): TestObj(foo="xyz"), ] - assert p.clean(objs_custom) + assert p.clean(objs_custom, True) + + with pytest.raises(CustomContentError): + p.clean(objs_custom, False) dicts_custom = [ {"foo": "abc", "bar": 123}, {"foo": "xyz"}, ] - # no opportunity to set allow_custom=True when using dicts + assert p.clean(dicts_custom, True) + with pytest.raises(ExtraPropertiesError): - p.clean(dicts_custom) + p.clean(dicts_custom, False) def test_list_property_object_type(): - class TestObj(stix2.base._STIXBase): + class TestObj(_STIXBase): _type = "test" _properties = { "foo": StringProperty(), @@ -107,14 +114,14 @@ def test_list_property_object_type(): p = ListProperty(TestObj) objs = [TestObj(foo="abc"), TestObj(foo="xyz")] - assert p.clean(objs) + assert p.clean(objs, False) dicts = [{"foo": "abc"}, {"foo": "xyz"}] - assert p.clean(dicts) + assert p.clean(dicts, False) def test_list_property_object_type_custom(): - class TestObj(stix2.base._STIXBase): + class TestObj(_STIXBase): _type = "test" _properties = { "foo": StringProperty(), @@ -126,16 +133,20 @@ def test_list_property_object_type_custom(): TestObj(foo="xyz"), ] - assert p.clean(objs_custom) + assert p.clean(objs_custom, True) + + with pytest.raises(CustomContentError): + p.clean(objs_custom, False) dicts_custom = [ {"foo": "abc", "bar": 123}, {"foo": "xyz"}, ] - # no opportunity to set allow_custom=True when using dicts + assert p.clean(dicts_custom, True) + with pytest.raises(ExtraPropertiesError): - p.clean(dicts_custom) + p.clean(dicts_custom, False) def test_list_property_bad_element_type(): @@ -144,7 +155,7 @@ def test_list_property_bad_element_type(): def test_list_property_bad_value_type(): - class TestObj(stix2.base._STIXBase): + class TestObj(_STIXBase): _type = "test" _properties = { "foo": StringProperty(), @@ -152,7 +163,7 @@ def test_list_property_bad_value_type(): list_prop = ListProperty(TestObj) with pytest.raises(ValueError): - list_prop.clean([1]) + list_prop.clean([1], False) def test_string_property(): @@ -296,7 +307,7 @@ def test_boolean_property_invalid(value): ) def test_timestamp_property_valid(value): ts_prop = TimestampProperty() - assert ts_prop.clean(value) == dt.datetime(2017, 1, 1, 12, 34, 56, tzinfo=pytz.utc) + assert ts_prop.clean(value) == (dt.datetime(2017, 1, 1, 12, 34, 56, tzinfo=pytz.utc), False) def test_timestamp_property_invalid(): @@ -332,15 +343,21 @@ def test_hex_property(): ) def test_enum_property_valid(value): enum_prop = EnumProperty(value) - assert enum_prop.clean('b') + assert enum_prop.clean('b', False) def test_enum_property_clean(): enum_prop = EnumProperty(['1']) - assert enum_prop.clean(1) == '1' + assert enum_prop.clean(1, False) == ('1', False) def test_enum_property_invalid(): enum_prop = EnumProperty(['a', 'b', 'c']) with pytest.raises(ValueError): - enum_prop.clean('z') + enum_prop.clean('z', False) + + +def test_enum_property_custom(): + enum_prop = EnumProperty(['a', 'b', 'c']) + result = enum_prop.clean("z", True) + assert result == ("z", True) diff --git a/stix2/test/test_workbench.py b/stix2/test/test_workbench.py index 433bf81..84f97a5 100644 --- a/stix2/test/test_workbench.py +++ b/stix2/test/test_workbench.py @@ -369,6 +369,7 @@ def test_workbench_custom_property_object_in_observable_extension(): x_foo='bar', ) artifact = File( + allow_custom=True, name='test', extensions={'ntfs-ext': ntfs}, ) @@ -390,7 +391,6 @@ def test_workbench_custom_property_dict_in_observable_extension(): name='test', extensions={ 'ntfs-ext': { - 'allow_custom': True, 'sid': 1, 'x_foo': 'bar', }, diff --git a/stix2/test/v20/test_bundle.py b/stix2/test/v20/test_bundle.py index ac5d239..990a554 100644 --- a/stix2/test/v20/test_bundle.py +++ b/stix2/test/v20/test_bundle.py @@ -224,7 +224,7 @@ def test_stix_object_property(): prop = stix2.properties.STIXObjectProperty(spec_version='2.0') identity = stix2.v20.Identity(name="test", identity_class="individual") - assert prop.clean(identity) is identity + assert prop.clean(identity, False) == (identity, False) def test_bundle_with_different_spec_objects(): diff --git a/stix2/test/v20/test_custom.py b/stix2/test/v20/test_custom.py index a83bf24..8d237e0 100644 --- a/stix2/test/v20/test_custom.py +++ b/stix2/test/v20/test_custom.py @@ -157,10 +157,11 @@ def test_custom_properties_dict_in_bundled_object(): 'x_foo': 'bar', }, } - bundle = stix2.v20.Bundle(custom_identity) - assert bundle.objects[0].x_foo == "bar" - assert '"x_foo": "bar"' in str(bundle) + # must not succeed: allow_custom was not set to True when creating + # the bundle, so it must reject the customized identity object. + with pytest.raises(InvalidValueError): + stix2.v20.Bundle(custom_identity) def test_custom_property_in_observed_data(): @@ -188,6 +189,7 @@ def test_custom_property_object_in_observable_extension(): x_foo='bar', ) artifact = stix2.v20.File( + allow_custom=True, name='test', extensions={'ntfs-ext': ntfs}, ) @@ -220,7 +222,6 @@ def test_custom_property_dict_in_observable_extension(): name='test', extensions={ 'ntfs-ext': { - 'allow_custom': True, 'sid': 1, 'x_foo': 'bar', }, @@ -384,6 +385,47 @@ def test_custom_object_invalid_type_name(): assert "Invalid type name 'x_new_object':" in str(excinfo.value) +def test_custom_subobject_dict(): + obj_dict = { + "type": "bundle", + "spec_version": "2.0", + "objects": [ + { + "type": "identity", + "name": "alice", + "identity_class": "individual", + "x_foo": 123, + }, + ], + } + + obj = stix2.parse(obj_dict, allow_custom=True) + assert obj["objects"][0]["x_foo"] == 123 + assert obj.has_custom + + with pytest.raises(InvalidValueError): + stix2.parse(obj_dict, allow_custom=False) + + +def test_custom_subobject_obj(): + ident = stix2.v20.Identity( + name="alice", identity_class=123, x_foo=123, allow_custom=True, + ) + + obj_dict = { + "type": "bundle", + "spec_version": "2.0", + "objects": [ident], + } + + obj = stix2.parse(obj_dict, allow_custom=True) + assert obj["objects"][0]["x_foo"] == 123 + assert obj.has_custom + + with pytest.raises(InvalidValueError): + stix2.parse(obj_dict, allow_custom=False) + + def test_parse_custom_object_type(): nt_string = """{ "type": "x-new-type", @@ -898,6 +940,35 @@ def test_parse_observable_with_custom_extension(): assert parsed.extensions['x-new-ext'].property2 == 12 +def test_parse_observable_with_custom_extension_property(): + input_str = """{ + "type": "observed-data", + "first_observed": "1976-09-09T01:50:24.000Z", + "last_observed": "1988-01-18T15:22:10.000Z", + "number_observed": 5, + "objects": { + "0": { + "type": "file", + "name": "cats.png", + "extensions": { + "raster-image-ext": { + "image_height": 1024, + "image_width": 768, + "x-foo": false + } + } + } + } + }""" + + parsed = stix2.parse(input_str, version='2.0', allow_custom=True) + assert parsed.has_custom + assert parsed["objects"]["0"]["extensions"]["raster-image-ext"]["x-foo"] is False + + with pytest.raises(InvalidValueError): + stix2.parse(input_str, version="2.0", allow_custom=False) + + def test_custom_and_spec_extension_mix(): """ Try to make sure that when allow_custom=True, encountering a custom diff --git a/stix2/test/v20/test_properties.py b/stix2/test/v20/test_properties.py index b03879c..3ff207a 100644 --- a/stix2/test/v20/test_properties.py +++ b/stix2/test/v20/test_properties.py @@ -3,14 +3,14 @@ import uuid import pytest import stix2 -import stix2.base from stix2.exceptions import ( AtLeastOnePropertyError, CustomContentError, DictionaryKeyError, + ExtraPropertiesError, ParseError, ) from stix2.properties import ( DictionaryProperty, EmbeddedObjectProperty, ExtensionsProperty, - HashesProperty, IDProperty, ListProperty, ReferenceProperty, - STIXObjectProperty, + HashesProperty, ListProperty, ObservableProperty, ReferenceProperty, + STIXObjectProperty, IDProperty, ) from stix2.v20.common import MarkingProperty @@ -27,7 +27,7 @@ MY_ID = 'my-type--232c9d3f-49fc-4440-bb01-607f638778e7' ], ) def test_id_property_valid(value): - assert ID_PROP.clean(value) == value + assert ID_PROP.clean(value) == (value, False) CONSTANT_IDS = [ @@ -54,7 +54,7 @@ CONSTANT_IDS.extend(constants.RELATIONSHIP_IDS) @pytest.mark.parametrize("value", CONSTANT_IDS) def test_id_property_valid_for_type(value): type = value.split('--', 1)[0] - assert IDProperty(type=type, spec_version="2.0").clean(value) == value + assert IDProperty(type=type, spec_version="2.0").clean(value) == (value, False) def test_id_property_wrong_type(): @@ -80,29 +80,63 @@ def test_id_property_not_a_valid_hex_uuid(value): def test_id_property_default(): default = ID_PROP.default() - assert ID_PROP.clean(default) == default + assert ID_PROP.clean(default) == (default, False) def test_reference_property(): ref_prop = ReferenceProperty(valid_types="my-type", spec_version="2.0") - assert ref_prop.clean("my-type--00000000-0000-4000-8000-000000000000") + assert ref_prop.clean("my-type--00000000-0000-4000-8000-000000000000", False) with pytest.raises(ValueError): - ref_prop.clean("foo") + ref_prop.clean("foo", False) # This is not a valid V4 UUID with pytest.raises(ValueError): - ref_prop.clean("my-type--00000000-0000-0000-0000-000000000000") + ref_prop.clean("my-type--00000000-0000-0000-0000-000000000000", False) -def test_reference_property_specific_type(): +def test_reference_property_whitelist_type(): ref_prop = ReferenceProperty(valid_types="my-type", spec_version="2.0") with pytest.raises(ValueError): - ref_prop.clean("not-my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf") + ref_prop.clean("not-my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False) - assert ref_prop.clean("my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf") == \ - "my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf" + with pytest.raises(ValueError): + ref_prop.clean("not-my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", True) + + result = ref_prop.clean("my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", True) + assert result == ("my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False) + + result = ref_prop.clean("my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False) + assert result == ("my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False) + + +def test_reference_property_blacklist_type(): + ref_prop = ReferenceProperty(invalid_types="identity", spec_version="2.0") + result = ref_prop.clean( + "malware--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False, + ) + assert result == ("malware--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False) + + result = ref_prop.clean( + "malware--8a8e8758-f92c-4058-ba38-f061cd42a0cf", True, + ) + assert result == ("malware--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False) + + result = ref_prop.clean( + "some-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", True, + ) + assert result == ("some-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", True) + + with pytest.raises(ValueError): + ref_prop.clean( + "identity--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False, + ) + + with pytest.raises(ValueError): + ref_prop.clean( + "identity--8a8e8758-f92c-4058-ba38-f061cd42a0cf", True, + ) @pytest.mark.parametrize( @@ -183,7 +217,8 @@ def test_property_list_of_dictionary(): ) def test_hashes_property_valid(value): hash_prop = HashesProperty() - assert hash_prop.clean(value) + _, has_custom = hash_prop.clean(value, False) + assert not has_custom @pytest.mark.parametrize( @@ -196,7 +231,21 @@ def test_hashes_property_invalid(value): hash_prop = HashesProperty() with pytest.raises(ValueError): - hash_prop.clean(value) + hash_prop.clean(value, False) + + +def test_hashes_property_custom(): + value = { + "sha256": "6db12788c37247f2316052e142f42f4b259d6561751e5f401a1ae2a6df9c674b", + "abc-123": "aaaaaaaaaaaaaaaaaaaaa", + } + + hash_prop = HashesProperty() + result = hash_prop.clean(value, True) + assert result == (value, True) + + with pytest.raises(CustomContentError): + hash_prop.clean(value, False) def test_embedded_property(): @@ -206,25 +255,103 @@ def test_embedded_property(): content_disposition="inline", body="Cats are funny!", ) - assert emb_prop.clean(mime) + result = emb_prop.clean(mime, False) + assert result == (mime, False) + + result = emb_prop.clean(mime, True) + assert result == (mime, False) with pytest.raises(ValueError): - emb_prop.clean("string") + emb_prop.clean("string", False) + + +def test_embedded_property_dict(): + emb_prop = EmbeddedObjectProperty(type=stix2.v20.EmailMIMEComponent) + mime = { + "content_type": "text/plain; charset=utf-8", + "content_disposition": "inline", + "body": "Cats are funny!", + } + + result = emb_prop.clean(mime, False) + assert isinstance(result[0], stix2.v20.EmailMIMEComponent) + assert result[0]["body"] == "Cats are funny!" + assert not result[1] + + result = emb_prop.clean(mime, True) + assert isinstance(result[0], stix2.v20.EmailMIMEComponent) + assert result[0]["body"] == "Cats are funny!" + assert not result[1] + + +def test_embedded_property_custom(): + emb_prop = EmbeddedObjectProperty(type=stix2.v20.EmailMIMEComponent) + mime = stix2.v20.EmailMIMEComponent( + content_type="text/plain; charset=utf-8", + content_disposition="inline", + body="Cats are funny!", + foo=123, + allow_custom=True, + ) + + with pytest.raises(CustomContentError): + emb_prop.clean(mime, False) + + result = emb_prop.clean(mime, True) + assert result == (mime, True) + + +def test_embedded_property_dict_custom(): + emb_prop = EmbeddedObjectProperty(type=stix2.v20.EmailMIMEComponent) + mime = { + "content_type": "text/plain; charset=utf-8", + "content_disposition": "inline", + "body": "Cats are funny!", + "foo": 123, + } + + with pytest.raises(ExtraPropertiesError): + emb_prop.clean(mime, False) + + result = emb_prop.clean(mime, True) + assert isinstance(result[0], stix2.v20.EmailMIMEComponent) + assert result[0]["body"] == "Cats are funny!" + assert result[1] def test_extension_property_valid(): ext_prop = ExtensionsProperty(spec_version="2.0", enclosing_type='file') - assert ext_prop({ - 'windows-pebinary-ext': { - 'pe_type': 'exe', - }, - }) + result = ext_prop.clean( + { + 'windows-pebinary-ext': { + 'pe_type': 'exe', + }, + }, False, + ) + + assert isinstance( + result[0]["windows-pebinary-ext"], stix2.v20.WindowsPEBinaryExt, + ) + assert not result[1] + + result = ext_prop.clean( + { + 'windows-pebinary-ext': { + 'pe_type': 'exe', + }, + }, True, + ) + + assert isinstance( + result[0]["windows-pebinary-ext"], stix2.v20.WindowsPEBinaryExt, + ) + assert not result[1] def test_extension_property_invalid1(): ext_prop = ExtensionsProperty(spec_version="2.0", enclosing_type='file') with pytest.raises(ValueError): - ext_prop.clean(1) + ext_prop.clean(1, False) def test_extension_property_invalid2(): @@ -236,8 +363,47 @@ def test_extension_property_invalid2(): 'pe_type': 'exe', }, }, + False, ) + result = ext_prop.clean( + { + 'foobar-ext': { + 'pe_type': 'exe', + }, + }, True, + ) + assert result == ({"foobar-ext": {"pe_type": "exe"}}, True) + + +def test_extension_property_invalid3(): + ext_prop = ExtensionsProperty(spec_version="2.0", enclosing_type='file') + with pytest.raises(ExtraPropertiesError): + ext_prop.clean( + { + 'windows-pebinary-ext': { + 'pe_type': 'exe', + 'abc': 123, + }, + }, + False, + ) + + result = ext_prop.clean( + { + 'windows-pebinary-ext': { + 'pe_type': 'exe', + 'abc': 123, + }, + }, True, + ) + + assert isinstance( + result[0]["windows-pebinary-ext"], stix2.v20.WindowsPEBinaryExt, + ) + assert result[0]["windows-pebinary-ext"]["abc"] == 123 + assert result[1] + def test_extension_property_invalid_type(): ext_prop = ExtensionsProperty(spec_version="2.0", enclosing_type='indicator') @@ -248,6 +414,7 @@ def test_extension_property_invalid_type(): 'pe_type': 'exe', }, }, + False, ) assert "Can't parse unknown extension" in str(excinfo.value) @@ -272,6 +439,116 @@ def test_stix_property_not_compliant_spec(): stix_prop = STIXObjectProperty(spec_version="2.0") with pytest.raises(ValueError) as excinfo: - stix_prop.clean(indicator) + stix_prop.clean(indicator, False) assert "Spec version 2.0 bundles don't yet support containing objects of a different spec version." in str(excinfo.value) + + +def test_observable_property_obj(): + prop = ObservableProperty(spec_version="2.0") + + obs = stix2.v20.File(name="data.dat") + obs_dict = { + "0": obs, + } + + result = prop.clean(obs_dict, False) + assert result[0]["0"] == obs + assert not result[1] + + result = prop.clean(obs_dict, True) + assert result[0]["0"] == obs + assert not result[1] + + +def test_observable_property_dict(): + prop = ObservableProperty(spec_version="2.0") + + obs_dict = { + "0": { + "type": "file", + "name": "data.dat", + }, + } + + result = prop.clean(obs_dict, False) + assert isinstance(result[0]["0"], stix2.v20.File) + assert result[0]["0"]["name"] == "data.dat" + assert not result[1] + + result = prop.clean(obs_dict, True) + assert isinstance(result[0]["0"], stix2.v20.File) + assert result[0]["0"]["name"] == "data.dat" + assert not result[1] + + +def test_observable_property_obj_custom(): + prop = ObservableProperty(spec_version="2.0") + + obs = stix2.v20.File(name="data.dat", foo=True, allow_custom=True) + obs_dict = { + "0": obs, + } + + with pytest.raises(ExtraPropertiesError): + prop.clean(obs_dict, False) + + result = prop.clean(obs_dict, True) + assert result[0]["0"] == obs + assert result[1] + + +def test_observable_property_dict_custom(): + prop = ObservableProperty(spec_version="2.0") + + obs_dict = { + "0": { + "type": "file", + "name": "data.dat", + "foo": True, + }, + } + + with pytest.raises(ExtraPropertiesError): + prop.clean(obs_dict, False) + + result = prop.clean(obs_dict, True) + assert isinstance(result[0]["0"], stix2.v20.File) + assert result[0]["0"]["foo"] + assert result[1] + + +def test_stix_object_property_custom_prop(): + prop = STIXObjectProperty(spec_version="2.0") + + obj_dict = { + "type": "identity", + "name": "alice", + "identity_class": "supergirl", + "foo": "bar", + } + + with pytest.raises(ExtraPropertiesError): + prop.clean(obj_dict, False) + + result = prop.clean(obj_dict, True) + assert isinstance(result[0], stix2.v20.Identity) + assert result[0]["foo"] == "bar" + assert result[1] + + +def test_stix_object_property_custom_obj(): + prop = STIXObjectProperty(spec_version="2.0") + + obj_dict = { + "type": "something", + "abc": 123, + "xyz": ["a", 1], + } + + with pytest.raises(ParseError): + prop.clean(obj_dict, False) + + result = prop.clean(obj_dict, True) + assert result[0] == {"type": "something", "abc": 123, "xyz": ["a", 1]} + assert result[1] diff --git a/stix2/test/v21/test_bundle.py b/stix2/test/v21/test_bundle.py index 1cf30d0..a4777d8 100644 --- a/stix2/test/v21/test_bundle.py +++ b/stix2/test/v21/test_bundle.py @@ -235,7 +235,7 @@ def test_stix_object_property(): prop = stix2.properties.STIXObjectProperty(spec_version='2.1') identity = stix2.v21.Identity(name="test", identity_class="individual") - assert prop.clean(identity) is identity + assert prop.clean(identity, False) == (identity, False) def test_bundle_obj_id_found(): diff --git a/stix2/test/v21/test_custom.py b/stix2/test/v21/test_custom.py index 36e3548..1f86a27 100644 --- a/stix2/test/v21/test_custom.py +++ b/stix2/test/v21/test_custom.py @@ -206,8 +206,10 @@ def test_custom_properties_dict_in_bundled_object(): 'x_foo': 'bar', }, } - bundle = stix2.v21.Bundle(custom_identity) + with pytest.raises(InvalidValueError): + stix2.v21.Bundle(custom_identity) + bundle = stix2.v21.Bundle(custom_identity, allow_custom=True) assert bundle.objects[0].x_foo == "bar" assert '"x_foo": "bar"' in str(bundle) @@ -251,6 +253,7 @@ def test_custom_property_object_in_observable_extension(): x_foo='bar', ) artifact = stix2.v21.File( + allow_custom=True, name='test', extensions={'ntfs-ext': ntfs}, ) @@ -283,7 +286,6 @@ def test_custom_property_dict_in_observable_extension(): name='test', extensions={ 'ntfs-ext': { - 'allow_custom': True, 'sid': 1, 'x_foo': 'bar', }, @@ -506,6 +508,48 @@ def test_custom_object_invalid_type_name(): assert "Invalid type name '7x-new-object':" in str(excinfo.value) +def test_custom_subobject_dict(): + obj_dict = { + "type": "bundle", + "id": "bundle--78d99c4a-4eda-4c59-b264-60807f05d799", + "objects": [ + { + "type": "identity", + "spec_version": "2.1", + "name": "alice", + "identity_class": "individual", + "x_foo": 123, + }, + ], + } + + obj = stix2.parse(obj_dict, allow_custom=True) + assert obj["objects"][0]["x_foo"] == 123 + assert obj.has_custom + + with pytest.raises(InvalidValueError): + stix2.parse(obj_dict, allow_custom=False) + + +def test_custom_subobject_obj(): + ident = stix2.v21.Identity( + name="alice", identity_class=123, x_foo=123, allow_custom=True, + ) + + obj_dict = { + "type": "bundle", + "id": "bundle--78d99c4a-4eda-4c59-b264-60807f05d799", + "objects": [ident], + } + + obj = stix2.parse(obj_dict, allow_custom=True) + assert obj["objects"][0]["x_foo"] == 123 + assert obj.has_custom + + with pytest.raises(InvalidValueError): + stix2.parse(obj_dict, allow_custom=False) + + def test_parse_custom_object_type(): nt_string = """{ "type": "x-new-type", @@ -1117,6 +1161,37 @@ def test_parse_observable_with_custom_extension(): assert parsed.extensions['x-new-ext'].property2 == 12 +def test_parse_observable_with_custom_extension_property(): + input_str = """{ + "type": "observed-data", + "spec_version": "2.1", + "first_observed": "1976-09-09T01:50:24.000Z", + "last_observed": "1988-01-18T15:22:10.000Z", + "number_observed": 5, + "objects": { + "0": { + "type": "file", + "spec_version": "2.1", + "name": "cats.png", + "extensions": { + "raster-image-ext": { + "image_height": 1024, + "image_width": 768, + "x-foo": false + } + } + } + } + }""" + + parsed = stix2.parse(input_str, version='2.1', allow_custom=True) + assert parsed.has_custom + assert parsed["objects"]["0"]["extensions"]["raster-image-ext"]["x-foo"] is False + + with pytest.raises(InvalidValueError): + stix2.parse(input_str, version="2.1", allow_custom=False) + + def test_custom_and_spec_extension_mix(): """ Try to make sure that when allow_custom=True, encountering a custom @@ -1337,3 +1412,42 @@ def test_register_duplicate_observable_extension(): class NewExtension2(): pass assert "cannot be registered again" in str(excinfo.value) + + +def test_register_duplicate_marking(): + with pytest.raises(DuplicateRegistrationError) as excinfo: + @stix2.v21.CustomMarking( + 'x-new-obj', [ + ('property1', stix2.properties.StringProperty(required=True)), + ], + ) + class NewObj2(): + pass + assert "cannot be registered again" in str(excinfo.value) + + +def test_allow_custom_propagation(): + obj_dict = { + "type": "bundle", + "objects": [ + { + "type": "file", + "spec_version": "2.1", + "name": "data.dat", + "extensions": { + "archive-ext": { + "contains_refs": [ + "file--3d4da5f6-31d8-4a66-a172-f31af9bf5238", + "file--4bb16def-cdfc-40d1-b6a4-815de6c60b74", + ], + "x_foo": "bar", + }, + }, + }, + ], + } + + # allow_custom=False at the top level should catch the custom property way + # down in the SCO extension. + with pytest.raises(InvalidValueError): + stix2.parse(obj_dict, allow_custom=False) diff --git a/stix2/test/v21/test_indicator.py b/stix2/test/v21/test_indicator.py index 2b22418..5f99ac5 100644 --- a/stix2/test/v21/test_indicator.py +++ b/stix2/test/v21/test_indicator.py @@ -223,6 +223,7 @@ def test_indicator_with_custom_embedded_objs(): valid_from=epoch, indicator_types=['malicious-activity'], external_references=[ext_ref], + allow_custom=True, ) assert ind.indicator_types == ['malicious-activity'] diff --git a/stix2/test/v21/test_properties.py b/stix2/test/v21/test_properties.py index 36ff858..9914bd1 100644 --- a/stix2/test/v21/test_properties.py +++ b/stix2/test/v21/test_properties.py @@ -3,11 +3,12 @@ import pytest import stix2 from stix2.exceptions import ( AtLeastOnePropertyError, CustomContentError, DictionaryKeyError, + ExtraPropertiesError, ParseError, ) from stix2.properties import ( DictionaryProperty, EmbeddedObjectProperty, ExtensionsProperty, - HashesProperty, IDProperty, ListProperty, ReferenceProperty, - StringProperty, TypeProperty, + HashesProperty, IDProperty, ListProperty, ObservableProperty, + ReferenceProperty, STIXObjectProperty, StringProperty, TypeProperty, ) from stix2.v21.common import MarkingProperty @@ -50,7 +51,7 @@ MY_ID = 'my-type--232c9d3f-49fc-4440-bb01-607f638778e7' ], ) def test_id_property_valid(value): - assert ID_PROP.clean(value) == value + assert ID_PROP.clean(value) == (value, False) CONSTANT_IDS = [ @@ -77,7 +78,7 @@ CONSTANT_IDS.extend(constants.RELATIONSHIP_IDS) @pytest.mark.parametrize("value", CONSTANT_IDS) def test_id_property_valid_for_type(value): type = value.split('--', 1)[0] - assert IDProperty(type=type, spec_version="2.1").clean(value) == value + assert IDProperty(type=type, spec_version="2.1").clean(value) == (value, False) def test_id_property_wrong_type(): @@ -100,29 +101,63 @@ def test_id_property_not_a_valid_hex_uuid(value): def test_id_property_default(): default = ID_PROP.default() - assert ID_PROP.clean(default) == default + assert ID_PROP.clean(default) == (default, False) def test_reference_property(): ref_prop = ReferenceProperty(valid_types="my-type", spec_version="2.1") - assert ref_prop.clean("my-type--00000000-0000-4000-8000-000000000000") + assert ref_prop.clean("my-type--00000000-0000-4000-8000-000000000000", False) with pytest.raises(ValueError): - ref_prop.clean("foo") + ref_prop.clean("foo", False) # This is not a valid RFC 4122 UUID with pytest.raises(ValueError): - ref_prop.clean("my-type--00000000-0000-0000-0000-000000000000") + ref_prop.clean("my-type--00000000-0000-0000-0000-000000000000", False) -def test_reference_property_specific_type(): +def test_reference_property_whitelist_type(): ref_prop = ReferenceProperty(valid_types="my-type", spec_version="2.1") with pytest.raises(ValueError): - ref_prop.clean("not-my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf") + ref_prop.clean("not-my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False) - assert ref_prop.clean("my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf") == \ - "my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf" + with pytest.raises(ValueError): + ref_prop.clean("not-my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", True) + + result = ref_prop.clean("my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", True) + assert result == ("my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False) + + result = ref_prop.clean("my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False) + assert result == ("my-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False) + + +def test_reference_property_blacklist_type(): + ref_prop = ReferenceProperty(invalid_types="identity", spec_version="2.1") + result = ref_prop.clean( + "location--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False, + ) + assert result == ("location--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False) + + result = ref_prop.clean( + "location--8a8e8758-f92c-4058-ba38-f061cd42a0cf", True, + ) + assert result == ("location--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False) + + result = ref_prop.clean( + "some-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", True, + ) + assert result == ("some-type--8a8e8758-f92c-4058-ba38-f061cd42a0cf", True) + + with pytest.raises(ValueError): + ref_prop.clean( + "identity--8a8e8758-f92c-4058-ba38-f061cd42a0cf", False, + ) + + with pytest.raises(ValueError): + ref_prop.clean( + "identity--8a8e8758-f92c-4058-ba38-f061cd42a0cf", True, + ) @pytest.mark.parametrize( @@ -213,7 +248,11 @@ def test_property_list_of_dictionary(): ) def test_hashes_property_valid(value): hash_prop = HashesProperty() - assert hash_prop.clean(value) + _, has_custom = hash_prop.clean(value, False) + assert not has_custom + + _, has_custom = hash_prop.clean(value, True) + assert not has_custom @pytest.mark.parametrize( @@ -227,7 +266,24 @@ def test_hashes_property_invalid(value): hash_prop = HashesProperty() with pytest.raises(ValueError): - hash_prop.clean(value) + hash_prop.clean(value, False) + + with pytest.raises(ValueError): + hash_prop.clean(value, True) + + +def test_hashes_property_custom(): + value = { + "sha256": "6db12788c37247f2316052e142f42f4b259d6561751e5f401a1ae2a6df9c674b", + "abc-123": "aaaaaaaaaaaaaaaaaaaaa", + } + + hash_prop = HashesProperty() + result = hash_prop.clean(value, True) + assert result == (value, True) + + with pytest.raises(CustomContentError): + hash_prop.clean(value, False) def test_embedded_property(): @@ -237,25 +293,103 @@ def test_embedded_property(): content_disposition="inline", body="Cats are funny!", ) - assert emb_prop.clean(mime) + result = emb_prop.clean(mime, False) + assert result == (mime, False) + + result = emb_prop.clean(mime, True) + assert result == (mime, False) with pytest.raises(ValueError): - emb_prop.clean("string") + emb_prop.clean("string", False) + + +def test_embedded_property_dict(): + emb_prop = EmbeddedObjectProperty(type=stix2.v21.EmailMIMEComponent) + mime = { + "content_type": "text/plain; charset=utf-8", + "content_disposition": "inline", + "body": "Cats are funny!", + } + + result = emb_prop.clean(mime, False) + assert isinstance(result[0], stix2.v21.EmailMIMEComponent) + assert result[0]["body"] == "Cats are funny!" + assert not result[1] + + result = emb_prop.clean(mime, True) + assert isinstance(result[0], stix2.v21.EmailMIMEComponent) + assert result[0]["body"] == "Cats are funny!" + assert not result[1] + + +def test_embedded_property_custom(): + emb_prop = EmbeddedObjectProperty(type=stix2.v21.EmailMIMEComponent) + mime = stix2.v21.EmailMIMEComponent( + content_type="text/plain; charset=utf-8", + content_disposition="inline", + body="Cats are funny!", + foo=123, + allow_custom=True, + ) + + with pytest.raises(CustomContentError): + emb_prop.clean(mime, False) + + result = emb_prop.clean(mime, True) + assert result == (mime, True) + + +def test_embedded_property_dict_custom(): + emb_prop = EmbeddedObjectProperty(type=stix2.v21.EmailMIMEComponent) + mime = { + "content_type": "text/plain; charset=utf-8", + "content_disposition": "inline", + "body": "Cats are funny!", + "foo": 123, + } + + with pytest.raises(ExtraPropertiesError): + emb_prop.clean(mime, False) + + result = emb_prop.clean(mime, True) + assert isinstance(result[0], stix2.v21.EmailMIMEComponent) + assert result[0]["body"] == "Cats are funny!" + assert result[1] def test_extension_property_valid(): ext_prop = ExtensionsProperty(spec_version='2.1', enclosing_type='file') - assert ext_prop({ - 'windows-pebinary-ext': { - 'pe_type': 'exe', - }, - }) + result = ext_prop.clean( + { + 'windows-pebinary-ext': { + 'pe_type': 'exe', + }, + }, False, + ) + + assert isinstance( + result[0]["windows-pebinary-ext"], stix2.v21.WindowsPEBinaryExt, + ) + assert not result[1] + + result = ext_prop.clean( + { + 'windows-pebinary-ext': { + 'pe_type': 'exe', + }, + }, True, + ) + + assert isinstance( + result[0]["windows-pebinary-ext"], stix2.v21.WindowsPEBinaryExt, + ) + assert not result[1] def test_extension_property_invalid1(): ext_prop = ExtensionsProperty(spec_version='2.1', enclosing_type='file') with pytest.raises(ValueError): - ext_prop.clean(1) + ext_prop.clean(1, False) def test_extension_property_invalid2(): @@ -267,8 +401,47 @@ def test_extension_property_invalid2(): 'pe_type': 'exe', }, }, + False, ) + result = ext_prop.clean( + { + 'foobar-ext': { + 'pe_type': 'exe', + }, + }, True, + ) + assert result == ({"foobar-ext": {"pe_type": "exe"}}, True) + + +def test_extension_property_invalid3(): + ext_prop = ExtensionsProperty(spec_version="2.1", enclosing_type='file') + with pytest.raises(ExtraPropertiesError): + ext_prop.clean( + { + 'windows-pebinary-ext': { + 'pe_type': 'exe', + 'abc': 123, + }, + }, + False, + ) + + result = ext_prop.clean( + { + 'windows-pebinary-ext': { + 'pe_type': 'exe', + 'abc': 123, + }, + }, True, + ) + + assert isinstance( + result[0]["windows-pebinary-ext"], stix2.v21.WindowsPEBinaryExt, + ) + assert result[0]["windows-pebinary-ext"]["abc"] == 123 + assert result[1] + def test_extension_property_invalid_type(): ext_prop = ExtensionsProperty(spec_version='2.1', enclosing_type='indicator') @@ -279,6 +452,7 @@ def test_extension_property_invalid_type(): 'pe_type': 'exe', }, }, + False, ) assert "Can't parse unknown extension" in str(excinfo.value) @@ -295,3 +469,115 @@ def test_marking_property_error(): mark_prop.clean('my-marking') assert str(excinfo.value) == "must be a Statement, TLP Marking or a registered marking." + + +def test_observable_property_obj(): + prop = ObservableProperty(spec_version="2.1") + + obs = stix2.v21.File(name="data.dat") + obs_dict = { + "0": obs, + } + + result = prop.clean(obs_dict, False) + assert result[0]["0"] == obs + assert not result[1] + + result = prop.clean(obs_dict, True) + assert result[0]["0"] == obs + assert not result[1] + + +def test_observable_property_dict(): + prop = ObservableProperty(spec_version="2.1") + + obs_dict = { + "0": { + "type": "file", + "name": "data.dat", + }, + } + + result = prop.clean(obs_dict, False) + assert isinstance(result[0]["0"], stix2.v21.File) + assert result[0]["0"]["name"] == "data.dat" + assert not result[1] + + result = prop.clean(obs_dict, True) + assert isinstance(result[0]["0"], stix2.v21.File) + assert result[0]["0"]["name"] == "data.dat" + assert not result[1] + + +def test_observable_property_obj_custom(): + prop = ObservableProperty(spec_version="2.1") + + obs = stix2.v21.File(name="data.dat", foo=True, allow_custom=True) + obs_dict = { + "0": obs, + } + + with pytest.raises(ExtraPropertiesError): + prop.clean(obs_dict, False) + + result = prop.clean(obs_dict, True) + assert result[0]["0"] == obs + assert result[1] + + +def test_observable_property_dict_custom(): + prop = ObservableProperty(spec_version="2.1") + + obs_dict = { + "0": { + "type": "file", + "name": "data.dat", + "foo": True, + }, + } + + with pytest.raises(ExtraPropertiesError): + prop.clean(obs_dict, False) + + result = prop.clean(obs_dict, True) + assert isinstance(result[0]["0"], stix2.v21.File) + assert result[0]["0"]["foo"] + assert result[1] + + +def test_stix_object_property_custom_prop(): + prop = STIXObjectProperty(spec_version="2.1") + + obj_dict = { + "type": "identity", + "spec_version": "2.1", + "name": "alice", + "identity_class": "supergirl", + "foo": "bar", + } + + with pytest.raises(ExtraPropertiesError): + prop.clean(obj_dict, False) + + result = prop.clean(obj_dict, True) + assert isinstance(result[0], stix2.v21.Identity) + assert result[0].has_custom + assert result[0]["foo"] == "bar" + assert result[1] + + +def test_stix_object_property_custom_obj(): + prop = STIXObjectProperty(spec_version="2.1") + + obj_dict = { + "type": "something", + "abc": 123, + "xyz": ["a", 1], + } + + with pytest.raises(ParseError): + prop.clean(obj_dict, False) + + result = prop.clean(obj_dict, True) + assert result[0] == {"type": "something", "abc": 123, "xyz": ["a", 1]} + assert result[1] diff --git a/stix2/v20/bundle.py b/stix2/v20/bundle.py index 6a663d6..c5ca25d 100644 --- a/stix2/v20/bundle.py +++ b/stix2/v20/bundle.py @@ -35,9 +35,6 @@ class Bundle(_STIXBase20): kwargs['objects'] = obj_list + kwargs.get('objects', []) - self._allow_custom = kwargs.get('allow_custom', False) - self._properties['objects'].contained.allow_custom = kwargs.get('allow_custom', False) - super(Bundle, self).__init__(**kwargs) def get_obj(self, obj_uuid): diff --git a/stix2/v20/common.py b/stix2/v20/common.py index 6695c9a..fe151fb 100644 --- a/stix2/v20/common.py +++ b/stix2/v20/common.py @@ -104,9 +104,9 @@ class MarkingProperty(Property): marking-definition objects. """ - def clean(self, value): + def clean(self, value, allow_custom=False): if type(value) in OBJ_MAP_MARKING.values(): - return value + return value, False else: raise ValueError("must be a Statement, TLP Marking or a registered marking.") diff --git a/stix2/v20/sdo.py b/stix2/v20/sdo.py index e1f6410..538558a 100644 --- a/stix2/v20/sdo.py +++ b/stix2/v20/sdo.py @@ -219,12 +219,6 @@ class ObservedData(_DomainObject): ('granular_markings', ListProperty(GranularMarking)), ]) - def __init__(self, *args, **kwargs): - self._allow_custom = kwargs.get('allow_custom', False) - self._properties['objects'].allow_custom = kwargs.get('allow_custom', False) - - super(ObservedData, self).__init__(*args, **kwargs) - class Report(_DomainObject): """For more detailed information on this object's properties, see diff --git a/stix2/v21/bundle.py b/stix2/v21/bundle.py index 5497da5..7ad056d 100644 --- a/stix2/v21/bundle.py +++ b/stix2/v21/bundle.py @@ -32,9 +32,6 @@ class Bundle(_STIXBase21): kwargs['objects'] = obj_list + kwargs.get('objects', []) - self._allow_custom = kwargs.get('allow_custom', False) - self._properties['objects'].contained.allow_custom = kwargs.get('allow_custom', False) - super(Bundle, self).__init__(**kwargs) def get_obj(self, obj_uuid): diff --git a/stix2/v21/common.py b/stix2/v21/common.py index 2980276..af0a758 100644 --- a/stix2/v21/common.py +++ b/stix2/v21/common.py @@ -137,9 +137,9 @@ class MarkingProperty(Property): marking-definition objects. """ - def clean(self, value): + def clean(self, value, allow_custom=False): if type(value) in OBJ_MAP_MARKING.values(): - return value + return value, False else: raise ValueError("must be a Statement, TLP Marking or a registered marking.") diff --git a/stix2/v21/sdo.py b/stix2/v21/sdo.py index 35a878e..d0b6077 100644 --- a/stix2/v21/sdo.py +++ b/stix2/v21/sdo.py @@ -567,8 +567,6 @@ class ObservedData(_DomainObject): ]) def __init__(self, *args, **kwargs): - self._allow_custom = kwargs.get('allow_custom', False) - self._properties['objects'].allow_custom = kwargs.get('allow_custom', False) if "objects" in kwargs: warnings.warn( diff --git a/stix2/versioning.py b/stix2/versioning.py index a6dc0bd..02a1e13 100644 --- a/stix2/versioning.py +++ b/stix2/versioning.py @@ -192,8 +192,9 @@ def new_version(data, allow_custom=None, **kwargs): or dict. :param allow_custom: Whether to allow custom properties on the new object. If True, allow them (regardless of whether the original had custom - properties); if False disallow them; if None, propagate the preference - from the original object. + properties); if False disallow them; if None, auto-detect from the + object: if it has custom properties, allow them in the new version, + otherwise don't allow them. :param kwargs: The properties to change. Setting to None requests property removal. :return: The new object. @@ -271,7 +272,7 @@ def new_version(data, allow_custom=None, **kwargs): # it for dicts. if isinstance(data, stix2.base._STIXBase): if allow_custom is None: - new_obj_inner["allow_custom"] = data._allow_custom + new_obj_inner["allow_custom"] = data.has_custom else: new_obj_inner["allow_custom"] = allow_custom