Merge branch 'master' of github.com:oasis-open/cti-python-stix2 into main

pull/1/head
chrisr3d 2020-06-24 11:22:37 +02:00
commit 808dd94f67
21 changed files with 1179 additions and 441 deletions

View File

@ -53,8 +53,8 @@ from .patterns import (
RepeatQualifier, StartStopQualifier, StringConstant, TimestampConstant, RepeatQualifier, StartStopQualifier, StringConstant, TimestampConstant,
WithinQualifier, WithinQualifier,
) )
from .utils import new_version, revoke
from .v20 import * # This import will always be the latest STIX 2.X version from .v20 import * # This import will always be the latest STIX 2.X version
from .version import __version__ from .version import __version__
from .versioning import new_version, revoke
_collect_stix2_mappings() _collect_stix2_mappings()

View File

@ -21,8 +21,8 @@ from .markings.utils import validate
from .utils import ( from .utils import (
NOW, PREFIX_21_REGEX, find_property_index, format_datetime, get_timestamp, NOW, PREFIX_21_REGEX, find_property_index, format_datetime, get_timestamp,
) )
from .utils import new_version as _new_version from .versioning import new_version as _new_version
from .utils import revoke as _revoke from .versioning import revoke as _revoke
try: try:
from collections.abc import Mapping from collections.abc import Mapping
@ -351,24 +351,21 @@ class _Observable(_STIXBase):
def __init__(self, **kwargs): def __init__(self, **kwargs):
# the constructor might be called independently of an observed data object # the constructor might be called independently of an observed data object
self._STIXBase__valid_refs = kwargs.pop('_valid_refs', []) self._STIXBase__valid_refs = kwargs.pop('_valid_refs', [])
self._allow_custom = kwargs.get('allow_custom', False)
self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False) self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False)
try:
# Since `spec_version` is optional, this is how we check for a 2.1 SCO
self._id_contributing_properties
if 'id' not in kwargs:
possible_id = self._generate_id(kwargs)
if possible_id is not None:
kwargs['id'] = possible_id
except AttributeError:
# End up here if handling a 2.0 SCO, and don't need to do anything further
pass
super(_Observable, self).__init__(**kwargs) super(_Observable, self).__init__(**kwargs)
if 'id' not in kwargs and not isinstance(self, stix2.v20._Observable):
# Specific to 2.1+ observables: generate a deterministic ID
id_ = self._generate_id()
# Spec says fall back to UUIDv4 if no contributing properties were
# given. That's what already happened (the following is actually
# overwriting the default uuidv4), so nothing to do here.
if id_ is not None:
# Can't assign to self (we're immutable), so slip the ID in
# more sneakily.
self._inner["id"] = id_
def _check_ref(self, ref, prop, prop_name): def _check_ref(self, ref, prop, prop_name):
""" """
Only for checking `*_ref` or `*_refs` properties in spec_version 2.0 Only for checking `*_ref` or `*_refs` properties in spec_version 2.0
@ -413,42 +410,53 @@ class _Observable(_STIXBase):
for ref in kwargs[prop_name]: for ref in kwargs[prop_name]:
self._check_ref(ref, prop, prop_name) self._check_ref(ref, prop, prop_name)
def _generate_id(self, kwargs): def _generate_id(self):
required_prefix = self._type + "--" """
Generate a UUIDv5 for this observable, using its "ID contributing
properties".
properties_to_use = self._id_contributing_properties :return: The ID, or None if no ID contributing properties are set
if properties_to_use: """
streamlined_object = {}
if "hashes" in kwargs and "hashes" in properties_to_use: id_ = None
possible_hash = _choose_one_hash(kwargs["hashes"]) json_serializable_object = {}
if possible_hash:
streamlined_object["hashes"] = possible_hash for key in self._id_contributing_properties:
for key in properties_to_use:
if key != "hashes" and key in kwargs: if key in self:
if isinstance(kwargs[key], dict) or isinstance(kwargs[key], _STIXBase): obj_value = self[key]
temp_deep_copy = copy.deepcopy(dict(kwargs[key]))
_recursive_stix_to_dict(temp_deep_copy) if key == "hashes":
streamlined_object[key] = temp_deep_copy serializable_value = _choose_one_hash(obj_value)
elif isinstance(kwargs[key], list):
temp_deep_copy = copy.deepcopy(kwargs[key]) if serializable_value is None:
_recursive_stix_list_to_dict(temp_deep_copy) raise InvalidValueError(
streamlined_object[key] = temp_deep_copy self, key, "No hashes given",
else: )
streamlined_object[key] = kwargs[key]
if streamlined_object:
data = canonicalize(streamlined_object, utf8=False)
# The situation is complicated w.r.t. python 2/3 behavior, so
# I'd rather not rely on particular exceptions being raised to
# determine what to do. Better to just check the python version
# directly.
if six.PY3:
return required_prefix + six.text_type(uuid.uuid5(SCO_DET_ID_NAMESPACE, data))
else: else:
return required_prefix + six.text_type(uuid.uuid5(SCO_DET_ID_NAMESPACE, data.encode("utf-8"))) serializable_value = _make_json_serializable(obj_value)
# We return None if there are no values specified for any of the id-contributing-properties json_serializable_object[key] = serializable_value
return None
if json_serializable_object:
data = canonicalize(json_serializable_object, utf8=False)
# The situation is complicated w.r.t. python 2/3 behavior, so
# I'd rather not rely on particular exceptions being raised to
# determine what to do. Better to just check the python version
# directly.
if six.PY3:
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, data)
else:
uuid_ = uuid.uuid5(
SCO_DET_ID_NAMESPACE, data.encode("utf-8"),
)
id_ = "{}--{}".format(self._type, six.text_type(uuid_))
return id_
class _Extension(_STIXBase): class _Extension(_STIXBase):
@ -472,35 +480,102 @@ def _choose_one_hash(hash_dict):
if k is not None: if k is not None:
return {k: hash_dict[k]} return {k: hash_dict[k]}
return None
def _cls_init(cls, obj, kwargs): def _cls_init(cls, obj, kwargs):
if getattr(cls, '__init__', object.__init__) is not object.__init__: if getattr(cls, '__init__', object.__init__) is not object.__init__:
cls.__init__(obj, **kwargs) cls.__init__(obj, **kwargs)
def _recursive_stix_to_dict(input_dict): def _make_json_serializable(value):
for key in input_dict: """
if isinstance(input_dict[key], dict): Make the given value JSON-serializable; required for the JSON canonicalizer
_recursive_stix_to_dict(input_dict[key]) to work. This recurses into lists/dicts, converts stix objects to dicts,
elif isinstance(input_dict[key], _STIXBase): etc. "Convenience" types this library uses as property values are
input_dict[key] = dict(input_dict[key]) JSON-serialized to produce a JSON-serializable value. (So you will always
get strings for those.)
# There may stil be nested _STIXBase objects The conversion will not affect the passed in value.
_recursive_stix_to_dict(input_dict[key])
elif isinstance(input_dict[key], list): :param value: The value to make JSON-serializable.
_recursive_stix_list_to_dict(input_dict[key]) :return: The JSON-serializable value.
else: :raises ValueError: If value is None (since nulls are not allowed in STIX
pass objects).
"""
if value is None:
raise ValueError("Illegal null value found in a STIX object")
json_value = value # default assumption
if isinstance(value, Mapping):
json_value = {
k: _make_json_serializable(v)
for k, v in value.items()
}
elif isinstance(value, list):
json_value = [
_make_json_serializable(v)
for v in value
]
elif not isinstance(value, (int, float, six.string_types, bool)):
# If a "simple" value which is not already JSON-serializable,
# JSON-serialize to a string and use that as our JSON-serializable
# value. This applies to our datetime objects currently (timestamp
# properties), and could apply to any other "convenience" types this
# library uses for property values in the future.
json_value = json.dumps(value, ensure_ascii=False, cls=STIXJSONEncoder)
# If it looks like a string literal was output, strip off the quotes.
# Otherwise, a second pair will be added when it's canonicalized. Also
# to be extra safe, we need to unescape.
if len(json_value) >= 2 and \
json_value[0] == '"' and json_value[-1] == '"':
json_value = _un_json_escape(json_value[1:-1])
return json_value
def _recursive_stix_list_to_dict(input_list): _JSON_ESCAPE_RE = re.compile(r"\\.")
for i in range(len(input_list)): # I don't think I should need to worry about the unicode escapes (\uXXXX)
if isinstance(input_list[i], _STIXBase): # since I use ensure_ascii=False when generating it. I will just fix all
input_list[i] = dict(input_list[i]) # the other escapes, e.g. \n, \r, etc.
elif isinstance(input_list[i], dict): #
pass # This list is taken from RFC8259 section 7:
elif isinstance(input_list[i], list): # https://tools.ietf.org/html/rfc8259#section-7
_recursive_stix_list_to_dict(input_list[i]) # Maps the second char of a "\X" style escape, to a replacement char
else: _JSON_ESCAPE_MAP = {
continue '"': '"',
_recursive_stix_to_dict(input_list[i]) "\\": "\\",
"/": "/",
"b": "\b",
"f": "\f",
"n": "\n",
"r": "\r",
"t": "\t",
}
def _un_json_escape(json_string):
"""
Removes JSON string literal escapes. We should undo these things Python's
serializer does, so we can ensure they're done canonically. The
canonicalizer should be in charge of everything, as much as is feasible.
:param json_string: String literal output of Python's JSON serializer,
minus the surrounding quotes.
:return: The unescaped string
"""
def replace(m):
replacement = _JSON_ESCAPE_MAP.get(m.group(0)[1])
if replacement is None:
raise ValueError("Unrecognized JSON escape: " + m.group(0))
return replacement
result = _JSON_ESCAPE_RE.sub(replace, json_string)
return result

View File

@ -2,7 +2,8 @@
from stix2 import exceptions from stix2 import exceptions
from stix2.markings import utils from stix2.markings import utils
from stix2.utils import is_marking, new_version from stix2.utils import is_marking
from stix2.versioning import new_version
def get_markings(obj, selectors, inherited=False, descendants=False, marking_ref=True, lang=True): def get_markings(obj, selectors, inherited=False, descendants=False, marking_ref=True, lang=True):

View File

@ -2,7 +2,7 @@
from stix2 import exceptions from stix2 import exceptions
from stix2.markings import utils from stix2.markings import utils
from stix2.utils import new_version from stix2.versioning import new_version
def get_markings(obj): def get_markings(obj):

View File

@ -40,6 +40,11 @@ def remove_terminal_nodes(parse_tree_nodes):
return values return values
_TIMESTAMP_RE = re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{1,6})?Z')
def check_for_valid_timetamp_syntax(timestamp_string):
return _TIMESTAMP_RE.match(timestamp_string)
@ -214,6 +219,14 @@ class STIXPatternVisitorForSTIX2():
# Visit a parse tree produced by STIXPatternParser#startStopQualifier. # Visit a parse tree produced by STIXPatternParser#startStopQualifier.
def visitStartStopQualifier(self, ctx): def visitStartStopQualifier(self, ctx):
children = self.visitChildren(ctx) children = self.visitChildren(ctx)
# 2.0 parser will accept any string, need to make sure it is a full STIX timestamp
if isinstance(children[1], StringConstant):
if not check_for_valid_timetamp_syntax(children[1].value):
raise (ValueError("Start time is not a legal timestamp"))
if isinstance(children[3], StringConstant):
if not check_for_valid_timetamp_syntax(children[3].value):
raise (ValueError("Stop time is not a legal timestamp"))
return StartStopQualifier(children[1], children[3]) return StartStopQualifier(children[1], children[3])
# Visit a parse tree produced by STIXPatternParser#withinQualifier. # Visit a parse tree produced by STIXPatternParser#withinQualifier.

View File

@ -669,12 +669,16 @@ class StartStopQualifier(_ExpressionQualifier):
self.start_time = start_time self.start_time = start_time
elif isinstance(start_time, datetime.date): elif isinstance(start_time, datetime.date):
self.start_time = TimestampConstant(start_time) self.start_time = TimestampConstant(start_time)
elif isinstance(start_time, StringConstant):
self.start_time = StringConstant(start_time.value)
else: else:
raise ValueError("%s is not a valid argument for a Start/Stop Qualifier" % start_time) raise ValueError("%s is not a valid argument for a Start/Stop Qualifier" % start_time)
if isinstance(stop_time, TimestampConstant): if isinstance(stop_time, TimestampConstant):
self.stop_time = stop_time self.stop_time = stop_time
elif isinstance(stop_time, datetime.date): elif isinstance(stop_time, datetime.date):
self.stop_time = TimestampConstant(stop_time) self.stop_time = TimestampConstant(stop_time)
elif isinstance(stop_time, StringConstant):
self.stop_time = StringConstant(stop_time.value)
else: else:
raise ValueError("%s is not a valid argument for a Start/Stop Qualifier" % stop_time) raise ValueError("%s is not a valid argument for a Start/Stop Qualifier" % stop_time)

View File

@ -17,10 +17,7 @@ from .exceptions import (
MutuallyExclusivePropertiesError, MutuallyExclusivePropertiesError,
) )
from .parsing import STIX2_OBJ_MAPS, parse, parse_observable from .parsing import STIX2_OBJ_MAPS, parse, parse_observable
from .utils import ( from .utils import _get_dict, get_class_hierarchy_names, parse_into_datetime
TYPE_21_REGEX, TYPE_REGEX, _get_dict, get_class_hierarchy_names,
parse_into_datetime,
)
ID_REGEX_interoperability = re.compile(r"[0-9a-fA-F]{8}-" ID_REGEX_interoperability = re.compile(r"[0-9a-fA-F]{8}-"
"[0-9a-fA-F]{4}-" "[0-9a-fA-F]{4}-"
@ -33,6 +30,8 @@ try:
except ImportError: except ImportError:
from collections import Mapping, defaultdict from collections import Mapping, defaultdict
TYPE_REGEX = re.compile(r'^\-?[a-z0-9]+(-[a-z0-9]+)*\-?$')
TYPE_21_REGEX = re.compile(r'^([a-z][a-z0-9]*)+(-[a-z0-9]+)*\-?$')
ERROR_INVALID_ID = ( ERROR_INVALID_ID = (
"not a valid STIX identifier, must match <object-type>--<UUID>: {}" "not a valid STIX identifier, must match <object-type>--<UUID>: {}"
) )
@ -547,7 +546,7 @@ def enumerate_types(types, spec_version):
return return_types return return_types
SELECTOR_REGEX = re.compile(r"^[a-z0-9_-]{3,250}(\.(\[\d+\]|[a-z0-9_-]{1,250}))*$") SELECTOR_REGEX = re.compile(r"^([a-z0-9_-]{3,250}(\.(\[\d+\]|[a-z0-9_-]{1,250}))*|id)$")
class SelectorProperty(Property): class SelectorProperty(Property):

View File

@ -1089,3 +1089,17 @@ def test_clear_marking_not_present(data):
"""Test clearing markings for a selector that has no associated markings.""" """Test clearing markings for a selector that has no associated markings."""
with pytest.raises(MarkingNotFoundError): with pytest.raises(MarkingNotFoundError):
data = markings.clear_markings(data, ["labels"]) data = markings.clear_markings(data, ["labels"])
def test_set_marking_on_id_property():
malware = Malware(
granular_markings=[
{
"selectors": ["id"],
"marking_ref": MARKING_IDS[0],
},
],
**MALWARE_KWARGS
)
assert "id" in malware["granular_markings"][0]["selectors"]

View File

@ -14,6 +14,7 @@ from .constants import MARKING_IDS
MALWARE_KWARGS = MALWARE_KWARGS_CONST.copy() MALWARE_KWARGS = MALWARE_KWARGS_CONST.copy()
MALWARE_KWARGS.update({ MALWARE_KWARGS.update({
'id': MALWARE_ID, 'id': MALWARE_ID,
'type': 'malware',
'created': FAKE_TIME, 'created': FAKE_TIME,
'modified': FAKE_TIME, 'modified': FAKE_TIME,
}) })

View File

@ -312,8 +312,8 @@ def test_set_op():
def test_timestamp(): def test_timestamp():
ts = stix2.TimestampConstant('2014-01-13T07:03:17Z') ts = stix2.StringConstant('2014-01-13T07:03:17Z')
assert str(ts) == "t'2014-01-13T07:03:17Z'" assert str(ts) == "'2014-01-13T07:03:17Z'"
def test_boolean(): def test_boolean():
@ -363,11 +363,6 @@ def test_invalid_integer_constant():
stix2.IntegerConstant('foo') stix2.IntegerConstant('foo')
def test_invalid_timestamp_constant():
with pytest.raises(ValueError):
stix2.TimestampConstant('foo')
def test_invalid_float_constant(): def test_invalid_float_constant():
with pytest.raises(ValueError): with pytest.raises(ValueError):
stix2.FloatConstant('foo') stix2.FloatConstant('foo')
@ -461,23 +456,23 @@ def test_invalid_within_qualifier():
def test_startstop_qualifier(): def test_startstop_qualifier():
qual = stix2.StartStopQualifier( qual = stix2.StartStopQualifier(
stix2.TimestampConstant('2016-06-01T00:00:00Z'), stix2.StringConstant('2016-06-01T00:00:00Z'),
datetime.datetime(2017, 3, 12, 8, 30, 0), stix2.StringConstant('2017-03-12T08:30:00Z'),
) )
assert str(qual) == "START t'2016-06-01T00:00:00Z' STOP t'2017-03-12T08:30:00Z'" assert str(qual) == "START '2016-06-01T00:00:00Z' STOP '2017-03-12T08:30:00Z'"
qual2 = stix2.StartStopQualifier( qual2 = stix2.StartStopQualifier(
datetime.date(2016, 6, 1), stix2.StringConstant("2016-06-01T00:00:00Z"),
stix2.TimestampConstant('2016-07-01T00:00:00Z'), stix2.StringConstant('2016-07-01T00:00:00Z'),
) )
assert str(qual2) == "START t'2016-06-01T00:00:00Z' STOP t'2016-07-01T00:00:00Z'" assert str(qual2) == "START '2016-06-01T00:00:00Z' STOP '2016-07-01T00:00:00Z'"
def test_invalid_startstop_qualifier(): def test_invalid_startstop_qualifier():
with pytest.raises(ValueError): with pytest.raises(ValueError):
stix2.StartStopQualifier( stix2.StartStopQualifier(
'foo', 'foo',
stix2.TimestampConstant('2016-06-01T00:00:00Z'), stix2.StringConstant('2016-06-01T00:00:00Z'),
) )
with pytest.raises(ValueError): with pytest.raises(ValueError):
@ -508,6 +503,19 @@ def test_parsing_qualified_expression():
) == "[network-traffic:dst_ref.type = 'domain-name' AND network-traffic:dst_ref.value = 'example.com'] REPEATS 5 TIMES WITHIN 1800 SECONDS" ) == "[network-traffic:dst_ref.type = 'domain-name' AND network-traffic:dst_ref.value = 'example.com'] REPEATS 5 TIMES WITHIN 1800 SECONDS"
def test_parsing_start_stop_qualified_expression():
patt_obj = create_pattern_object("[ipv4-addr:value = '1.2.3.4'] START '2016-06-01T00:00:00Z' STOP '2017-03-12T08:30:00Z'", version="2.0")
assert str(
patt_obj,
) == "[ipv4-addr:value = '1.2.3.4'] START '2016-06-01T00:00:00Z' STOP '2017-03-12T08:30:00Z'"
def test_parsing_illegal_start_stop_qualified_expression():
with pytest.raises(ValueError):
create_pattern_object("[ipv4-addr:value = '1.2.3.4'] START '2016-06-01' STOP '2017-03-12T08:30:00Z'", version="2.0")
def test_list_constant(): def test_list_constant():
patt_obj = create_pattern_object("[network-traffic:src_ref.value IN ('10.0.0.0', '10.0.0.1', '10.0.0.2')]", version="2.0") patt_obj = create_pattern_object("[network-traffic:src_ref.value IN ('10.0.0.0', '10.0.0.1', '10.0.0.2')]", version="2.0")
assert str(patt_obj) == "[network-traffic:src_ref.value IN ('10.0.0.0', '10.0.0.1', '10.0.0.2')]" assert str(patt_obj) == "[network-traffic:src_ref.value IN ('10.0.0.0', '10.0.0.1', '10.0.0.2')]"

View File

@ -1,6 +1,10 @@
import pytest import pytest
import stix2 import stix2
import stix2.exceptions
import stix2.utils
import stix2.v20
import stix2.versioning
from .constants import CAMPAIGN_MORE_KWARGS from .constants import CAMPAIGN_MORE_KWARGS
@ -142,7 +146,7 @@ def test_versioning_error_revoke_of_revoked():
def test_making_new_version_dict(): def test_making_new_version_dict():
campaign_v1 = CAMPAIGN_MORE_KWARGS campaign_v1 = CAMPAIGN_MORE_KWARGS
campaign_v2 = stix2.utils.new_version(CAMPAIGN_MORE_KWARGS, name="fred") campaign_v2 = stix2.versioning.new_version(CAMPAIGN_MORE_KWARGS, name="fred")
assert campaign_v1['id'] == campaign_v2['id'] assert campaign_v1['id'] == campaign_v2['id']
assert campaign_v1['created_by_ref'] == campaign_v2['created_by_ref'] assert campaign_v1['created_by_ref'] == campaign_v2['created_by_ref']
@ -155,7 +159,7 @@ def test_making_new_version_dict():
def test_versioning_error_dict_bad_modified_value(): def test_versioning_error_dict_bad_modified_value():
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.utils.new_version(CAMPAIGN_MORE_KWARGS, modified="2015-04-06T20:03:00.000Z") stix2.versioning.new_version(CAMPAIGN_MORE_KWARGS, modified="2015-04-06T20:03:00.000Z")
assert excinfo.value.cls == dict assert excinfo.value.cls == dict
assert excinfo.value.prop_name == "modified" assert excinfo.value.prop_name == "modified"
@ -171,7 +175,7 @@ def test_versioning_error_dict_no_modified_value():
'created': "2016-04-06T20:03:00.000Z", 'created': "2016-04-06T20:03:00.000Z",
'name': "Green Group Attacks Against Finance", 'name': "Green Group Attacks Against Finance",
} }
campaign_v2 = stix2.utils.new_version(campaign_v1, modified="2017-04-06T20:03:00.000Z") campaign_v2 = stix2.versioning.new_version(campaign_v1, modified="2017-04-06T20:03:00.000Z")
assert str(campaign_v2['modified']) == "2017-04-06T20:03:00.000Z" assert str(campaign_v2['modified']) == "2017-04-06T20:03:00.000Z"
@ -179,14 +183,14 @@ def test_versioning_error_dict_no_modified_value():
def test_making_new_version_invalid_cls(): def test_making_new_version_invalid_cls():
campaign_v1 = "This is a campaign." campaign_v1 = "This is a campaign."
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
stix2.utils.new_version(campaign_v1, name="fred") stix2.versioning.new_version(campaign_v1, name="fred")
assert 'cannot create new version of object of this type' in str(excinfo.value) assert 'cannot create new version of object of this type' in str(excinfo.value)
def test_revoke_dict(): def test_revoke_dict():
campaign_v1 = CAMPAIGN_MORE_KWARGS campaign_v1 = CAMPAIGN_MORE_KWARGS
campaign_v2 = stix2.utils.revoke(campaign_v1) campaign_v2 = stix2.versioning.revoke(campaign_v1)
assert campaign_v1['id'] == campaign_v2['id'] assert campaign_v1['id'] == campaign_v2['id']
assert campaign_v1['created_by_ref'] == campaign_v2['created_by_ref'] assert campaign_v1['created_by_ref'] == campaign_v2['created_by_ref']
@ -198,12 +202,18 @@ def test_revoke_dict():
assert campaign_v2['revoked'] assert campaign_v2['revoked']
def test_revoke_unversionable():
sco = stix2.v20.File(name="data.txt")
with pytest.raises(ValueError):
sco.revoke()
def test_versioning_error_revoke_of_revoked_dict(): def test_versioning_error_revoke_of_revoked_dict():
campaign_v1 = CAMPAIGN_MORE_KWARGS campaign_v1 = CAMPAIGN_MORE_KWARGS
campaign_v2 = stix2.utils.revoke(campaign_v1) campaign_v2 = stix2.versioning.revoke(campaign_v1)
with pytest.raises(stix2.exceptions.RevokeError) as excinfo: with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
stix2.utils.revoke(campaign_v2) stix2.versioning.revoke(campaign_v2)
assert excinfo.value.called_by == "revoke" assert excinfo.value.called_by == "revoke"
@ -211,7 +221,7 @@ def test_versioning_error_revoke_of_revoked_dict():
def test_revoke_invalid_cls(): def test_revoke_invalid_cls():
campaign_v1 = "This is a campaign." campaign_v1 = "This is a campaign."
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
stix2.utils.revoke(campaign_v1) stix2.versioning.revoke(campaign_v1)
assert 'cannot revoke object of this type' in str(excinfo.value) assert 'cannot revoke object of this type' in str(excinfo.value)
@ -224,7 +234,7 @@ def test_remove_custom_stix_property():
allow_custom=True, allow_custom=True,
) )
mal_nc = stix2.utils.remove_custom_stix(mal) mal_nc = stix2.versioning.remove_custom_stix(mal)
assert "x_custom" not in mal_nc assert "x_custom" not in mal_nc
assert (stix2.utils.parse_into_datetime(mal["modified"], precision="millisecond") < assert (stix2.utils.parse_into_datetime(mal["modified"], precision="millisecond") <
@ -243,15 +253,136 @@ def test_remove_custom_stix_object():
animal = Animal(species="lion", animal_class="mammal") animal = Animal(species="lion", animal_class="mammal")
nc = stix2.utils.remove_custom_stix(animal) nc = stix2.versioning.remove_custom_stix(animal)
assert nc is None assert nc is None
def test_remove_custom_stix_no_custom(): def test_remove_custom_stix_no_custom():
campaign_v1 = stix2.v20.Campaign(**CAMPAIGN_MORE_KWARGS) campaign_v1 = stix2.v20.Campaign(**CAMPAIGN_MORE_KWARGS)
campaign_v2 = stix2.utils.remove_custom_stix(campaign_v1) campaign_v2 = stix2.versioning.remove_custom_stix(campaign_v1)
assert len(campaign_v1.keys()) == len(campaign_v2.keys()) assert len(campaign_v1.keys()) == len(campaign_v2.keys())
assert campaign_v1.id == campaign_v2.id assert campaign_v1.id == campaign_v2.id
assert campaign_v1.description == campaign_v2.description assert campaign_v1.description == campaign_v2.description
def test_version_unversionable_dict():
f = {
"type": "file",
"name": "data.txt",
}
with pytest.raises(ValueError):
stix2.versioning.new_version(f)
def test_version_sco_with_modified():
"""
Ensure new_version() doesn't get tripped up over unversionable objects with
properties not used for versioning, but whose names conflict with
versioning properties.
"""
file_sco = {
"type": "file",
"name": "data.txt",
"created": "1973-11-23T02:31:37Z",
"modified": "1991-05-13T19:24:57Z",
}
with pytest.raises(ValueError):
stix2.versioning.new_version(file_sco, name="newname.txt")
with pytest.raises(ValueError):
stix2.versioning.revoke(file_sco)
file_sco_obj = stix2.v20.File(
name="data.txt",
created="1973-11-23T02:31:37Z",
modified="1991-05-13T19:24:57Z",
)
with pytest.raises(ValueError):
stix2.versioning.new_version(file_sco_obj, name="newname.txt")
with pytest.raises(ValueError):
stix2.versioning.revoke(file_sco_obj)
def test_version_sco_with_custom():
"""
If we add custom properties named like versioning properties to an object
type which is otherwise unversionable, versioning should start working.
"""
file_sco_obj = stix2.v20.File(
name="data.txt",
created="1973-11-23T02:31:37Z",
modified="1991-05-13T19:24:57Z",
revoked=False, # the custom property
allow_custom=True,
)
new_file_sco_obj = stix2.versioning.new_version(
file_sco_obj, name="newname.txt",
)
assert new_file_sco_obj.name == "newname.txt"
revoked_obj = stix2.versioning.revoke(new_file_sco_obj)
assert revoked_obj.revoked
def test_version_disable_custom():
m = stix2.v20.Malware(
name="foo", labels=["label"], description="Steals your identity!",
x_custom=123, allow_custom=True,
)
# Remove the custom property, and disallow custom properties in the
# resulting object.
m2 = stix2.versioning.new_version(m, x_custom=None, allow_custom=False)
assert "x_custom" not in m2
# Remove a regular property and leave the custom one, disallow custom
# properties, and make sure we get an error.
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
stix2.versioning.new_version(m, description=None, allow_custom=False)
def test_version_enable_custom():
m = stix2.v20.Malware(
name="foo", labels=["label"], description="Steals your identity!",
)
# Add a custom property to an object for which it was previously disallowed
m2 = stix2.versioning.new_version(m, x_custom=123, allow_custom=True)
assert "x_custom" in m2
# Add a custom property without enabling it, make sure we get an error
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
stix2.versioning.new_version(m, x_custom=123, allow_custom=False)
def test_version_propagate_custom():
m = stix2.v20.Malware(
name="foo", labels=["label"],
)
# Remember custom-not-allowed setting from original; produce error
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
stix2.versioning.new_version(m, x_custom=123)
m2 = stix2.versioning.new_version(m, description="Steals your identity!")
assert "description" in m2
assert m2.description == "Steals your identity!"
m_custom = stix2.v20.Malware(
name="foo", labels=["label"], x_custom=123, allow_custom=True,
)
# Remember custom-allowed setting from original; should work
m2_custom = stix2.versioning.new_version(m_custom, x_other_custom="abc")
assert "x_other_custom" in m2_custom
assert m2_custom.x_other_custom == "abc"

View File

@ -0,0 +1,339 @@
from collections import OrderedDict
import datetime
import uuid
import pytest
import six
import stix2.base
import stix2.canonicalization.Canonicalize
import stix2.exceptions
from stix2.properties import (
BooleanProperty, DictionaryProperty, EmbeddedObjectProperty,
ExtensionsProperty, FloatProperty, HashesProperty, IDProperty,
IntegerProperty, ListProperty, StringProperty, TimestampProperty,
TypeProperty,
)
import stix2.v21
SCO_DET_ID_NAMESPACE = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7")
def _uuid_from_id(id_):
dd_idx = id_.index("--")
uuid_str = id_[dd_idx+2:]
uuid_ = uuid.UUID(uuid_str)
return uuid_
def _make_uuid5(name):
"""
Make a STIX 2.1+ compliant UUIDv5 from a "name".
"""
if six.PY3:
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, name)
else:
uuid_ = uuid.uuid5(
SCO_DET_ID_NAMESPACE, name.encode("utf-8"),
)
return uuid_
def test_no_contrib_props_defined():
class SomeSCO(stix2.v21._Observable):
_type = "some-sco"
_properties = OrderedDict((
('type', TypeProperty(_type, spec_version='2.1')),
('id', IDProperty(_type, spec_version='2.1')),
(
'extensions', ExtensionsProperty(
spec_version='2.1', enclosing_type=_type,
),
),
))
_id_contributing_properties = []
sco = SomeSCO()
uuid_ = _uuid_from_id(sco["id"])
assert uuid_.variant == uuid.RFC_4122
assert uuid_.version == 4
def test_json_compatible_prop_values():
class SomeSCO(stix2.v21._Observable):
_type = "some-sco"
_properties = OrderedDict((
('type', TypeProperty(_type, spec_version='2.1')),
('id', IDProperty(_type, spec_version='2.1')),
(
'extensions', ExtensionsProperty(
spec_version='2.1', enclosing_type=_type,
),
),
('string', StringProperty()),
('int', IntegerProperty()),
('float', FloatProperty()),
('bool', BooleanProperty()),
('list', ListProperty(IntegerProperty())),
('dict', DictionaryProperty(spec_version="2.1")),
))
_id_contributing_properties = [
'string', 'int', 'float', 'bool', 'list', 'dict',
]
obj = {
"string": "abc",
"int": 1,
"float": 1.5,
"bool": True,
"list": [1, 2, 3],
"dict": {"a": 1, "b": [2], "c": "three"},
}
sco = SomeSCO(**obj)
can_json = stix2.canonicalization.Canonicalize.canonicalize(obj, utf8=False)
expected_uuid5 = _make_uuid5(can_json)
actual_uuid5 = _uuid_from_id(sco["id"])
assert actual_uuid5 == expected_uuid5
def test_json_incompatible_timestamp_value():
class SomeSCO(stix2.v21._Observable):
_type = "some-sco"
_properties = OrderedDict((
('type', TypeProperty(_type, spec_version='2.1')),
('id', IDProperty(_type, spec_version='2.1')),
(
'extensions', ExtensionsProperty(
spec_version='2.1', enclosing_type=_type,
),
),
('timestamp', TimestampProperty()),
))
_id_contributing_properties = ['timestamp']
ts = datetime.datetime(1987, 1, 2, 3, 4, 5, 678900)
sco = SomeSCO(timestamp=ts)
obj = {
"timestamp": "1987-01-02T03:04:05.6789Z",
}
can_json = stix2.canonicalization.Canonicalize.canonicalize(obj, utf8=False)
expected_uuid5 = _make_uuid5(can_json)
actual_uuid5 = _uuid_from_id(sco["id"])
assert actual_uuid5 == expected_uuid5
def test_embedded_object():
class SubObj(stix2.base._STIXBase):
_type = "sub-object"
_properties = OrderedDict((
('value', StringProperty()),
))
class SomeSCO(stix2.v21._Observable):
_type = "some-sco"
_properties = OrderedDict((
('type', TypeProperty(_type, spec_version='2.1')),
('id', IDProperty(_type, spec_version='2.1')),
(
'extensions', ExtensionsProperty(
spec_version='2.1', enclosing_type=_type,
),
),
('sub_obj', EmbeddedObjectProperty(type=SubObj)),
))
_id_contributing_properties = ['sub_obj']
sub_obj = SubObj(value="foo")
sco = SomeSCO(sub_obj=sub_obj)
obj = {
"sub_obj": {
"value": "foo",
},
}
can_json = stix2.canonicalization.Canonicalize.canonicalize(obj, utf8=False)
expected_uuid5 = _make_uuid5(can_json)
actual_uuid5 = _uuid_from_id(sco["id"])
assert actual_uuid5 == expected_uuid5
def test_empty_hash():
class SomeSCO(stix2.v21._Observable):
_type = "some-sco"
_properties = OrderedDict((
('type', TypeProperty(_type, spec_version='2.1')),
('id', IDProperty(_type, spec_version='2.1')),
(
'extensions', ExtensionsProperty(
spec_version='2.1', enclosing_type=_type,
),
),
('hashes', HashesProperty()),
))
_id_contributing_properties = ['hashes']
with pytest.raises(stix2.exceptions.InvalidValueError):
SomeSCO(hashes={})
@pytest.mark.parametrize(
"json_escaped, expected_unescaped", [
("", ""),
("a", "a"),
(r"\n", "\n"),
(r"\n\r\b\t\\\/\"", "\n\r\b\t\\/\""),
(r"\\n", r"\n"),
(r"\\\n", "\\\n"),
],
)
def test_json_unescaping(json_escaped, expected_unescaped):
actual_unescaped = stix2.base._un_json_escape(json_escaped)
assert actual_unescaped == expected_unescaped
def test_json_unescaping_bad_escape():
with pytest.raises(ValueError):
stix2.base._un_json_escape(r"\x")
def test_deterministic_id_same_extra_prop_vals():
email_addr_1 = stix2.v21.EmailAddress(
value="john@example.com",
display_name="Johnny Doe",
)
email_addr_2 = stix2.v21.EmailAddress(
value="john@example.com",
display_name="Johnny Doe",
)
assert email_addr_1.id == email_addr_2.id
uuid_obj_1 = uuid.UUID(email_addr_1.id[-36:])
assert uuid_obj_1.variant == uuid.RFC_4122
assert uuid_obj_1.version == 5
uuid_obj_2 = uuid.UUID(email_addr_2.id[-36:])
assert uuid_obj_2.variant == uuid.RFC_4122
assert uuid_obj_2.version == 5
def test_deterministic_id_diff_extra_prop_vals():
email_addr_1 = stix2.v21.EmailAddress(
value="john@example.com",
display_name="Johnny Doe",
)
email_addr_2 = stix2.v21.EmailAddress(
value="john@example.com",
display_name="Janey Doe",
)
assert email_addr_1.id == email_addr_2.id
uuid_obj_1 = uuid.UUID(email_addr_1.id[-36:])
assert uuid_obj_1.variant == uuid.RFC_4122
assert uuid_obj_1.version == 5
uuid_obj_2 = uuid.UUID(email_addr_2.id[-36:])
assert uuid_obj_2.variant == uuid.RFC_4122
assert uuid_obj_2.version == 5
def test_deterministic_id_diff_contributing_prop_vals():
email_addr_1 = stix2.v21.EmailAddress(
value="john@example.com",
display_name="Johnny Doe",
)
email_addr_2 = stix2.v21.EmailAddress(
value="jane@example.com",
display_name="Janey Doe",
)
assert email_addr_1.id != email_addr_2.id
uuid_obj_1 = uuid.UUID(email_addr_1.id[-36:])
assert uuid_obj_1.variant == uuid.RFC_4122
assert uuid_obj_1.version == 5
uuid_obj_2 = uuid.UUID(email_addr_2.id[-36:])
assert uuid_obj_2.variant == uuid.RFC_4122
assert uuid_obj_2.version == 5
def test_deterministic_id_no_contributing_props():
email_msg_1 = stix2.v21.EmailMessage(
is_multipart=False,
)
email_msg_2 = stix2.v21.EmailMessage(
is_multipart=False,
)
assert email_msg_1.id != email_msg_2.id
uuid_obj_1 = uuid.UUID(email_msg_1.id[-36:])
assert uuid_obj_1.variant == uuid.RFC_4122
assert uuid_obj_1.version == 4
uuid_obj_2 = uuid.UUID(email_msg_2.id[-36:])
assert uuid_obj_2.variant == uuid.RFC_4122
assert uuid_obj_2.version == 4
def test_id_gen_recursive_dict_conversion_1():
file_observable = stix2.v21.File(
name="example.exe",
size=68 * 1000,
magic_number_hex="50000000",
hashes={
"SHA-256": "841a8921140aba50671ebb0770fecc4ee308c4952cfeff8de154ab14eeef4649",
},
extensions={
"windows-pebinary-ext": stix2.v21.WindowsPEBinaryExt(
pe_type="exe",
machine_hex="014c",
sections=[
stix2.v21.WindowsPESection(
name=".data",
size=4096,
entropy=7.980693,
hashes={"SHA-256": "6e3b6f3978e5cd96ba7abee35c24e867b7e64072e2ecb22d0ee7a6e6af6894d0"},
),
],
),
},
)
assert file_observable.id == "file--ced31cd4-bdcb-537d-aefa-92d291bfc11d"
def test_id_gen_recursive_dict_conversion_2():
wrko = stix2.v21.WindowsRegistryKey(
values=[
stix2.v21.WindowsRegistryValueType(
name="Foo",
data="qwerty",
),
stix2.v21.WindowsRegistryValueType(
name="Bar",
data="42",
),
],
)
assert wrko.id == "windows-registry-key--36594eba-bcc7-5014-9835-0e154264e588"

View File

@ -1307,3 +1307,17 @@ def test_clear_marking_not_present(data):
"""Test clearing markings for a selector that has no associated markings.""" """Test clearing markings for a selector that has no associated markings."""
with pytest.raises(MarkingNotFoundError): with pytest.raises(MarkingNotFoundError):
markings.clear_markings(data, ["malware_types"]) markings.clear_markings(data, ["malware_types"])
def test_set_marking_on_id_property():
malware = Malware(
granular_markings=[
{
"selectors": ["id"],
"marking_ref": MARKING_IDS[0],
},
],
**MALWARE_KWARGS
)
assert "id" in malware["granular_markings"][0]["selectors"]

View File

@ -13,6 +13,7 @@ from .constants import MARKING_IDS
MALWARE_KWARGS = MALWARE_KWARGS_CONST.copy() MALWARE_KWARGS = MALWARE_KWARGS_CONST.copy()
MALWARE_KWARGS.update({ MALWARE_KWARGS.update({
'id': MALWARE_ID, 'id': MALWARE_ID,
'type': 'malware',
'created': FAKE_TIME, 'created': FAKE_TIME,
'modified': FAKE_TIME, 'modified': FAKE_TIME,
}) })

View File

@ -1,6 +1,5 @@
import datetime as dt import datetime as dt
import re import re
import uuid
import pytest import pytest
import pytz import pytz
@ -900,6 +899,27 @@ def test_file_example_with_RasterImageExt_Object():
assert f.extensions["raster-image-ext"].exif_tags["XResolution"] == 4928 assert f.extensions["raster-image-ext"].exif_tags["XResolution"] == 4928
def test_file_with_archive_ext_object():
ad = stix2.v21.Directory(path="archived/path")
f_obj = stix2.v21.File(
name="foo", extensions={
"archive-ext": {
"contains_refs": [ad, ],
},
},
)
f_ref = stix2.v21.File(
name="foo", extensions={
"archive-ext": {
"contains_refs": [ad.id, ],
},
},
)
assert f_obj["id"] == f_ref["id"]
assert f_obj["extensions"]["archive-ext"]["contains_refs"][0] == ad["id"]
RASTER_IMAGE_EXT = """{ RASTER_IMAGE_EXT = """{
"type": "observed-data", "type": "observed-data",
"spec_version": "2.1", "spec_version": "2.1",
@ -1469,133 +1489,3 @@ def test_objects_deprecation():
}, },
}, },
) )
def test_deterministic_id_same_extra_prop_vals():
email_addr_1 = stix2.v21.EmailAddress(
value="john@example.com",
display_name="Johnny Doe",
)
email_addr_2 = stix2.v21.EmailAddress(
value="john@example.com",
display_name="Johnny Doe",
)
assert email_addr_1.id == email_addr_2.id
uuid_obj_1 = uuid.UUID(email_addr_1.id[-36:])
assert uuid_obj_1.variant == uuid.RFC_4122
assert uuid_obj_1.version == 5
uuid_obj_2 = uuid.UUID(email_addr_2.id[-36:])
assert uuid_obj_2.variant == uuid.RFC_4122
assert uuid_obj_2.version == 5
def test_deterministic_id_diff_extra_prop_vals():
email_addr_1 = stix2.v21.EmailAddress(
value="john@example.com",
display_name="Johnny Doe",
)
email_addr_2 = stix2.v21.EmailAddress(
value="john@example.com",
display_name="Janey Doe",
)
assert email_addr_1.id == email_addr_2.id
uuid_obj_1 = uuid.UUID(email_addr_1.id[-36:])
assert uuid_obj_1.variant == uuid.RFC_4122
assert uuid_obj_1.version == 5
uuid_obj_2 = uuid.UUID(email_addr_2.id[-36:])
assert uuid_obj_2.variant == uuid.RFC_4122
assert uuid_obj_2.version == 5
def test_deterministic_id_diff_contributing_prop_vals():
email_addr_1 = stix2.v21.EmailAddress(
value="john@example.com",
display_name="Johnny Doe",
)
email_addr_2 = stix2.v21.EmailAddress(
value="jane@example.com",
display_name="Janey Doe",
)
assert email_addr_1.id != email_addr_2.id
uuid_obj_1 = uuid.UUID(email_addr_1.id[-36:])
assert uuid_obj_1.variant == uuid.RFC_4122
assert uuid_obj_1.version == 5
uuid_obj_2 = uuid.UUID(email_addr_2.id[-36:])
assert uuid_obj_2.variant == uuid.RFC_4122
assert uuid_obj_2.version == 5
def test_deterministic_id_no_contributing_props():
email_msg_1 = stix2.v21.EmailMessage(
is_multipart=False,
)
email_msg_2 = stix2.v21.EmailMessage(
is_multipart=False,
)
assert email_msg_1.id != email_msg_2.id
uuid_obj_1 = uuid.UUID(email_msg_1.id[-36:])
assert uuid_obj_1.variant == uuid.RFC_4122
assert uuid_obj_1.version == 4
uuid_obj_2 = uuid.UUID(email_msg_2.id[-36:])
assert uuid_obj_2.variant == uuid.RFC_4122
assert uuid_obj_2.version == 4
def test_id_gen_recursive_dict_conversion_1():
file_observable = stix2.v21.File(
name="example.exe",
size=68 * 1000,
magic_number_hex="50000000",
hashes={
"SHA-256": "841a8921140aba50671ebb0770fecc4ee308c4952cfeff8de154ab14eeef4649",
},
extensions={
"windows-pebinary-ext": stix2.v21.WindowsPEBinaryExt(
pe_type="exe",
machine_hex="014c",
sections=[
stix2.v21.WindowsPESection(
name=".data",
size=4096,
entropy=7.980693,
hashes={"SHA-256": "6e3b6f3978e5cd96ba7abee35c24e867b7e64072e2ecb22d0ee7a6e6af6894d0"},
),
],
),
},
)
assert file_observable.id == "file--ced31cd4-bdcb-537d-aefa-92d291bfc11d"
def test_id_gen_recursive_dict_conversion_2():
wrko = stix2.v21.WindowsRegistryKey(
values=[
stix2.v21.WindowsRegistryValueType(
name="Foo",
data="qwerty",
),
stix2.v21.WindowsRegistryValueType(
name="Bar",
data="42",
),
],
)
assert wrko.id == "windows-registry-key--36594eba-bcc7-5014-9835-0e154264e588"

View File

@ -3,7 +3,10 @@ import datetime
import pytest import pytest
import stix2 import stix2
import stix2.exceptions
import stix2.utils import stix2.utils
import stix2.v21
import stix2.versioning
from .constants import CAMPAIGN_MORE_KWARGS from .constants import CAMPAIGN_MORE_KWARGS
@ -151,7 +154,7 @@ def test_versioning_error_revoke_of_revoked():
def test_making_new_version_dict(): def test_making_new_version_dict():
campaign_v1 = CAMPAIGN_MORE_KWARGS campaign_v1 = CAMPAIGN_MORE_KWARGS
campaign_v2 = stix2.utils.new_version(CAMPAIGN_MORE_KWARGS, name="fred") campaign_v2 = stix2.versioning.new_version(CAMPAIGN_MORE_KWARGS, name="fred")
assert campaign_v1['id'] == campaign_v2['id'] assert campaign_v1['id'] == campaign_v2['id']
assert campaign_v1['spec_version'] == campaign_v2['spec_version'] assert campaign_v1['spec_version'] == campaign_v2['spec_version']
@ -165,7 +168,7 @@ def test_making_new_version_dict():
def test_versioning_error_dict_bad_modified_value(): def test_versioning_error_dict_bad_modified_value():
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.utils.new_version(CAMPAIGN_MORE_KWARGS, modified="2015-04-06T20:03:00.000Z") stix2.versioning.new_version(CAMPAIGN_MORE_KWARGS, modified="2015-04-06T20:03:00.000Z")
assert excinfo.value.cls == dict assert excinfo.value.cls == dict
assert excinfo.value.prop_name == "modified" assert excinfo.value.prop_name == "modified"
@ -181,7 +184,7 @@ def test_versioning_error_dict_no_modified_value():
'created': "2016-04-06T20:03:00.000Z", 'created': "2016-04-06T20:03:00.000Z",
'name': "Green Group Attacks Against Finance", 'name': "Green Group Attacks Against Finance",
} }
campaign_v2 = stix2.utils.new_version(campaign_v1, modified="2017-04-06T20:03:00.000Z") campaign_v2 = stix2.versioning.new_version(campaign_v1, modified="2017-04-06T20:03:00.000Z")
assert str(campaign_v2['modified']) == "2017-04-06T20:03:00.000Z" assert str(campaign_v2['modified']) == "2017-04-06T20:03:00.000Z"
@ -189,14 +192,14 @@ def test_versioning_error_dict_no_modified_value():
def test_making_new_version_invalid_cls(): def test_making_new_version_invalid_cls():
campaign_v1 = "This is a campaign." campaign_v1 = "This is a campaign."
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
stix2.utils.new_version(campaign_v1, name="fred") stix2.versioning.new_version(campaign_v1, name="fred")
assert 'cannot create new version of object of this type' in str(excinfo.value) assert 'cannot create new version of object of this type' in str(excinfo.value)
def test_revoke_dict(): def test_revoke_dict():
campaign_v1 = CAMPAIGN_MORE_KWARGS campaign_v1 = CAMPAIGN_MORE_KWARGS
campaign_v2 = stix2.utils.revoke(campaign_v1) campaign_v2 = stix2.versioning.revoke(campaign_v1)
assert campaign_v1['id'] == campaign_v2['id'] assert campaign_v1['id'] == campaign_v2['id']
assert campaign_v1['spec_version'] == campaign_v2['spec_version'] assert campaign_v1['spec_version'] == campaign_v2['spec_version']
@ -209,12 +212,18 @@ def test_revoke_dict():
assert campaign_v2['revoked'] assert campaign_v2['revoked']
def test_revoke_unversionable():
sco = stix2.v21.File(name="data.txt")
with pytest.raises(ValueError):
sco.revoke()
def test_versioning_error_revoke_of_revoked_dict(): def test_versioning_error_revoke_of_revoked_dict():
campaign_v1 = CAMPAIGN_MORE_KWARGS campaign_v1 = CAMPAIGN_MORE_KWARGS
campaign_v2 = stix2.utils.revoke(campaign_v1) campaign_v2 = stix2.versioning.revoke(campaign_v1)
with pytest.raises(stix2.exceptions.RevokeError) as excinfo: with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
stix2.utils.revoke(campaign_v2) stix2.versioning.revoke(campaign_v2)
assert excinfo.value.called_by == "revoke" assert excinfo.value.called_by == "revoke"
@ -222,7 +231,7 @@ def test_versioning_error_revoke_of_revoked_dict():
def test_revoke_invalid_cls(): def test_revoke_invalid_cls():
campaign_v1 = "This is a campaign." campaign_v1 = "This is a campaign."
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
stix2.utils.revoke(campaign_v1) stix2.versioning.revoke(campaign_v1)
assert 'cannot revoke object of this type' in str(excinfo.value) assert 'cannot revoke object of this type' in str(excinfo.value)
@ -236,7 +245,7 @@ def test_remove_custom_stix_property():
is_family=False, is_family=False,
) )
mal_nc = stix2.utils.remove_custom_stix(mal) mal_nc = stix2.versioning.remove_custom_stix(mal)
assert "x_custom" not in mal_nc assert "x_custom" not in mal_nc
assert mal["modified"] < mal_nc["modified"] assert mal["modified"] < mal_nc["modified"]
@ -254,14 +263,14 @@ def test_remove_custom_stix_object():
animal = Animal(species="lion", animal_class="mammal") animal = Animal(species="lion", animal_class="mammal")
nc = stix2.utils.remove_custom_stix(animal) nc = stix2.versioning.remove_custom_stix(animal)
assert nc is None assert nc is None
def test_remove_custom_stix_no_custom(): def test_remove_custom_stix_no_custom():
campaign_v1 = stix2.v21.Campaign(**CAMPAIGN_MORE_KWARGS) campaign_v1 = stix2.v21.Campaign(**CAMPAIGN_MORE_KWARGS)
campaign_v2 = stix2.utils.remove_custom_stix(campaign_v1) campaign_v2 = stix2.versioning.remove_custom_stix(campaign_v1)
assert len(campaign_v1.keys()) == len(campaign_v2.keys()) assert len(campaign_v1.keys()) == len(campaign_v2.keys())
assert campaign_v1.id == campaign_v2.id assert campaign_v1.id == campaign_v2.id
@ -294,5 +303,96 @@ def test_fudge_modified(old, candidate_new, expected_new, use_stix21):
expected_new, "%Y-%m-%dT%H:%M:%S.%fZ", expected_new, "%Y-%m-%dT%H:%M:%S.%fZ",
) )
fudged = stix2.utils._fudge_modified(old_dt, candidate_new_dt, use_stix21) fudged = stix2.versioning._fudge_modified(
old_dt, candidate_new_dt, use_stix21,
)
assert fudged == expected_new_dt assert fudged == expected_new_dt
def test_version_unversionable_dict():
f = {
"type": "file",
"id": "file--4efb5217-e987-4438-9a1b-c800099401df",
"name": "data.txt",
}
with pytest.raises(ValueError):
stix2.versioning.new_version(f)
def test_version_sco_with_custom():
"""
If we add custom properties named like versioning properties to an object
type which is otherwise unversionable, versioning should start working.
"""
file_sco_obj = stix2.v21.File(
name="data.txt",
created="1973-11-23T02:31:37Z",
modified="1991-05-13T19:24:57Z",
revoked=False,
allow_custom=True,
)
new_file_sco_obj = stix2.versioning.new_version(
file_sco_obj, size=1234,
)
assert new_file_sco_obj.size == 1234
revoked_obj = stix2.versioning.revoke(new_file_sco_obj)
assert revoked_obj.revoked
def test_version_disable_custom():
m = stix2.v21.Malware(
name="foo", description="Steals your identity!", is_family=False,
x_custom=123, allow_custom=True,
)
# Remove the custom property, and disallow custom properties in the
# resulting object.
m2 = stix2.versioning.new_version(m, x_custom=None, allow_custom=False)
assert "x_custom" not in m2
# Remove a regular property and leave the custom one, disallow custom
# properties, and make sure we get an error.
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
stix2.versioning.new_version(m, description=None, allow_custom=False)
def test_version_enable_custom():
m = stix2.v21.Malware(
name="foo", description="Steals your identity!", is_family=False,
)
# Add a custom property to an object for which it was previously disallowed
m2 = stix2.versioning.new_version(m, x_custom=123, allow_custom=True)
assert "x_custom" in m2
# Add a custom property without enabling it, make sure we get an error
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
stix2.versioning.new_version(m, x_custom=123, allow_custom=False)
def test_version_propagate_custom():
m = stix2.v21.Malware(
name="foo", is_family=False,
)
# Remember custom-not-allowed setting from original; produce error
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
stix2.versioning.new_version(m, x_custom=123)
m2 = stix2.versioning.new_version(m, description="Steals your identity!")
assert "description" in m2
assert m2.description == "Steals your identity!"
m_custom = stix2.v21.Malware(
name="foo", is_family=False, x_custom=123, allow_custom=True,
)
# Remember custom-allowed setting from original; should work
m2_custom = stix2.versioning.new_version(m_custom, x_other_custom="abc")
assert "x_other_custom" in m2_custom
assert m2_custom.x_other_custom == "abc"

View File

@ -1,10 +1,5 @@
"""Utility functions and classes for the STIX2 library.""" """Utility functions and classes for the STIX2 library."""
try:
from collections.abc import Mapping
except ImportError:
from collections import Mapping
import copy
import datetime as dt import datetime as dt
import enum import enum
import json import json
@ -15,20 +10,11 @@ import six
import stix2 import stix2
from .exceptions import (
InvalidValueError, RevokeError, UnmodifiablePropertyError,
)
# Sentinel value for properties that should be set to the current time. # Sentinel value for properties that should be set to the current time.
# We can't use the standard 'default' approach, since if there are multiple # We can't use the standard 'default' approach, since if there are multiple
# timestamps in a single object, the timestamps will vary by a few microseconds. # timestamps in a single object, the timestamps will vary by a few microseconds.
NOW = object() NOW = object()
# STIX object properties that cannot be modified
STIX_UNMOD_PROPERTIES = ['created', 'created_by_ref', 'id', 'type']
TYPE_REGEX = re.compile(r'^\-?[a-z0-9]+(-[a-z0-9]+)*\-?$')
TYPE_21_REGEX = re.compile(r'^([a-z][a-z0-9]*)+(-[a-z0-9]+)*\-?$')
PREFIX_21_REGEX = re.compile(r'^[a-z].*') PREFIX_21_REGEX = re.compile(r'^[a-z].*')
_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ" _TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
@ -389,121 +375,6 @@ def find_property_index(obj, search_key, search_value):
return idx return idx
def _fudge_modified(old_modified, new_modified, use_stix21):
"""
Ensures a new modified timestamp is newer than the old. When they are
too close together, new_modified must be pushed further ahead to ensure
it is distinct and later, after JSON serialization (which may mean it's
actually being pushed a little ways into the future). JSON serialization
can remove precision, which can cause distinct timestamps to accidentally
become equal, if we're not careful.
:param old_modified: A previous "modified" timestamp, as a datetime object
:param new_modified: A candidate new "modified" timestamp, as a datetime
object
:param use_stix21: Whether to use STIX 2.1+ versioning timestamp precision
rules (boolean). This is important so that we are aware of how
timestamp precision will be truncated, so we know how close together
the timestamps can be, and how far ahead to potentially push the new
one.
:return: A suitable new "modified" timestamp. This may be different from
what was passed in, if it had to be pushed ahead.
"""
if use_stix21:
# 2.1+: we can use full precision
if new_modified <= old_modified:
new_modified = old_modified + dt.timedelta(microseconds=1)
else:
# 2.0: we must use millisecond precision
one_ms = dt.timedelta(milliseconds=1)
if new_modified - old_modified < one_ms:
new_modified = old_modified + one_ms
return new_modified
def new_version(data, **kwargs):
"""Create a new version of a STIX object, by modifying properties and
updating the ``modified`` property.
"""
if not isinstance(data, Mapping):
raise ValueError(
"cannot create new version of object of this type! "
"Try a dictionary or instance of an SDO or SRO class.",
)
unchangable_properties = []
if data.get('revoked'):
raise RevokeError("new_version")
try:
new_obj_inner = copy.deepcopy(data._inner)
except AttributeError:
new_obj_inner = copy.deepcopy(data)
properties_to_change = kwargs.keys()
# Make sure certain properties aren't trying to change
for prop in STIX_UNMOD_PROPERTIES:
if prop in properties_to_change:
unchangable_properties.append(prop)
if unchangable_properties:
raise UnmodifiablePropertyError(unchangable_properties)
# Different versioning precision rules in STIX 2.0 vs 2.1, so we need
# to know which rules to apply.
is_21 = "spec_version" in data
precision_constraint = "min" if is_21 else "exact"
cls = type(data)
if 'modified' not in kwargs:
old_modified = parse_into_datetime(
data["modified"], precision="millisecond",
precision_constraint=precision_constraint,
)
new_modified = get_timestamp()
new_modified = _fudge_modified(old_modified, new_modified, is_21)
kwargs['modified'] = new_modified
elif 'modified' in data:
old_modified_property = parse_into_datetime(
data.get('modified'), precision='millisecond',
precision_constraint=precision_constraint,
)
new_modified_property = parse_into_datetime(
kwargs['modified'], precision='millisecond',
precision_constraint=precision_constraint,
)
if new_modified_property <= old_modified_property:
raise InvalidValueError(
cls, 'modified',
"The new modified datetime cannot be before than or equal to the current modified datetime."
"It cannot be equal, as according to STIX 2 specification, objects that are different "
"but have the same id and modified timestamp do not have defined consumer behavior.",
)
new_obj_inner.update(kwargs)
# Exclude properties with a value of 'None' in case data is not an instance of a _STIXBase subclass
return cls(**{k: v for k, v in new_obj_inner.items() if v is not None})
def revoke(data):
"""Revoke a STIX object.
Returns:
A new version of the object with ``revoked`` set to ``True``.
"""
if not isinstance(data, Mapping):
raise ValueError(
"cannot revoke object of this type! Try a dictionary "
"or instance of an SDO or SRO class.",
)
if data.get('revoked'):
raise RevokeError("revoke")
return new_version(data, revoked=True, allow_custom=True)
def get_class_hierarchy_names(obj): def get_class_hierarchy_names(obj):
"""Given an object, return the names of the class hierarchy.""" """Given an object, return the names of the class hierarchy."""
names = [] names = []
@ -512,64 +383,6 @@ def get_class_hierarchy_names(obj):
return names return names
def remove_custom_stix(stix_obj):
"""Remove any custom STIX objects or properties.
Warnings:
This function is a best effort utility, in that it will remove custom
objects and properties based on the type names; i.e. if "x-" prefixes
object types, and "x\\_" prefixes property types. According to the
STIX2 spec, those naming conventions are a SHOULDs not MUSTs, meaning
that valid custom STIX content may ignore those conventions and in
effect render this utility function invalid when used on that STIX
content.
Args:
stix_obj (dict OR python-stix obj): a single python-stix object
or dict of a STIX object
Returns:
A new version of the object with any custom content removed
"""
if stix_obj['type'].startswith('x-'):
# if entire object is custom, discard
return None
custom_props = []
for prop in stix_obj.items():
if prop[0].startswith('x_'):
# for every custom property, record it and set value to None
# (so we can pass it to new_version() and it will be dropped)
custom_props.append((prop[0], None))
if custom_props:
# obtain set of object properties that can be transferred
# to a new object version. This is 1)custom props with their
# values set to None, and 2)any properties left that are not
# unmodifiable STIX properties or the "modified" property
# set of properties that are not supplied to new_version()
# to be used for updating properties. This includes unmodifiable
# properties (properties that new_version() just re-uses from the
# existing STIX object) and the "modified" property. We dont supply the
# "modified" property so that new_version() creates a new datetime
# value for this property
non_supplied_props = STIX_UNMOD_PROPERTIES + ['modified']
props = [(prop, stix_obj[prop]) for prop in stix_obj if prop not in non_supplied_props]
# add to set the custom properties we want to get rid of (with their value=None)
props.extend(custom_props)
new_obj = new_version(stix_obj, **(dict(props)))
return new_obj
else:
return stix_obj
def get_type_from_id(stix_id): def get_type_from_id(stix_id):
return stix_id.split('--', 1)[0] return stix_id.split('--', 1)[0]

View File

@ -103,3 +103,33 @@ EXT_MAP = {
'unix-account-ext': UNIXAccountExt, 'unix-account-ext': UNIXAccountExt,
}, },
} }
# Ensure star-imports from this module get the right symbols. "base" is a
# known problem, since there are multiple modules with that name and one can
# accidentally overwrite another.
__all__ = """
Bundle,
TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomMarking, ExternalReference,
GranularMarking, KillChainPhase, MarkingDefinition, StatementMarking,
TLPMarking,
URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem,
CustomExtension, CustomObservable, Directory, DomainName, EmailAddress,
EmailMessage, EmailMIMEComponent, File, HTTPRequestExt, ICMPExt,
IPv4Address, IPv6Address, MACAddress, Mutex, NetworkTraffic, NTFSExt,
PDFExt, Process, RasterImageExt, SocketExt, Software, TCPExt,
UNIXAccountExt, UserAccount, WindowsPEBinaryExt,
WindowsPEOptionalHeaderType, WindowsPESection, WindowsProcessExt,
WindowsRegistryKey, WindowsRegistryValueType, WindowsServiceExt,
X509Certificate, X509V3ExtenstionsType,
AttackPattern, Campaign, CourseOfAction, CustomObject, Identity, Indicator,
IntrusionSet, Malware, ObservedData, Report, ThreatActor, Tool,
Vulnerability,
Relationship, Sighting,
OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP
""".replace(",", " ").split()

View File

@ -111,3 +111,34 @@ EXT_MAP = {
'unix-account-ext': UNIXAccountExt, 'unix-account-ext': UNIXAccountExt,
}, },
} }
# Ensure star-imports from this module get the right symbols. "base" is a
# known problem, since there are multiple modules with that name and one can
# accidentally overwrite another.
__all__ = """
Bundle,
TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomMarking, ExternalReference,
GranularMarking, KillChainPhase, LanguageContent, MarkingDefinition,
StatementMarking, TLPMarking,
URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem,
CustomExtension, CustomObservable, Directory, DomainName, EmailAddress,
EmailMessage, EmailMIMEComponent, File, HTTPRequestExt, ICMPExt,
IPv4Address, IPv6Address, MACAddress, Mutex, NetworkTraffic, NTFSExt,
PDFExt, Process, RasterImageExt, SocketExt, Software, TCPExt,
UNIXAccountExt, UserAccount, WindowsPEBinaryExt,
WindowsPEOptionalHeaderType, WindowsPESection, WindowsProcessExt,
WindowsRegistryKey, WindowsRegistryValueType, WindowsServiceExt,
X509Certificate, X509V3ExtenstionsType,
AttackPattern, Campaign, CourseOfAction, CustomObject, Grouping, Identity,
Indicator, Infrastructure, IntrusionSet, Location, Malware,
MalwareAnalysis, Note, ObservedData, Opinion, Report, ThreatActor, Tool,
Vulnerability,
Relationship, Sighting,
OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP
""".replace(",", " ").split()

View File

@ -14,8 +14,7 @@ from ..properties import (
BinaryProperty, BooleanProperty, DictionaryProperty, BinaryProperty, BooleanProperty, DictionaryProperty,
EmbeddedObjectProperty, EnumProperty, ExtensionsProperty, FloatProperty, EmbeddedObjectProperty, EnumProperty, ExtensionsProperty, FloatProperty,
HashesProperty, HexProperty, IDProperty, IntegerProperty, ListProperty, HashesProperty, HexProperty, IDProperty, IntegerProperty, ListProperty,
ObjectReferenceProperty, ReferenceProperty, StringProperty, ReferenceProperty, StringProperty, TimestampProperty, TypeProperty,
TimestampProperty, TypeProperty,
) )
from .base import _Extension, _Observable, _STIXBase21 from .base import _Extension, _Observable, _STIXBase21
from .common import GranularMarking from .common import GranularMarking
@ -144,7 +143,7 @@ class EmailMIMEComponent(_STIXBase21):
_properties = OrderedDict([ _properties = OrderedDict([
('body', StringProperty()), ('body', StringProperty()),
('body_raw_ref', ObjectReferenceProperty(valid_types=['artifact', 'file'])), ('body_raw_ref', ReferenceProperty(valid_types=['artifact', 'file'], spec_version="2.1")),
('content_type', StringProperty()), ('content_type', StringProperty()),
('content_disposition', StringProperty()), ('content_disposition', StringProperty()),
]) ])
@ -201,7 +200,7 @@ class ArchiveExt(_Extension):
_type = 'archive-ext' _type = 'archive-ext'
_properties = OrderedDict([ _properties = OrderedDict([
('contains_refs', ListProperty(ObjectReferenceProperty(valid_types=['file', 'directory']), required=True)), ('contains_refs', ListProperty(ReferenceProperty(valid_types=['file', 'directory'], spec_version="2.1"), required=True)),
('comment', StringProperty()), ('comment', StringProperty()),
]) ])
@ -465,7 +464,7 @@ class HTTPRequestExt(_Extension):
('request_version', StringProperty()), ('request_version', StringProperty()),
('request_header', DictionaryProperty(spec_version='2.1')), ('request_header', DictionaryProperty(spec_version='2.1')),
('message_body_length', IntegerProperty()), ('message_body_length', IntegerProperty()),
('message_body_data_ref', ObjectReferenceProperty(valid_types='artifact')), ('message_body_data_ref', ReferenceProperty(valid_types='artifact', spec_version="2.1")),
]) ])
@ -654,7 +653,7 @@ class WindowsServiceExt(_Extension):
"SERVICE_SYSTEM_ALERT", "SERVICE_SYSTEM_ALERT",
]), ]),
), ),
('service_dll_refs', ListProperty(ObjectReferenceProperty(valid_types='file'))), ('service_dll_refs', ListProperty(ReferenceProperty(valid_types='file', spec_version="2.1"))),
( (
'service_type', EnumProperty(allowed=[ 'service_type', EnumProperty(allowed=[
"SERVICE_KERNEL_DRIVER", "SERVICE_KERNEL_DRIVER",

275
stix2/versioning.py Normal file
View File

@ -0,0 +1,275 @@
import copy
import datetime as dt
import itertools
import uuid
import six
from six.moves.collections_abc import Mapping
import stix2.base
from stix2.utils import get_timestamp, parse_into_datetime
import stix2.v20
from .exceptions import (
InvalidValueError, RevokeError, UnmodifiablePropertyError,
)
# STIX object properties that cannot be modified
STIX_UNMOD_PROPERTIES = ['created', 'created_by_ref', 'id', 'type']
_VERSIONING_PROPERTIES = {"created", "modified", "revoked"}
def _fudge_modified(old_modified, new_modified, use_stix21):
"""
Ensures a new modified timestamp is newer than the old. When they are
too close together, new_modified must be pushed further ahead to ensure
it is distinct and later, after JSON serialization (which may mean it's
actually being pushed a little ways into the future). JSON serialization
can remove precision, which can cause distinct timestamps to accidentally
become equal, if we're not careful.
:param old_modified: A previous "modified" timestamp, as a datetime object
:param new_modified: A candidate new "modified" timestamp, as a datetime
object
:param use_stix21: Whether to use STIX 2.1+ versioning timestamp precision
rules (boolean). This is important so that we are aware of how
timestamp precision will be truncated, so we know how close together
the timestamps can be, and how far ahead to potentially push the new
one.
:return: A suitable new "modified" timestamp. This may be different from
what was passed in, if it had to be pushed ahead.
"""
if use_stix21:
# 2.1+: we can use full precision
if new_modified <= old_modified:
new_modified = old_modified + dt.timedelta(microseconds=1)
else:
# 2.0: we must use millisecond precision
one_ms = dt.timedelta(milliseconds=1)
if new_modified - old_modified < one_ms:
new_modified = old_modified + one_ms
return new_modified
def _is_versionable(data):
"""
Determine whether the given object is versionable. This check is done on
the basis of support for three properties for the object type: "created",
"modified", and "revoked". If all three are supported, the object is
versionable; otherwise it is not. Dicts must have a "type" property whose
value is for a registered object type. This is used to determine a
complete set of supported properties for the type.
Also, detect whether it represents a STIX 2.1 or greater spec version.
:param data: The object to check. Must be either a stix object, or a dict
with a "type" property.
:return: A 2-tuple of bools: the first is True if the object is versionable
and False if not; the second is True if the object is STIX 2.1+ and
False if not.
"""
is_versionable = False
is_21 = False
stix_vid = None
if isinstance(data, Mapping):
# First, determine spec version. It's easy for our stix2 objects; more
# work for dicts.
is_21 = False
if isinstance(data, stix2.base._STIXBase) and \
not isinstance(data, stix2.v20._STIXBase20):
# (is_21 means 2.1 or later; try not to be 2.1-specific)
is_21 = True
elif isinstance(data, dict):
stix_vid = stix2.parsing._detect_spec_version(data)
is_21 = stix_vid != "v20"
# Then, determine versionability.
if six.PY2:
# dumb python2 compatibility: map.keys() returns a list, not a set!
# six.viewkeys() compatibility function uses dict.viewkeys() on
# python2, which is not a Mapping mixin method, so that doesn't
# work either (for our stix2 objects).
keys = set(data)
else:
keys = data.keys()
# This should be sufficient for STIX objects; maybe we get lucky with
# dicts here but probably not.
if keys >= _VERSIONING_PROPERTIES:
is_versionable = True
# Tougher to handle dicts. We need to consider STIX version, map to a
# registered class, and from that get a more complete picture of its
# properties.
elif isinstance(data, dict):
class_maps = stix2.parsing.STIX2_OBJ_MAPS[stix_vid]
obj_type = data["type"]
if obj_type in class_maps["objects"]:
# Should we bother checking properties for SDOs/SROs?
# They were designed to be versionable.
is_versionable = True
elif obj_type in class_maps["observables"]:
# but do check SCOs
cls = class_maps["observables"][obj_type]
is_versionable = _VERSIONING_PROPERTIES.issubset(
cls._properties,
)
return is_versionable, is_21
def new_version(data, allow_custom=None, **kwargs):
"""
Create a new version of a STIX object, by modifying properties and
updating the ``modified`` property.
:param data: The object to create a new version of. Maybe a stix2 object
or dict.
:param allow_custom: Whether to allow custom properties on the new object.
If True, allow them (regardless of whether the original had custom
properties); if False disallow them; if None, propagate the preference
from the original object.
:param kwargs: The properties to change. Setting to None requests property
removal.
:return: The new object.
"""
is_versionable, is_21 = _is_versionable(data)
if not is_versionable:
raise ValueError(
"cannot create new version of object of this type! "
"Try a dictionary or instance of an SDO or SRO class.",
)
if data.get('revoked'):
raise RevokeError("new_version")
try:
new_obj_inner = copy.deepcopy(data._inner)
except AttributeError:
new_obj_inner = copy.deepcopy(data)
# Make sure certain properties aren't trying to change
# ID contributing properties of 2.1+ SCOs may also not change if a UUIDv5
# is in use (depending on whether they were used to create it... but they
# probably were). That would imply an ID change, which is not allowed
# across versions.
sco_locked_props = []
if is_21 and isinstance(data, stix2.base._Observable):
uuid_ = uuid.UUID(data["id"][-36:])
if uuid_.variant == uuid.RFC_4122 and uuid_.version == 5:
sco_locked_props = data._id_contributing_properties
unchangable_properties = set()
for prop in itertools.chain(STIX_UNMOD_PROPERTIES, sco_locked_props):
if prop in kwargs:
unchangable_properties.add(prop)
if unchangable_properties:
raise UnmodifiablePropertyError(unchangable_properties)
# Different versioning precision rules in STIX 2.0 vs 2.1, so we need
# to know which rules to apply.
precision_constraint = "min" if is_21 else "exact"
cls = type(data)
if 'modified' not in kwargs:
old_modified = parse_into_datetime(
data["modified"], precision="millisecond",
precision_constraint=precision_constraint,
)
new_modified = get_timestamp()
new_modified = _fudge_modified(old_modified, new_modified, is_21)
kwargs['modified'] = new_modified
elif 'modified' in data:
old_modified_property = parse_into_datetime(
data.get('modified'), precision='millisecond',
precision_constraint=precision_constraint,
)
new_modified_property = parse_into_datetime(
kwargs['modified'], precision='millisecond',
precision_constraint=precision_constraint,
)
if new_modified_property <= old_modified_property:
raise InvalidValueError(
cls, 'modified',
"The new modified datetime cannot be before than or equal to the current modified datetime."
"It cannot be equal, as according to STIX 2 specification, objects that are different "
"but have the same id and modified timestamp do not have defined consumer behavior.",
)
new_obj_inner.update(kwargs)
# Set allow_custom appropriately if versioning an object. We will ignore
# it for dicts.
if isinstance(data, stix2.base._STIXBase):
if allow_custom is None:
new_obj_inner["allow_custom"] = data._allow_custom
else:
new_obj_inner["allow_custom"] = allow_custom
# Exclude properties with a value of 'None' in case data is not an instance of a _STIXBase subclass
return cls(**{k: v for k, v in new_obj_inner.items() if v is not None})
def revoke(data):
"""Revoke a STIX object.
Returns:
A new version of the object with ``revoked`` set to ``True``.
"""
if not isinstance(data, Mapping):
raise ValueError(
"cannot revoke object of this type! Try a dictionary "
"or instance of an SDO or SRO class.",
)
if data.get('revoked'):
raise RevokeError("revoke")
return new_version(data, revoked=True)
def remove_custom_stix(stix_obj):
"""Remove any custom STIX objects or properties.
Warnings:
This function is a best effort utility, in that it will remove custom
objects and properties based on the type names; i.e. if "x-" prefixes
object types, and "x\\_" prefixes property types. According to the
STIX2 spec, those naming conventions are a SHOULDs not MUSTs, meaning
that valid custom STIX content may ignore those conventions and in
effect render this utility function invalid when used on that STIX
content.
Args:
stix_obj (dict OR python-stix obj): a single python-stix object
or dict of a STIX object
Returns:
A new version of the object with any custom content removed
"""
if stix_obj['type'].startswith('x-'):
# if entire object is custom, discard
return None
custom_props = {
k: None
for k in stix_obj if k.startswith("x_")
}
if custom_props:
new_obj = new_version(stix_obj, allow_custom=False, **custom_props)
return new_obj
else:
return stix_obj