Improve versioning.new_version() to better handle custom objects

and dicts, and add better raised exception types if versioning
couldn't be done.  I changed workbench monkeypatching a bit, to
copy some class attributes over to the workbench wrapper
class-like callables, since some code expected those attributes
to be there (e.g. the versioning code).
pull/1/head
Michael Chisholm 2020-08-24 16:57:40 -04:00
parent 5e2d888d1c
commit c74d06aadc
5 changed files with 316 additions and 79 deletions

View File

@ -175,7 +175,11 @@ class ImmutableError(STIXError):
return msg.format(self)
class UnmodifiablePropertyError(STIXError):
class VersioningError(STIXError):
pass
class UnmodifiablePropertyError(VersioningError):
"""Attempted to modify an unmodifiable property of object when creating a new version."""
def __init__(self, unchangable_properties):
@ -187,6 +191,32 @@ class UnmodifiablePropertyError(STIXError):
return msg.format(", ".join(self.unchangable_properties))
class TypeNotVersionableError(VersioningError):
def __init__(self, obj):
if isinstance(obj, dict):
type_name = obj.get("type")
else:
# try standard attribute of _STIXBase subclasses/instances
type_name = getattr(obj, "_type", None)
self.object = obj
msg = "Object type{}is not versionable. Try a dictionary or " \
"instance of an SDO or SRO class.".format(
" '{}' ".format(type_name) if type_name else " ",
)
super().__init__(msg)
class ObjectNotVersionableError(VersioningError):
def __init__(self, obj):
self.object = obj
msg = "Creating a new object version requires at least the 'created'" \
" property: " + str(obj)
super().__init__(msg)
class RevokeError(STIXError):
"""Attempted an operation on a revoked object."""

View File

@ -170,6 +170,60 @@ def test_versioning_error_dict_bad_modified_value():
"but have the same id and modified timestamp do not have defined consumer behavior."
def test_versioning_dict_unregistered_no_modified():
d = {
"type": "not-registered",
"id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
"created": "1995-04-07T15:37:48.178Z",
}
new_d = stix2.versioning.new_version(d)
assert "modified" in new_d
assert new_d["modified"] > stix2.utils.parse_into_datetime(d["created"])
new_d = stix2.versioning.new_version(d, modified="1996-11-20T01:19:29.134Z")
assert new_d["modified"] == "1996-11-20T01:19:29.134Z"
def test_versioning_dict_unregistered_unversionable():
d = {
"type": "not-registered",
"id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
"modified": "1995-04-07T15:37:48.178Z",
}
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
stix2.versioning.new_version(d)
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
# should fail even if we provide a "created" kwarg.
stix2.versioning.new_version(d, created="1985-06-29T06:09:51.157Z")
def test_versioning_custom_object():
@stix2.v20.CustomObject(
"x-versionable-all-optional-20", [
("created", stix2.properties.TimestampProperty()),
("modified", stix2.properties.TimestampProperty()),
("revoked", stix2.properties.BooleanProperty()),
],
)
class CustomSDO:
pass
obj = CustomSDO(created="1990-12-18T17:56:11.346234Z")
new_obj = stix2.versioning.new_version(obj)
assert "modified" in new_obj
assert new_obj["modified"] > new_obj["created"]
obj = CustomSDO()
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
# fails due to insufficient properties on the object, even though its
# type supports versioning.
stix2.versioning.new_version(obj)
def test_versioning_error_dict_no_modified_value():
campaign_v1 = {
'type': 'campaign',
@ -184,10 +238,10 @@ def test_versioning_error_dict_no_modified_value():
def test_making_new_version_invalid_cls():
campaign_v1 = "This is a campaign."
with pytest.raises(ValueError) as excinfo:
with pytest.raises(stix2.exceptions.TypeNotVersionableError) as excinfo:
stix2.versioning.new_version(campaign_v1, name="fred")
assert 'cannot create new version of object of this type' in str(excinfo.value)
assert excinfo.value.object is campaign_v1
def test_revoke_dict():
@ -206,7 +260,7 @@ def test_revoke_dict():
def test_revoke_unversionable():
sco = stix2.v20.File(name="data.txt")
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
sco.revoke()
@ -277,7 +331,7 @@ def test_version_unversionable_dict():
"name": "data.txt",
}
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(f)
@ -295,10 +349,10 @@ def test_version_sco_with_modified():
"modified": "1991-05-13T19:24:57Z",
}
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(file_sco, name="newname.txt")
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.revoke(file_sco)
file_sco_obj = stix2.v20.File(
@ -307,10 +361,10 @@ def test_version_sco_with_modified():
modified="1991-05-13T19:24:57Z",
)
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(file_sco_obj, name="newname.txt")
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.revoke(file_sco_obj)
@ -337,6 +391,21 @@ def test_version_sco_with_custom():
revoked_obj = stix2.versioning.revoke(new_file_sco_obj)
assert revoked_obj.revoked
# Same thing with a dict
d = {
"type": "file",
"name": "data.txt",
"created": "1973-11-23T02:31:37Z",
"modified": "1991-05-13T19:24:57Z",
"revoked": False,
}
new_d = stix2.versioning.new_version(d, size=1234)
assert new_d["size"] == 1234
revoked_d = stix2.versioning.revoke(new_d)
assert revoked_d["revoked"]
def test_version_disable_custom():
m = stix2.v20.Malware(

View File

@ -4,6 +4,7 @@ import pytest
import stix2
import stix2.exceptions
import stix2.properties
import stix2.utils
import stix2.v21
import stix2.versioning
@ -179,6 +180,62 @@ def test_versioning_error_dict_bad_modified_value():
"but have the same id and modified timestamp do not have defined consumer behavior."
def test_versioning_dict_unregistered_no_modified():
d = {
"type": "not-registered",
"id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
"spec_version": "2.1",
"created": "1995-04-07T15:37:48.178Z",
}
new_d = stix2.versioning.new_version(d)
assert "modified" in new_d
assert new_d["modified"] > stix2.utils.parse_into_datetime(d["created"])
new_d = stix2.versioning.new_version(d, modified="1996-11-20T01:19:29.134Z")
assert new_d["modified"] == "1996-11-20T01:19:29.134Z"
def test_versioning_dict_unregistered_unversionable():
d = {
"type": "not-registered",
"id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
"spec_version": "2.1",
"modified": "1995-04-07T15:37:48.178Z",
}
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
stix2.versioning.new_version(d)
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
# should fail even if we provide a "created" kwarg.
stix2.versioning.new_version(d, created="1985-06-29T06:09:51.157Z")
def test_versioning_custom_object():
@stix2.v21.CustomObject(
"x-versionable-all-optional-21", [
("created", stix2.properties.TimestampProperty()),
("modified", stix2.properties.TimestampProperty()),
("revoked", stix2.properties.BooleanProperty()),
],
)
class CustomSDO:
pass
obj = CustomSDO(created="1990-12-18T17:56:11.346234Z")
new_obj = stix2.versioning.new_version(obj)
assert "modified" in new_obj
assert new_obj["modified"] > new_obj["created"]
obj = CustomSDO()
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
# fails due to insufficient properties on the object, even though its
# type supports versioning.
stix2.versioning.new_version(obj)
def test_versioning_error_dict_no_modified_value():
campaign_v1 = {
'type': 'campaign',
@ -193,10 +250,10 @@ def test_versioning_error_dict_no_modified_value():
def test_making_new_version_invalid_cls():
campaign_v1 = "This is a campaign."
with pytest.raises(ValueError) as excinfo:
with pytest.raises(stix2.exceptions.TypeNotVersionableError) as excinfo:
stix2.versioning.new_version(campaign_v1, name="fred")
assert 'cannot create new version of object of this type' in str(excinfo.value)
assert excinfo.value.object is campaign_v1
def test_revoke_dict():
@ -216,7 +273,7 @@ def test_revoke_dict():
def test_revoke_unversionable():
sco = stix2.v21.File(name="data.txt")
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
sco.revoke()
@ -318,7 +375,7 @@ def test_version_unversionable_dict():
"name": "data.txt",
}
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(f)
@ -345,6 +402,23 @@ def test_version_sco_with_custom():
revoked_obj = stix2.versioning.revoke(new_file_sco_obj)
assert revoked_obj.revoked
# Same thing with a dict
d = {
"type": "file",
"id": "file--d287f10a-98b4-4a47-8fa0-64b12695ea58",
"spec_version": "2.1",
"name": "data.txt",
"created": "1973-11-23T02:31:37Z",
"modified": "1991-05-13T19:24:57Z",
"revoked": False,
}
new_d = stix2.versioning.new_version(d, size=1234)
assert new_d["size"] == 1234
revoked_d = stix2.versioning.revoke(new_d)
assert revoked_d["revoked"]
def test_version_sco_id_contributing_properties():
file_sco_obj = stix2.v21.File(

View File

@ -9,13 +9,13 @@ import uuid
import stix2.base
import stix2.registry
from stix2.utils import (
detect_spec_version, get_timestamp, is_sco, is_sdo, is_sro,
parse_into_datetime,
detect_spec_version, get_timestamp, is_sco, parse_into_datetime,
)
import stix2.v20
from .exceptions import (
InvalidValueError, RevokeError, UnmodifiablePropertyError,
InvalidValueError, ObjectNotVersionableError, RevokeError,
TypeNotVersionableError, UnmodifiablePropertyError,
)
# STIX object properties that cannot be modified
@ -56,27 +56,16 @@ def _fudge_modified(old_modified, new_modified, use_stix21):
return new_modified
def _is_versionable(data):
def _get_stix_version(data):
"""
Determine whether the given object is versionable. This check is done on
the basis of support for three properties for the object type: "created",
"modified", and "revoked". If all three are supported, the object is
versionable; otherwise it is not. Dicts must have a "type" property whose
value is for a registered object type. This is used to determine a
complete set of supported properties for the type.
Bit of factored out functionality for getting/detecting the STIX version
of the given value.
Also, detect whether it represents a STIX 2.1 or greater spec version.
:param data: The object to check. Must be either a stix object, or a dict
with a "type" property.
:return: A 2-tuple of bools: the first is True if the object is versionable
and False if not; the second is True if the object is STIX 2.1+ and
False if not.
:param data: An object, e.g. _STIXBase instance or dict
:return: The STIX version as a string in "X.Y" notation, or None if the
version could not be determined.
"""
is_versionable = False
stix_version = None
if isinstance(data, Mapping):
# First, determine spec version. It's easy for our stix2 objects; more
@ -88,36 +77,112 @@ def _is_versionable(data):
elif isinstance(data, dict):
stix_version = detect_spec_version(data)
return stix_version
def _is_versionable_type(data):
"""
Determine whether type of the given object is versionable. This check is
done on the basis of support for three properties for the object type:
"created", "modified", and "revoked". If all three are supported, the
object type is versionable; otherwise it is not. Dicts must have a "type"
property. This is used in STIX version detection and to determine a
complete set of supported properties for the type.
If a dict is passed whose "type" is unregistered, then this library has no
knowledge of the type. It can't determine what properties are "supported".
This function will be lax and treat the type as versionable.
Note that this support check is not sufficient for creating a new object
version. Support for the versioning properties does not mean that
sufficient properties are actually present on the object.
Also, detect whether it represents a STIX 2.1 or greater spec version.
:param data: The object to check. Must be either a stix object, or a dict
with a "type" property.
:return: A 2-tuple: the first element is True if the object is versionable
and False if not; the second is the STIX version as a string in "X.Y"
notation.
"""
is_versionable = False
stix_version = None
if isinstance(data, Mapping):
# First, determine spec version
stix_version = _get_stix_version(data)
# Then, determine versionability.
if isinstance(data, stix2.base._STIXBase):
is_versionable = _VERSIONING_PROPERTIES.issubset(
data._properties,
)
# This should be sufficient for STIX objects; maybe we get lucky with
# dicts here but probably not.
if data.keys() >= _VERSIONING_PROPERTIES:
is_versionable = True
# Tougher to handle dicts. We need to consider STIX version, map to a
# registered class, and from that get a more complete picture of its
# properties.
elif isinstance(data, dict):
obj_type = data["type"]
# Tougher to handle dicts. We need to consider STIX version,
# map to a registered class, and from that get a more complete
# picture of its properties.
if is_sdo(obj_type, stix_version) or is_sro(obj_type, stix_version):
# Should we bother checking properties for SDOs/SROs?
# They were designed to be versionable.
is_versionable = True
elif is_sco(obj_type, stix_version):
# but do check SCOs
cls = stix2.registry.class_for_type(
obj_type, stix_version, "observables",
)
cls = stix2.registry.class_for_type(data["type"], stix_version)
if cls:
is_versionable = _VERSIONING_PROPERTIES.issubset(
cls._properties,
)
else:
# The type is not registered, so we have no knowledge of
# what properties are supported. Let's be lax and let them
# version it.
is_versionable = True
return is_versionable, stix_version
def _check_versionable_object(data):
"""
Determine whether there are or may be sufficient properties present on
an object to allow versioning. Raises an exception if the object can't be
versioned.
Also detect STIX spec version.
:param data: The object to check, e.g. dict with a "type" property, or
_STIXBase instance
:return: True if the object is STIX 2.1+, or False if not
:raises TypeNotVersionableError: If the object didn't have the versioning
properties and the type was found to not support them
:raises ObjectNotVersionableError: If the type was found to support
versioning but there were insufficient properties on the object
"""
if isinstance(data, Mapping):
if data.keys() >= _VERSIONING_PROPERTIES:
# If the properties all already exist in the object, assume they
# are either supported by the type, or are custom properties, and
# allow versioning.
stix_version = _get_stix_version(data)
else:
is_versionable_type, stix_version = _is_versionable_type(data)
if is_versionable_type:
# The type supports the versioning properties (or we don't
# recognize it and just assume it does). The question shifts
# to whether the object has sufficient properties to create a
# new version. Just require "created" for now. We need at
# least that as a starting point for new version timestamps.
is_versionable = "created" in data
if not is_versionable:
raise ObjectNotVersionableError(data)
else:
raise TypeNotVersionableError(data)
else:
raise TypeNotVersionableError(data)
return stix_version
def new_version(data, allow_custom=None, **kwargs):
"""
Create a new version of a STIX object, by modifying properties and
@ -134,13 +199,7 @@ def new_version(data, allow_custom=None, **kwargs):
:return: The new object.
"""
is_versionable, stix_version = _is_versionable(data)
if not is_versionable:
raise ValueError(
"cannot create new version of object of this type! "
"Try a dictionary or instance of an SDO or SRO class.",
)
stix_version = _check_versionable_object(data)
if data.get('revoked'):
raise RevokeError("new_version")
@ -178,36 +237,34 @@ def new_version(data, allow_custom=None, **kwargs):
# to know which rules to apply.
precision_constraint = "min" if stix_version == "2.1" else "exact"
old_modified = data.get("modified") or data.get("created")
old_modified = parse_into_datetime(
old_modified, precision="millisecond",
precision_constraint=precision_constraint,
)
cls = type(data)
if 'modified' not in kwargs:
old_modified = parse_into_datetime(
data["modified"], precision="millisecond",
precision_constraint=precision_constraint,
)
new_modified = get_timestamp()
new_modified = _fudge_modified(
old_modified, new_modified, stix_version == "2.1",
)
kwargs['modified'] = new_modified
elif 'modified' in data:
old_modified_property = parse_into_datetime(
data.get('modified'), precision='millisecond',
precision_constraint=precision_constraint,
)
new_modified_property = parse_into_datetime(
if 'modified' in kwargs:
new_modified = parse_into_datetime(
kwargs['modified'], precision='millisecond',
precision_constraint=precision_constraint,
)
if new_modified_property <= old_modified_property:
if new_modified <= old_modified:
raise InvalidValueError(
cls, 'modified',
"The new modified datetime cannot be before than or equal to the current modified datetime."
"It cannot be equal, as according to STIX 2 specification, objects that are different "
"but have the same id and modified timestamp do not have defined consumer behavior.",
)
else:
new_modified = get_timestamp()
new_modified = _fudge_modified(
old_modified, new_modified, stix_version!="2.0"
)
kwargs['modified'] = new_modified
new_obj_inner.update(kwargs)
# Set allow_custom appropriately if versioning an object. We will ignore

View File

@ -161,6 +161,13 @@ def _setup_workbench():
new_class = type(obj_type.__name__, (obj_type,), new_class_dict)
factory_func = functools.partial(_environ.create, new_class)
# Copy over some class attributes that other code expects to find
factory_func._type = obj_type._type
factory_func._properties = obj_type._properties
if hasattr(obj_type, "_id_contributing_properties"):
factory_func._id_contributing_properties = \
obj_type._id_contributing_properties
# Add our new "class" to this module's globals and to the library-wide
# mapping. This allows parse() to use the wrapped classes.
globals()[obj_type.__name__] = factory_func