Fix allowing custom observables and extensions
parent
b633fd3785
commit
9ef5b395a8
|
@ -10,7 +10,7 @@ from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError,
|
||||||
ExtraPropertiesError, ImmutableError,
|
ExtraPropertiesError, ImmutableError,
|
||||||
InvalidObjRefError, InvalidValueError,
|
InvalidObjRefError, InvalidValueError,
|
||||||
MissingPropertiesError,
|
MissingPropertiesError,
|
||||||
MutuallyExclusivePropertiesError)
|
MutuallyExclusivePropertiesError, ParseError)
|
||||||
from .markings.utils import validate
|
from .markings.utils import validate
|
||||||
from .utils import NOW, find_property_index, format_datetime, get_timestamp
|
from .utils import NOW, find_property_index, format_datetime, get_timestamp
|
||||||
from .utils import new_version as _new_version
|
from .utils import new_version as _new_version
|
||||||
|
@ -49,7 +49,7 @@ class _STIXBase(collections.Mapping):
|
||||||
|
|
||||||
return all_properties
|
return all_properties
|
||||||
|
|
||||||
def _check_property(self, prop_name, prop, kwargs):
|
def _check_property(self, prop_name, prop, kwargs, allow_custom=False):
|
||||||
if prop_name not in kwargs:
|
if prop_name not in kwargs:
|
||||||
if hasattr(prop, 'default'):
|
if hasattr(prop, 'default'):
|
||||||
value = prop.default()
|
value = prop.default()
|
||||||
|
@ -61,6 +61,8 @@ class _STIXBase(collections.Mapping):
|
||||||
try:
|
try:
|
||||||
kwargs[prop_name] = prop.clean(kwargs[prop_name])
|
kwargs[prop_name] = prop.clean(kwargs[prop_name])
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
|
if allow_custom and isinstance(exc, ParseError):
|
||||||
|
return
|
||||||
raise InvalidValueError(self.__class__, prop_name, reason=str(exc))
|
raise InvalidValueError(self.__class__, prop_name, reason=str(exc))
|
||||||
|
|
||||||
# interproperty constraint methods
|
# interproperty constraint methods
|
||||||
|
@ -125,7 +127,11 @@ class _STIXBase(collections.Mapping):
|
||||||
raise MissingPropertiesError(cls, missing_kwargs)
|
raise MissingPropertiesError(cls, missing_kwargs)
|
||||||
|
|
||||||
for prop_name, prop_metadata in cls._properties.items():
|
for prop_name, prop_metadata in cls._properties.items():
|
||||||
self._check_property(prop_name, prop_metadata, setting_kwargs)
|
try:
|
||||||
|
self._check_property(prop_name, prop_metadata, setting_kwargs, allow_custom)
|
||||||
|
except ParseError as err:
|
||||||
|
if not allow_custom:
|
||||||
|
raise err
|
||||||
|
|
||||||
self._inner = setting_kwargs
|
self._inner = setting_kwargs
|
||||||
|
|
||||||
|
@ -244,8 +250,8 @@ class _Observable(_STIXBase):
|
||||||
if ref_type not in allowed_types:
|
if ref_type not in allowed_types:
|
||||||
raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type))
|
raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type))
|
||||||
|
|
||||||
def _check_property(self, prop_name, prop, kwargs):
|
def _check_property(self, prop_name, prop, kwargs, allow_custom=False):
|
||||||
super(_Observable, self)._check_property(prop_name, prop, kwargs)
|
super(_Observable, self)._check_property(prop_name, prop, kwargs, allow_custom)
|
||||||
if prop_name not in kwargs:
|
if prop_name not in kwargs:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
|
@ -617,6 +617,23 @@ def test_parse_observable_with_unregistered_custom_extension():
|
||||||
assert not isinstance(parsed_ob['extensions']['x-foobar-ext'], stix2.core._STIXBase)
|
assert not isinstance(parsed_ob['extensions']['x-foobar-ext'], stix2.core._STIXBase)
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_observable_with_unregistered_custom_extension_dict():
|
||||||
|
input_dict = {
|
||||||
|
"type": "domain-name",
|
||||||
|
"value": "example.com",
|
||||||
|
"extensions": {
|
||||||
|
"x-foobar-ext": {
|
||||||
|
"property1": "foo",
|
||||||
|
"property2": 12
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
with pytest.raises(ValueError) as excinfo:
|
||||||
|
stix2.v20.observables.DomainName(**input_dict)
|
||||||
|
assert "Can't parse unknown extension type" in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
def test_register_custom_object():
|
def test_register_custom_object():
|
||||||
# Not the way to register custom object.
|
# Not the way to register custom object.
|
||||||
class CustomObject2(object):
|
class CustomObject2(object):
|
||||||
|
|
|
@ -67,7 +67,7 @@ class ExtensionsProperty(DictionaryProperty):
|
||||||
else:
|
else:
|
||||||
raise ValueError("Cannot determine extension type.")
|
raise ValueError("Cannot determine extension type.")
|
||||||
else:
|
else:
|
||||||
raise ValueError("The key used in the extensions dictionary is not an extension type name")
|
raise ParseError("Can't parse unknown extension type: {}".format(key))
|
||||||
else:
|
else:
|
||||||
raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type)
|
raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type)
|
||||||
return dictified
|
return dictified
|
||||||
|
@ -936,7 +936,7 @@ def parse_observable(data, _valid_refs=None, allow_custom=False):
|
||||||
ext_class = EXT_MAP[obj['type']][name]
|
ext_class = EXT_MAP[obj['type']][name]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
if not allow_custom:
|
if not allow_custom:
|
||||||
raise ParseError("Can't parse Unknown extension type '%s' for observable type '%s'!" % (name, obj['type']))
|
raise ParseError("Can't parse unknown extension type '%s' for observable type '%s'!" % (name, obj['type']))
|
||||||
else: # extension was found
|
else: # extension was found
|
||||||
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
|
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue