cti-python-stix2/stix2/versioning.py

336 lines
12 KiB
Python

"""STIX2 core versioning methods."""
from collections.abc import Mapping
import copy
import datetime as dt
import itertools
import uuid
import stix2.base
import stix2.registry
from stix2.utils import (
detect_spec_version, get_timestamp, is_sco, parse_into_datetime,
)
import stix2.v20
from .exceptions import (
InvalidValueError, ObjectNotVersionableError, RevokeError,
TypeNotVersionableError, UnmodifiablePropertyError,
)
# STIX object properties that cannot be modified
STIX_UNMOD_PROPERTIES = ['created', 'created_by_ref', 'id', 'type']
_VERSIONING_PROPERTIES = {"created", "modified", "revoked"}
def _fudge_modified(old_modified, new_modified, use_stix21):
"""
Ensures a new modified timestamp is newer than the old. When they are
too close together, new_modified must be pushed further ahead to ensure
it is distinct and later, after JSON serialization (which may mean it's
actually being pushed a little ways into the future). JSON serialization
can remove precision, which can cause distinct timestamps to accidentally
become equal, if we're not careful.
:param old_modified: A previous "modified" timestamp, as a datetime object
:param new_modified: A candidate new "modified" timestamp, as a datetime
object
:param use_stix21: Whether to use STIX 2.1+ versioning timestamp precision
rules (boolean). This is important so that we are aware of how
timestamp precision will be truncated, so we know how close together
the timestamps can be, and how far ahead to potentially push the new
one.
:return: A suitable new "modified" timestamp. This may be different from
what was passed in, if it had to be pushed ahead.
"""
if use_stix21:
# 2.1+: we can use full precision
if new_modified <= old_modified:
new_modified = old_modified + dt.timedelta(microseconds=1)
else:
# 2.0: we must use millisecond precision
one_ms = dt.timedelta(milliseconds=1)
if new_modified - old_modified < one_ms:
new_modified = old_modified + one_ms
return new_modified
def _get_stix_version(data):
"""
Bit of factored out functionality for getting/detecting the STIX version
of the given value.
:param data: An object, e.g. _STIXBase instance or dict
:return: The STIX version as a string in "X.Y" notation, or None if the
version could not be determined.
"""
stix_version = None
if isinstance(data, Mapping):
# First, determine spec version. It's easy for our stix2 objects; more
# work for dicts.
if isinstance(data, stix2.v20._STIXBase20):
stix_version = "2.0"
elif isinstance(data, stix2.v21._STIXBase21):
stix_version = "2.1"
elif isinstance(data, dict):
stix_version = detect_spec_version(data)
return stix_version
def _is_versionable_type(data):
"""
Determine whether type of the given object is versionable. This check is
done on the basis of support for three properties for the object type:
"created", "modified", and "revoked". If all three are supported, the
object type is versionable; otherwise it is not. Dicts must have a "type"
property. This is used in STIX version detection and to determine a
complete set of supported properties for the type.
If a dict is passed whose "type" is unregistered, then this library has no
knowledge of the type. It can't determine what properties are "supported".
This function will be lax and treat the type as versionable.
Note that this support check is not sufficient for creating a new object
version. Support for the versioning properties does not mean that
sufficient properties are actually present on the object.
Also, detect whether it represents a STIX 2.1 or greater spec version.
:param data: The object to check. Must be either a stix object, or a dict
with a "type" property.
:return: A 2-tuple: the first element is True if the object is versionable
and False if not; the second is the STIX version as a string in "X.Y"
notation.
"""
is_versionable = False
stix_version = None
if isinstance(data, Mapping):
# First, determine spec version
stix_version = _get_stix_version(data)
# Then, determine versionability.
if isinstance(data, stix2.base._STIXBase):
is_versionable = _VERSIONING_PROPERTIES.issubset(
data._properties,
)
elif isinstance(data, dict):
# Tougher to handle dicts. We need to consider STIX version,
# map to a registered class, and from that get a more complete
# picture of its properties.
cls = stix2.registry.class_for_type(data.get("type"), stix_version)
if cls:
is_versionable = _VERSIONING_PROPERTIES.issubset(
cls._properties,
)
else:
# The type is not registered, so we have no knowledge of
# what properties are supported. Let's be lax and let them
# version it.
is_versionable = True
return is_versionable, stix_version
def _check_versionable_object(data):
"""
Determine whether there are or may be sufficient properties present on
an object to allow versioning. Raises an exception if the object can't be
versioned.
Also detect STIX spec version.
:param data: The object to check, e.g. dict with a "type" property, or
_STIXBase instance
:return: True if the object is STIX 2.1+, or False if not
:raises TypeNotVersionableError: If the object didn't have the versioning
properties and the type was found to not support them
:raises ObjectNotVersionableError: If the type was found to support
versioning but there were insufficient properties on the object
"""
if isinstance(data, Mapping):
if data.keys() >= _VERSIONING_PROPERTIES:
# If the properties all already exist in the object, assume they
# are either supported by the type, or are custom properties, and
# allow versioning.
stix_version = _get_stix_version(data)
else:
is_versionable_type, stix_version = _is_versionable_type(data)
if is_versionable_type:
# The type supports the versioning properties (or we don't
# recognize it and just assume it does). The question shifts
# to whether the object has sufficient properties to create a
# new version. Just require "created" for now. We need at
# least that as a starting point for new version timestamps.
is_versionable = "created" in data
if not is_versionable:
raise ObjectNotVersionableError(data)
else:
raise TypeNotVersionableError(data)
else:
raise TypeNotVersionableError(data)
return stix_version
def new_version(data, allow_custom=None, **kwargs):
"""
Create a new version of a STIX object, by modifying properties and
updating the ``modified`` property.
:param data: The object to create a new version of. Maybe a stix2 object
or dict.
:param allow_custom: Whether to allow custom properties on the new object.
If True, allow them (regardless of whether the original had custom
properties); if False disallow them; if None, auto-detect from the
object: if it has custom properties, allow them in the new version,
otherwise don't allow them.
:param kwargs: The properties to change. Setting to None requests property
removal.
:return: The new object.
"""
stix_version = _check_versionable_object(data)
if data.get('revoked'):
raise RevokeError("new_version")
try:
new_obj_inner = copy.deepcopy(data._inner)
except AttributeError:
new_obj_inner = copy.deepcopy(data)
# Make sure certain properties aren't trying to change
# ID contributing properties of 2.1+ SCOs may also not change if a UUIDv5
# is in use (depending on whether they were used to create it... but they
# probably were). That would imply an ID change, which is not allowed
# across versions.
sco_locked_props = []
if is_sco(data, "2.1"):
uuid_ = uuid.UUID(data["id"][-36:])
if uuid_.variant == uuid.RFC_4122 and uuid_.version == 5:
if isinstance(data, stix2.base._Observable):
cls = data.__class__
else:
cls = stix2.registry.class_for_type(
data["type"], stix_version, "observables",
)
sco_locked_props = cls._id_contributing_properties
unchangable_properties = set()
for prop in itertools.chain(STIX_UNMOD_PROPERTIES, sco_locked_props):
if prop in kwargs:
unchangable_properties.add(prop)
if unchangable_properties:
raise UnmodifiablePropertyError(unchangable_properties)
# Different versioning precision rules in STIX 2.0 vs 2.1, so we need
# to know which rules to apply.
precision_constraint = "min" if stix_version == "2.1" else "exact"
old_modified = data.get("modified") or data.get("created")
old_modified = parse_into_datetime(
old_modified, precision="millisecond",
precision_constraint=precision_constraint,
)
cls = type(data)
if 'modified' in kwargs:
new_modified = parse_into_datetime(
kwargs['modified'], precision='millisecond',
precision_constraint=precision_constraint,
)
if new_modified <= old_modified:
raise InvalidValueError(
cls, 'modified',
"The new modified datetime cannot be before than or equal to the current modified datetime."
"It cannot be equal, as according to STIX 2 specification, objects that are different "
"but have the same id and modified timestamp do not have defined consumer behavior.",
)
else:
new_modified = get_timestamp()
new_modified = _fudge_modified(
old_modified, new_modified, stix_version != "2.0",
)
kwargs['modified'] = new_modified
new_obj_inner.update(kwargs)
# Set allow_custom appropriately if versioning an object. We will ignore
# it for dicts.
if isinstance(data, stix2.base._STIXBase):
if allow_custom is None:
new_obj_inner["allow_custom"] = data.has_custom
else:
new_obj_inner["allow_custom"] = allow_custom
# Exclude properties with a value of 'None' in case data is not an instance of a _STIXBase subclass
return cls(**{k: v for k, v in new_obj_inner.items() if v is not None})
def revoke(data):
"""Revoke a STIX object.
Returns:
A new version of the object with ``revoked`` set to ``True``.
"""
if not isinstance(data, Mapping):
raise ValueError(
"cannot revoke object of this type! Try a dictionary "
"or instance of an SDO or SRO class.",
)
if data.get('revoked'):
raise RevokeError("revoke")
return new_version(data, revoked=True)
def remove_custom_stix(stix_obj):
"""Remove any custom STIX objects or properties.
Warnings:
This function is a best effort utility, in that it will remove custom
objects and properties based on the type names; i.e. if "x-" prefixes
object types, and "x\\_" prefixes property types. According to the
STIX2 spec, those naming conventions are a SHOULDs not MUSTs, meaning
that valid custom STIX content may ignore those conventions and in
effect render this utility function invalid when used on that STIX
content.
Args:
stix_obj (dict OR python-stix obj): a single python-stix object
or dict of a STIX object
Returns:
A new version of the object with any custom content removed
"""
if stix_obj['type'].startswith('x-'):
# if entire object is custom, discard
return None
custom_props = {
k: None
for k in stix_obj if k.startswith("x_")
}
if custom_props:
new_obj = new_version(stix_obj, allow_custom=False, **custom_props)
return new_obj
else:
return stix_obj