diff --git a/stix2/__init__.py b/stix2/__init__.py index f3e02b4..26eed6f 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -53,8 +53,8 @@ from .patterns import ( RepeatQualifier, StartStopQualifier, StringConstant, TimestampConstant, WithinQualifier, ) -from .utils import new_version, revoke from .v20 import * # This import will always be the latest STIX 2.X version from .version import __version__ +from .versioning import new_version, revoke _collect_stix2_mappings() diff --git a/stix2/base.py b/stix2/base.py index 645064a..bd5ce55 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -21,8 +21,8 @@ from .markings.utils import validate from .utils import ( NOW, PREFIX_21_REGEX, find_property_index, format_datetime, get_timestamp, ) -from .utils import new_version as _new_version -from .utils import revoke as _revoke +from .versioning import new_version as _new_version +from .versioning import revoke as _revoke try: from collections.abc import Mapping @@ -351,24 +351,21 @@ class _Observable(_STIXBase): def __init__(self, **kwargs): # the constructor might be called independently of an observed data object self._STIXBase__valid_refs = kwargs.pop('_valid_refs', []) - - self._allow_custom = kwargs.get('allow_custom', False) self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False) - - try: - # Since `spec_version` is optional, this is how we check for a 2.1 SCO - self._id_contributing_properties - - if 'id' not in kwargs: - possible_id = self._generate_id(kwargs) - if possible_id is not None: - kwargs['id'] = possible_id - except AttributeError: - # End up here if handling a 2.0 SCO, and don't need to do anything further - pass - super(_Observable, self).__init__(**kwargs) + if 'id' not in kwargs and not isinstance(self, stix2.v20._Observable): + # Specific to 2.1+ observables: generate a deterministic ID + id_ = self._generate_id() + + # Spec says fall back to UUIDv4 if no contributing properties were + # given. That's what already happened (the following is actually + # overwriting the default uuidv4), so nothing to do here. + if id_ is not None: + # Can't assign to self (we're immutable), so slip the ID in + # more sneakily. + self._inner["id"] = id_ + def _check_ref(self, ref, prop, prop_name): """ Only for checking `*_ref` or `*_refs` properties in spec_version 2.0 @@ -413,42 +410,53 @@ class _Observable(_STIXBase): for ref in kwargs[prop_name]: self._check_ref(ref, prop, prop_name) - def _generate_id(self, kwargs): - required_prefix = self._type + "--" + def _generate_id(self): + """ + Generate a UUIDv5 for this observable, using its "ID contributing + properties". - properties_to_use = self._id_contributing_properties - if properties_to_use: - streamlined_object = {} - if "hashes" in kwargs and "hashes" in properties_to_use: - possible_hash = _choose_one_hash(kwargs["hashes"]) - if possible_hash: - streamlined_object["hashes"] = possible_hash - for key in properties_to_use: - if key != "hashes" and key in kwargs: - if isinstance(kwargs[key], dict) or isinstance(kwargs[key], _STIXBase): - temp_deep_copy = copy.deepcopy(dict(kwargs[key])) - _recursive_stix_to_dict(temp_deep_copy) - streamlined_object[key] = temp_deep_copy - elif isinstance(kwargs[key], list): - temp_deep_copy = copy.deepcopy(kwargs[key]) - _recursive_stix_list_to_dict(temp_deep_copy) - streamlined_object[key] = temp_deep_copy - else: - streamlined_object[key] = kwargs[key] - if streamlined_object: - data = canonicalize(streamlined_object, utf8=False) + :return: The ID, or None if no ID contributing properties are set + """ + + id_ = None + json_serializable_object = {} + + for key in self._id_contributing_properties: + + if key in self: + obj_value = self[key] + + if key == "hashes": + serializable_value = _choose_one_hash(obj_value) + + if serializable_value is None: + raise InvalidValueError( + self, key, "No hashes given", + ) - # The situation is complicated w.r.t. python 2/3 behavior, so - # I'd rather not rely on particular exceptions being raised to - # determine what to do. Better to just check the python version - # directly. - if six.PY3: - return required_prefix + six.text_type(uuid.uuid5(SCO_DET_ID_NAMESPACE, data)) else: - return required_prefix + six.text_type(uuid.uuid5(SCO_DET_ID_NAMESPACE, data.encode("utf-8"))) + serializable_value = _make_json_serializable(obj_value) - # We return None if there are no values specified for any of the id-contributing-properties - return None + json_serializable_object[key] = serializable_value + + if json_serializable_object: + + data = canonicalize(json_serializable_object, utf8=False) + + # The situation is complicated w.r.t. python 2/3 behavior, so + # I'd rather not rely on particular exceptions being raised to + # determine what to do. Better to just check the python version + # directly. + if six.PY3: + uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, data) + else: + uuid_ = uuid.uuid5( + SCO_DET_ID_NAMESPACE, data.encode("utf-8"), + ) + + id_ = "{}--{}".format(self._type, six.text_type(uuid_)) + + return id_ class _Extension(_STIXBase): @@ -472,35 +480,102 @@ def _choose_one_hash(hash_dict): if k is not None: return {k: hash_dict[k]} + return None + def _cls_init(cls, obj, kwargs): if getattr(cls, '__init__', object.__init__) is not object.__init__: cls.__init__(obj, **kwargs) -def _recursive_stix_to_dict(input_dict): - for key in input_dict: - if isinstance(input_dict[key], dict): - _recursive_stix_to_dict(input_dict[key]) - elif isinstance(input_dict[key], _STIXBase): - input_dict[key] = dict(input_dict[key]) +def _make_json_serializable(value): + """ + Make the given value JSON-serializable; required for the JSON canonicalizer + to work. This recurses into lists/dicts, converts stix objects to dicts, + etc. "Convenience" types this library uses as property values are + JSON-serialized to produce a JSON-serializable value. (So you will always + get strings for those.) - # There may stil be nested _STIXBase objects - _recursive_stix_to_dict(input_dict[key]) - elif isinstance(input_dict[key], list): - _recursive_stix_list_to_dict(input_dict[key]) - else: - pass + The conversion will not affect the passed in value. + + :param value: The value to make JSON-serializable. + :return: The JSON-serializable value. + :raises ValueError: If value is None (since nulls are not allowed in STIX + objects). + """ + if value is None: + raise ValueError("Illegal null value found in a STIX object") + + json_value = value # default assumption + + if isinstance(value, Mapping): + json_value = { + k: _make_json_serializable(v) + for k, v in value.items() + } + + elif isinstance(value, list): + json_value = [ + _make_json_serializable(v) + for v in value + ] + + elif not isinstance(value, (int, float, six.string_types, bool)): + # If a "simple" value which is not already JSON-serializable, + # JSON-serialize to a string and use that as our JSON-serializable + # value. This applies to our datetime objects currently (timestamp + # properties), and could apply to any other "convenience" types this + # library uses for property values in the future. + json_value = json.dumps(value, ensure_ascii=False, cls=STIXJSONEncoder) + + # If it looks like a string literal was output, strip off the quotes. + # Otherwise, a second pair will be added when it's canonicalized. Also + # to be extra safe, we need to unescape. + if len(json_value) >= 2 and \ + json_value[0] == '"' and json_value[-1] == '"': + json_value = _un_json_escape(json_value[1:-1]) + + return json_value -def _recursive_stix_list_to_dict(input_list): - for i in range(len(input_list)): - if isinstance(input_list[i], _STIXBase): - input_list[i] = dict(input_list[i]) - elif isinstance(input_list[i], dict): - pass - elif isinstance(input_list[i], list): - _recursive_stix_list_to_dict(input_list[i]) - else: - continue - _recursive_stix_to_dict(input_list[i]) +_JSON_ESCAPE_RE = re.compile(r"\\.") +# I don't think I should need to worry about the unicode escapes (\uXXXX) +# since I use ensure_ascii=False when generating it. I will just fix all +# the other escapes, e.g. \n, \r, etc. +# +# This list is taken from RFC8259 section 7: +# https://tools.ietf.org/html/rfc8259#section-7 +# Maps the second char of a "\X" style escape, to a replacement char +_JSON_ESCAPE_MAP = { + '"': '"', + "\\": "\\", + "/": "/", + "b": "\b", + "f": "\f", + "n": "\n", + "r": "\r", + "t": "\t", +} + + +def _un_json_escape(json_string): + """ + Removes JSON string literal escapes. We should undo these things Python's + serializer does, so we can ensure they're done canonically. The + canonicalizer should be in charge of everything, as much as is feasible. + + :param json_string: String literal output of Python's JSON serializer, + minus the surrounding quotes. + :return: The unescaped string + """ + + def replace(m): + replacement = _JSON_ESCAPE_MAP.get(m.group(0)[1]) + if replacement is None: + raise ValueError("Unrecognized JSON escape: " + m.group(0)) + + return replacement + + result = _JSON_ESCAPE_RE.sub(replace, json_string) + + return result diff --git a/stix2/markings/granular_markings.py b/stix2/markings/granular_markings.py index 5456f83..5021ec8 100644 --- a/stix2/markings/granular_markings.py +++ b/stix2/markings/granular_markings.py @@ -2,7 +2,8 @@ from stix2 import exceptions from stix2.markings import utils -from stix2.utils import is_marking, new_version +from stix2.utils import is_marking +from stix2.versioning import new_version def get_markings(obj, selectors, inherited=False, descendants=False, marking_ref=True, lang=True): diff --git a/stix2/markings/object_markings.py b/stix2/markings/object_markings.py index dc85dfa..5d4e4f1 100644 --- a/stix2/markings/object_markings.py +++ b/stix2/markings/object_markings.py @@ -2,7 +2,7 @@ from stix2 import exceptions from stix2.markings import utils -from stix2.utils import new_version +from stix2.versioning import new_version def get_markings(obj): diff --git a/stix2/pattern_visitor.py b/stix2/pattern_visitor.py index 317ffa1..4ec2b20 100644 --- a/stix2/pattern_visitor.py +++ b/stix2/pattern_visitor.py @@ -40,6 +40,11 @@ def remove_terminal_nodes(parse_tree_nodes): return values +_TIMESTAMP_RE = re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{1,6})?Z') + + +def check_for_valid_timetamp_syntax(timestamp_string): + return _TIMESTAMP_RE.match(timestamp_string) @@ -214,6 +219,14 @@ class STIXPatternVisitorForSTIX2(): # Visit a parse tree produced by STIXPatternParser#startStopQualifier. def visitStartStopQualifier(self, ctx): children = self.visitChildren(ctx) + # 2.0 parser will accept any string, need to make sure it is a full STIX timestamp + if isinstance(children[1], StringConstant): + if not check_for_valid_timetamp_syntax(children[1].value): + raise (ValueError("Start time is not a legal timestamp")) + if isinstance(children[3], StringConstant): + if not check_for_valid_timetamp_syntax(children[3].value): + raise (ValueError("Stop time is not a legal timestamp")) + return StartStopQualifier(children[1], children[3]) # Visit a parse tree produced by STIXPatternParser#withinQualifier. diff --git a/stix2/patterns.py b/stix2/patterns.py index 6592335..5d7c0a2 100644 --- a/stix2/patterns.py +++ b/stix2/patterns.py @@ -669,12 +669,16 @@ class StartStopQualifier(_ExpressionQualifier): self.start_time = start_time elif isinstance(start_time, datetime.date): self.start_time = TimestampConstant(start_time) + elif isinstance(start_time, StringConstant): + self.start_time = StringConstant(start_time.value) else: raise ValueError("%s is not a valid argument for a Start/Stop Qualifier" % start_time) if isinstance(stop_time, TimestampConstant): self.stop_time = stop_time elif isinstance(stop_time, datetime.date): self.stop_time = TimestampConstant(stop_time) + elif isinstance(stop_time, StringConstant): + self.stop_time = StringConstant(stop_time.value) else: raise ValueError("%s is not a valid argument for a Start/Stop Qualifier" % stop_time) diff --git a/stix2/properties.py b/stix2/properties.py index bce5202..df962dd 100644 --- a/stix2/properties.py +++ b/stix2/properties.py @@ -17,10 +17,7 @@ from .exceptions import ( MutuallyExclusivePropertiesError, ) from .parsing import STIX2_OBJ_MAPS, parse, parse_observable -from .utils import ( - TYPE_21_REGEX, TYPE_REGEX, _get_dict, get_class_hierarchy_names, - parse_into_datetime, -) +from .utils import _get_dict, get_class_hierarchy_names, parse_into_datetime ID_REGEX_interoperability = re.compile(r"[0-9a-fA-F]{8}-" "[0-9a-fA-F]{4}-" @@ -33,6 +30,8 @@ try: except ImportError: from collections import Mapping, defaultdict +TYPE_REGEX = re.compile(r'^\-?[a-z0-9]+(-[a-z0-9]+)*\-?$') +TYPE_21_REGEX = re.compile(r'^([a-z][a-z0-9]*)+(-[a-z0-9]+)*\-?$') ERROR_INVALID_ID = ( "not a valid STIX identifier, must match --: {}" ) @@ -547,7 +546,7 @@ def enumerate_types(types, spec_version): return return_types -SELECTOR_REGEX = re.compile(r"^[a-z0-9_-]{3,250}(\.(\[\d+\]|[a-z0-9_-]{1,250}))*$") +SELECTOR_REGEX = re.compile(r"^([a-z0-9_-]{3,250}(\.(\[\d+\]|[a-z0-9_-]{1,250}))*|id)$") class SelectorProperty(Property): diff --git a/stix2/test/v20/test_granular_markings.py b/stix2/test/v20/test_granular_markings.py index e912cc1..ae2da3b 100644 --- a/stix2/test/v20/test_granular_markings.py +++ b/stix2/test/v20/test_granular_markings.py @@ -1089,3 +1089,17 @@ def test_clear_marking_not_present(data): """Test clearing markings for a selector that has no associated markings.""" with pytest.raises(MarkingNotFoundError): data = markings.clear_markings(data, ["labels"]) + + +def test_set_marking_on_id_property(): + malware = Malware( + granular_markings=[ + { + "selectors": ["id"], + "marking_ref": MARKING_IDS[0], + }, + ], + **MALWARE_KWARGS + ) + + assert "id" in malware["granular_markings"][0]["selectors"] diff --git a/stix2/test/v20/test_object_markings.py b/stix2/test/v20/test_object_markings.py index 191f33a..6bd2269 100644 --- a/stix2/test/v20/test_object_markings.py +++ b/stix2/test/v20/test_object_markings.py @@ -14,6 +14,7 @@ from .constants import MARKING_IDS MALWARE_KWARGS = MALWARE_KWARGS_CONST.copy() MALWARE_KWARGS.update({ 'id': MALWARE_ID, + 'type': 'malware', 'created': FAKE_TIME, 'modified': FAKE_TIME, }) diff --git a/stix2/test/v20/test_pattern_expressions.py b/stix2/test/v20/test_pattern_expressions.py index d5cbb5b..a96d3b8 100644 --- a/stix2/test/v20/test_pattern_expressions.py +++ b/stix2/test/v20/test_pattern_expressions.py @@ -312,8 +312,8 @@ def test_set_op(): def test_timestamp(): - ts = stix2.TimestampConstant('2014-01-13T07:03:17Z') - assert str(ts) == "t'2014-01-13T07:03:17Z'" + ts = stix2.StringConstant('2014-01-13T07:03:17Z') + assert str(ts) == "'2014-01-13T07:03:17Z'" def test_boolean(): @@ -363,11 +363,6 @@ def test_invalid_integer_constant(): stix2.IntegerConstant('foo') -def test_invalid_timestamp_constant(): - with pytest.raises(ValueError): - stix2.TimestampConstant('foo') - - def test_invalid_float_constant(): with pytest.raises(ValueError): stix2.FloatConstant('foo') @@ -461,23 +456,23 @@ def test_invalid_within_qualifier(): def test_startstop_qualifier(): qual = stix2.StartStopQualifier( - stix2.TimestampConstant('2016-06-01T00:00:00Z'), - datetime.datetime(2017, 3, 12, 8, 30, 0), + stix2.StringConstant('2016-06-01T00:00:00Z'), + stix2.StringConstant('2017-03-12T08:30:00Z'), ) - assert str(qual) == "START t'2016-06-01T00:00:00Z' STOP t'2017-03-12T08:30:00Z'" + assert str(qual) == "START '2016-06-01T00:00:00Z' STOP '2017-03-12T08:30:00Z'" qual2 = stix2.StartStopQualifier( - datetime.date(2016, 6, 1), - stix2.TimestampConstant('2016-07-01T00:00:00Z'), + stix2.StringConstant("2016-06-01T00:00:00Z"), + stix2.StringConstant('2016-07-01T00:00:00Z'), ) - assert str(qual2) == "START t'2016-06-01T00:00:00Z' STOP t'2016-07-01T00:00:00Z'" + assert str(qual2) == "START '2016-06-01T00:00:00Z' STOP '2016-07-01T00:00:00Z'" def test_invalid_startstop_qualifier(): with pytest.raises(ValueError): stix2.StartStopQualifier( 'foo', - stix2.TimestampConstant('2016-06-01T00:00:00Z'), + stix2.StringConstant('2016-06-01T00:00:00Z'), ) with pytest.raises(ValueError): @@ -508,6 +503,19 @@ def test_parsing_qualified_expression(): ) == "[network-traffic:dst_ref.type = 'domain-name' AND network-traffic:dst_ref.value = 'example.com'] REPEATS 5 TIMES WITHIN 1800 SECONDS" +def test_parsing_start_stop_qualified_expression(): + patt_obj = create_pattern_object("[ipv4-addr:value = '1.2.3.4'] START '2016-06-01T00:00:00Z' STOP '2017-03-12T08:30:00Z'", version="2.0") + + assert str( + patt_obj, + ) == "[ipv4-addr:value = '1.2.3.4'] START '2016-06-01T00:00:00Z' STOP '2017-03-12T08:30:00Z'" + + +def test_parsing_illegal_start_stop_qualified_expression(): + with pytest.raises(ValueError): + create_pattern_object("[ipv4-addr:value = '1.2.3.4'] START '2016-06-01' STOP '2017-03-12T08:30:00Z'", version="2.0") + + def test_list_constant(): patt_obj = create_pattern_object("[network-traffic:src_ref.value IN ('10.0.0.0', '10.0.0.1', '10.0.0.2')]", version="2.0") assert str(patt_obj) == "[network-traffic:src_ref.value IN ('10.0.0.0', '10.0.0.1', '10.0.0.2')]" diff --git a/stix2/test/v20/test_versioning.py b/stix2/test/v20/test_versioning.py index 9974e42..03d43cc 100644 --- a/stix2/test/v20/test_versioning.py +++ b/stix2/test/v20/test_versioning.py @@ -1,6 +1,10 @@ import pytest import stix2 +import stix2.exceptions +import stix2.utils +import stix2.v20 +import stix2.versioning from .constants import CAMPAIGN_MORE_KWARGS @@ -142,7 +146,7 @@ def test_versioning_error_revoke_of_revoked(): def test_making_new_version_dict(): campaign_v1 = CAMPAIGN_MORE_KWARGS - campaign_v2 = stix2.utils.new_version(CAMPAIGN_MORE_KWARGS, name="fred") + campaign_v2 = stix2.versioning.new_version(CAMPAIGN_MORE_KWARGS, name="fred") assert campaign_v1['id'] == campaign_v2['id'] assert campaign_v1['created_by_ref'] == campaign_v2['created_by_ref'] @@ -155,7 +159,7 @@ def test_making_new_version_dict(): def test_versioning_error_dict_bad_modified_value(): with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: - stix2.utils.new_version(CAMPAIGN_MORE_KWARGS, modified="2015-04-06T20:03:00.000Z") + stix2.versioning.new_version(CAMPAIGN_MORE_KWARGS, modified="2015-04-06T20:03:00.000Z") assert excinfo.value.cls == dict assert excinfo.value.prop_name == "modified" @@ -171,7 +175,7 @@ def test_versioning_error_dict_no_modified_value(): 'created': "2016-04-06T20:03:00.000Z", 'name': "Green Group Attacks Against Finance", } - campaign_v2 = stix2.utils.new_version(campaign_v1, modified="2017-04-06T20:03:00.000Z") + campaign_v2 = stix2.versioning.new_version(campaign_v1, modified="2017-04-06T20:03:00.000Z") assert str(campaign_v2['modified']) == "2017-04-06T20:03:00.000Z" @@ -179,14 +183,14 @@ def test_versioning_error_dict_no_modified_value(): def test_making_new_version_invalid_cls(): campaign_v1 = "This is a campaign." with pytest.raises(ValueError) as excinfo: - stix2.utils.new_version(campaign_v1, name="fred") + stix2.versioning.new_version(campaign_v1, name="fred") assert 'cannot create new version of object of this type' in str(excinfo.value) def test_revoke_dict(): campaign_v1 = CAMPAIGN_MORE_KWARGS - campaign_v2 = stix2.utils.revoke(campaign_v1) + campaign_v2 = stix2.versioning.revoke(campaign_v1) assert campaign_v1['id'] == campaign_v2['id'] assert campaign_v1['created_by_ref'] == campaign_v2['created_by_ref'] @@ -198,12 +202,18 @@ def test_revoke_dict(): assert campaign_v2['revoked'] +def test_revoke_unversionable(): + sco = stix2.v20.File(name="data.txt") + with pytest.raises(ValueError): + sco.revoke() + + def test_versioning_error_revoke_of_revoked_dict(): campaign_v1 = CAMPAIGN_MORE_KWARGS - campaign_v2 = stix2.utils.revoke(campaign_v1) + campaign_v2 = stix2.versioning.revoke(campaign_v1) with pytest.raises(stix2.exceptions.RevokeError) as excinfo: - stix2.utils.revoke(campaign_v2) + stix2.versioning.revoke(campaign_v2) assert excinfo.value.called_by == "revoke" @@ -211,7 +221,7 @@ def test_versioning_error_revoke_of_revoked_dict(): def test_revoke_invalid_cls(): campaign_v1 = "This is a campaign." with pytest.raises(ValueError) as excinfo: - stix2.utils.revoke(campaign_v1) + stix2.versioning.revoke(campaign_v1) assert 'cannot revoke object of this type' in str(excinfo.value) @@ -224,7 +234,7 @@ def test_remove_custom_stix_property(): allow_custom=True, ) - mal_nc = stix2.utils.remove_custom_stix(mal) + mal_nc = stix2.versioning.remove_custom_stix(mal) assert "x_custom" not in mal_nc assert (stix2.utils.parse_into_datetime(mal["modified"], precision="millisecond") < @@ -243,15 +253,136 @@ def test_remove_custom_stix_object(): animal = Animal(species="lion", animal_class="mammal") - nc = stix2.utils.remove_custom_stix(animal) + nc = stix2.versioning.remove_custom_stix(animal) assert nc is None def test_remove_custom_stix_no_custom(): campaign_v1 = stix2.v20.Campaign(**CAMPAIGN_MORE_KWARGS) - campaign_v2 = stix2.utils.remove_custom_stix(campaign_v1) + campaign_v2 = stix2.versioning.remove_custom_stix(campaign_v1) assert len(campaign_v1.keys()) == len(campaign_v2.keys()) assert campaign_v1.id == campaign_v2.id assert campaign_v1.description == campaign_v2.description + + +def test_version_unversionable_dict(): + f = { + "type": "file", + "name": "data.txt", + } + + with pytest.raises(ValueError): + stix2.versioning.new_version(f) + + +def test_version_sco_with_modified(): + """ + Ensure new_version() doesn't get tripped up over unversionable objects with + properties not used for versioning, but whose names conflict with + versioning properties. + """ + + file_sco = { + "type": "file", + "name": "data.txt", + "created": "1973-11-23T02:31:37Z", + "modified": "1991-05-13T19:24:57Z", + } + + with pytest.raises(ValueError): + stix2.versioning.new_version(file_sco, name="newname.txt") + + with pytest.raises(ValueError): + stix2.versioning.revoke(file_sco) + + file_sco_obj = stix2.v20.File( + name="data.txt", + created="1973-11-23T02:31:37Z", + modified="1991-05-13T19:24:57Z", + ) + + with pytest.raises(ValueError): + stix2.versioning.new_version(file_sco_obj, name="newname.txt") + + with pytest.raises(ValueError): + stix2.versioning.revoke(file_sco_obj) + + +def test_version_sco_with_custom(): + """ + If we add custom properties named like versioning properties to an object + type which is otherwise unversionable, versioning should start working. + """ + + file_sco_obj = stix2.v20.File( + name="data.txt", + created="1973-11-23T02:31:37Z", + modified="1991-05-13T19:24:57Z", + revoked=False, # the custom property + allow_custom=True, + ) + + new_file_sco_obj = stix2.versioning.new_version( + file_sco_obj, name="newname.txt", + ) + + assert new_file_sco_obj.name == "newname.txt" + + revoked_obj = stix2.versioning.revoke(new_file_sco_obj) + assert revoked_obj.revoked + + +def test_version_disable_custom(): + m = stix2.v20.Malware( + name="foo", labels=["label"], description="Steals your identity!", + x_custom=123, allow_custom=True, + ) + + # Remove the custom property, and disallow custom properties in the + # resulting object. + m2 = stix2.versioning.new_version(m, x_custom=None, allow_custom=False) + assert "x_custom" not in m2 + + # Remove a regular property and leave the custom one, disallow custom + # properties, and make sure we get an error. + with pytest.raises(stix2.exceptions.ExtraPropertiesError): + stix2.versioning.new_version(m, description=None, allow_custom=False) + + +def test_version_enable_custom(): + m = stix2.v20.Malware( + name="foo", labels=["label"], description="Steals your identity!", + ) + + # Add a custom property to an object for which it was previously disallowed + m2 = stix2.versioning.new_version(m, x_custom=123, allow_custom=True) + assert "x_custom" in m2 + + # Add a custom property without enabling it, make sure we get an error + with pytest.raises(stix2.exceptions.ExtraPropertiesError): + stix2.versioning.new_version(m, x_custom=123, allow_custom=False) + + +def test_version_propagate_custom(): + m = stix2.v20.Malware( + name="foo", labels=["label"], + ) + + # Remember custom-not-allowed setting from original; produce error + with pytest.raises(stix2.exceptions.ExtraPropertiesError): + stix2.versioning.new_version(m, x_custom=123) + + m2 = stix2.versioning.new_version(m, description="Steals your identity!") + assert "description" in m2 + assert m2.description == "Steals your identity!" + + m_custom = stix2.v20.Malware( + name="foo", labels=["label"], x_custom=123, allow_custom=True, + ) + + # Remember custom-allowed setting from original; should work + m2_custom = stix2.versioning.new_version(m_custom, x_other_custom="abc") + assert "x_other_custom" in m2_custom + assert m2_custom.x_other_custom == "abc" diff --git a/stix2/test/v21/test_deterministic_ids.py b/stix2/test/v21/test_deterministic_ids.py new file mode 100644 index 0000000..1e6e2d4 --- /dev/null +++ b/stix2/test/v21/test_deterministic_ids.py @@ -0,0 +1,339 @@ +from collections import OrderedDict +import datetime +import uuid + +import pytest +import six + +import stix2.base +import stix2.canonicalization.Canonicalize +import stix2.exceptions +from stix2.properties import ( + BooleanProperty, DictionaryProperty, EmbeddedObjectProperty, + ExtensionsProperty, FloatProperty, HashesProperty, IDProperty, + IntegerProperty, ListProperty, StringProperty, TimestampProperty, + TypeProperty, +) +import stix2.v21 + +SCO_DET_ID_NAMESPACE = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7") + + +def _uuid_from_id(id_): + dd_idx = id_.index("--") + uuid_str = id_[dd_idx+2:] + uuid_ = uuid.UUID(uuid_str) + + return uuid_ + + +def _make_uuid5(name): + """ + Make a STIX 2.1+ compliant UUIDv5 from a "name". + """ + if six.PY3: + uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, name) + else: + uuid_ = uuid.uuid5( + SCO_DET_ID_NAMESPACE, name.encode("utf-8"), + ) + + return uuid_ + + +def test_no_contrib_props_defined(): + + class SomeSCO(stix2.v21._Observable): + _type = "some-sco" + _properties = OrderedDict(( + ('type', TypeProperty(_type, spec_version='2.1')), + ('id', IDProperty(_type, spec_version='2.1')), + ( + 'extensions', ExtensionsProperty( + spec_version='2.1', enclosing_type=_type, + ), + ), + )) + _id_contributing_properties = [] + + sco = SomeSCO() + uuid_ = _uuid_from_id(sco["id"]) + + assert uuid_.variant == uuid.RFC_4122 + assert uuid_.version == 4 + + +def test_json_compatible_prop_values(): + class SomeSCO(stix2.v21._Observable): + _type = "some-sco" + _properties = OrderedDict(( + ('type', TypeProperty(_type, spec_version='2.1')), + ('id', IDProperty(_type, spec_version='2.1')), + ( + 'extensions', ExtensionsProperty( + spec_version='2.1', enclosing_type=_type, + ), + ), + ('string', StringProperty()), + ('int', IntegerProperty()), + ('float', FloatProperty()), + ('bool', BooleanProperty()), + ('list', ListProperty(IntegerProperty())), + ('dict', DictionaryProperty(spec_version="2.1")), + )) + _id_contributing_properties = [ + 'string', 'int', 'float', 'bool', 'list', 'dict', + ] + + obj = { + "string": "abc", + "int": 1, + "float": 1.5, + "bool": True, + "list": [1, 2, 3], + "dict": {"a": 1, "b": [2], "c": "three"}, + } + + sco = SomeSCO(**obj) + + can_json = stix2.canonicalization.Canonicalize.canonicalize(obj, utf8=False) + expected_uuid5 = _make_uuid5(can_json) + actual_uuid5 = _uuid_from_id(sco["id"]) + + assert actual_uuid5 == expected_uuid5 + + +def test_json_incompatible_timestamp_value(): + class SomeSCO(stix2.v21._Observable): + _type = "some-sco" + _properties = OrderedDict(( + ('type', TypeProperty(_type, spec_version='2.1')), + ('id', IDProperty(_type, spec_version='2.1')), + ( + 'extensions', ExtensionsProperty( + spec_version='2.1', enclosing_type=_type, + ), + ), + ('timestamp', TimestampProperty()), + )) + _id_contributing_properties = ['timestamp'] + + ts = datetime.datetime(1987, 1, 2, 3, 4, 5, 678900) + + sco = SomeSCO(timestamp=ts) + + obj = { + "timestamp": "1987-01-02T03:04:05.6789Z", + } + + can_json = stix2.canonicalization.Canonicalize.canonicalize(obj, utf8=False) + expected_uuid5 = _make_uuid5(can_json) + actual_uuid5 = _uuid_from_id(sco["id"]) + + assert actual_uuid5 == expected_uuid5 + + +def test_embedded_object(): + class SubObj(stix2.base._STIXBase): + _type = "sub-object" + _properties = OrderedDict(( + ('value', StringProperty()), + )) + + class SomeSCO(stix2.v21._Observable): + _type = "some-sco" + _properties = OrderedDict(( + ('type', TypeProperty(_type, spec_version='2.1')), + ('id', IDProperty(_type, spec_version='2.1')), + ( + 'extensions', ExtensionsProperty( + spec_version='2.1', enclosing_type=_type, + ), + ), + ('sub_obj', EmbeddedObjectProperty(type=SubObj)), + )) + _id_contributing_properties = ['sub_obj'] + + sub_obj = SubObj(value="foo") + sco = SomeSCO(sub_obj=sub_obj) + + obj = { + "sub_obj": { + "value": "foo", + }, + } + + can_json = stix2.canonicalization.Canonicalize.canonicalize(obj, utf8=False) + expected_uuid5 = _make_uuid5(can_json) + actual_uuid5 = _uuid_from_id(sco["id"]) + + assert actual_uuid5 == expected_uuid5 + + +def test_empty_hash(): + class SomeSCO(stix2.v21._Observable): + _type = "some-sco" + _properties = OrderedDict(( + ('type', TypeProperty(_type, spec_version='2.1')), + ('id', IDProperty(_type, spec_version='2.1')), + ( + 'extensions', ExtensionsProperty( + spec_version='2.1', enclosing_type=_type, + ), + ), + ('hashes', HashesProperty()), + )) + _id_contributing_properties = ['hashes'] + + with pytest.raises(stix2.exceptions.InvalidValueError): + SomeSCO(hashes={}) + + +@pytest.mark.parametrize( + "json_escaped, expected_unescaped", [ + ("", ""), + ("a", "a"), + (r"\n", "\n"), + (r"\n\r\b\t\\\/\"", "\n\r\b\t\\/\""), + (r"\\n", r"\n"), + (r"\\\n", "\\\n"), + ], +) +def test_json_unescaping(json_escaped, expected_unescaped): + actual_unescaped = stix2.base._un_json_escape(json_escaped) + assert actual_unescaped == expected_unescaped + + +def test_json_unescaping_bad_escape(): + with pytest.raises(ValueError): + stix2.base._un_json_escape(r"\x") + + +def test_deterministic_id_same_extra_prop_vals(): + email_addr_1 = stix2.v21.EmailAddress( + value="john@example.com", + display_name="Johnny Doe", + ) + + email_addr_2 = stix2.v21.EmailAddress( + value="john@example.com", + display_name="Johnny Doe", + ) + + assert email_addr_1.id == email_addr_2.id + + uuid_obj_1 = uuid.UUID(email_addr_1.id[-36:]) + assert uuid_obj_1.variant == uuid.RFC_4122 + assert uuid_obj_1.version == 5 + + uuid_obj_2 = uuid.UUID(email_addr_2.id[-36:]) + assert uuid_obj_2.variant == uuid.RFC_4122 + assert uuid_obj_2.version == 5 + + +def test_deterministic_id_diff_extra_prop_vals(): + email_addr_1 = stix2.v21.EmailAddress( + value="john@example.com", + display_name="Johnny Doe", + ) + + email_addr_2 = stix2.v21.EmailAddress( + value="john@example.com", + display_name="Janey Doe", + ) + + assert email_addr_1.id == email_addr_2.id + + uuid_obj_1 = uuid.UUID(email_addr_1.id[-36:]) + assert uuid_obj_1.variant == uuid.RFC_4122 + assert uuid_obj_1.version == 5 + + uuid_obj_2 = uuid.UUID(email_addr_2.id[-36:]) + assert uuid_obj_2.variant == uuid.RFC_4122 + assert uuid_obj_2.version == 5 + + +def test_deterministic_id_diff_contributing_prop_vals(): + email_addr_1 = stix2.v21.EmailAddress( + value="john@example.com", + display_name="Johnny Doe", + ) + + email_addr_2 = stix2.v21.EmailAddress( + value="jane@example.com", + display_name="Janey Doe", + ) + + assert email_addr_1.id != email_addr_2.id + + uuid_obj_1 = uuid.UUID(email_addr_1.id[-36:]) + assert uuid_obj_1.variant == uuid.RFC_4122 + assert uuid_obj_1.version == 5 + + uuid_obj_2 = uuid.UUID(email_addr_2.id[-36:]) + assert uuid_obj_2.variant == uuid.RFC_4122 + assert uuid_obj_2.version == 5 + + +def test_deterministic_id_no_contributing_props(): + email_msg_1 = stix2.v21.EmailMessage( + is_multipart=False, + ) + + email_msg_2 = stix2.v21.EmailMessage( + is_multipart=False, + ) + + assert email_msg_1.id != email_msg_2.id + + uuid_obj_1 = uuid.UUID(email_msg_1.id[-36:]) + assert uuid_obj_1.variant == uuid.RFC_4122 + assert uuid_obj_1.version == 4 + + uuid_obj_2 = uuid.UUID(email_msg_2.id[-36:]) + assert uuid_obj_2.variant == uuid.RFC_4122 + assert uuid_obj_2.version == 4 + + +def test_id_gen_recursive_dict_conversion_1(): + file_observable = stix2.v21.File( + name="example.exe", + size=68 * 1000, + magic_number_hex="50000000", + hashes={ + "SHA-256": "841a8921140aba50671ebb0770fecc4ee308c4952cfeff8de154ab14eeef4649", + }, + extensions={ + "windows-pebinary-ext": stix2.v21.WindowsPEBinaryExt( + pe_type="exe", + machine_hex="014c", + sections=[ + stix2.v21.WindowsPESection( + name=".data", + size=4096, + entropy=7.980693, + hashes={"SHA-256": "6e3b6f3978e5cd96ba7abee35c24e867b7e64072e2ecb22d0ee7a6e6af6894d0"}, + ), + ], + ), + }, + ) + + assert file_observable.id == "file--ced31cd4-bdcb-537d-aefa-92d291bfc11d" + + +def test_id_gen_recursive_dict_conversion_2(): + wrko = stix2.v21.WindowsRegistryKey( + values=[ + stix2.v21.WindowsRegistryValueType( + name="Foo", + data="qwerty", + ), + stix2.v21.WindowsRegistryValueType( + name="Bar", + data="42", + ), + ], + ) + + assert wrko.id == "windows-registry-key--36594eba-bcc7-5014-9835-0e154264e588" diff --git a/stix2/test/v21/test_granular_markings.py b/stix2/test/v21/test_granular_markings.py index 1c3194b..ff8fe26 100644 --- a/stix2/test/v21/test_granular_markings.py +++ b/stix2/test/v21/test_granular_markings.py @@ -1307,3 +1307,17 @@ def test_clear_marking_not_present(data): """Test clearing markings for a selector that has no associated markings.""" with pytest.raises(MarkingNotFoundError): markings.clear_markings(data, ["malware_types"]) + + +def test_set_marking_on_id_property(): + malware = Malware( + granular_markings=[ + { + "selectors": ["id"], + "marking_ref": MARKING_IDS[0], + }, + ], + **MALWARE_KWARGS + ) + + assert "id" in malware["granular_markings"][0]["selectors"] diff --git a/stix2/test/v21/test_object_markings.py b/stix2/test/v21/test_object_markings.py index a21fbf6..bb1c4ab 100644 --- a/stix2/test/v21/test_object_markings.py +++ b/stix2/test/v21/test_object_markings.py @@ -13,6 +13,7 @@ from .constants import MARKING_IDS MALWARE_KWARGS = MALWARE_KWARGS_CONST.copy() MALWARE_KWARGS.update({ 'id': MALWARE_ID, + 'type': 'malware', 'created': FAKE_TIME, 'modified': FAKE_TIME, }) diff --git a/stix2/test/v21/test_observed_data.py b/stix2/test/v21/test_observed_data.py index c13148a..e0fa456 100644 --- a/stix2/test/v21/test_observed_data.py +++ b/stix2/test/v21/test_observed_data.py @@ -1,6 +1,5 @@ import datetime as dt import re -import uuid import pytest import pytz @@ -900,6 +899,27 @@ def test_file_example_with_RasterImageExt_Object(): assert f.extensions["raster-image-ext"].exif_tags["XResolution"] == 4928 +def test_file_with_archive_ext_object(): + ad = stix2.v21.Directory(path="archived/path") + f_obj = stix2.v21.File( + name="foo", extensions={ + "archive-ext": { + "contains_refs": [ad, ], + }, + }, + ) + f_ref = stix2.v21.File( + name="foo", extensions={ + "archive-ext": { + "contains_refs": [ad.id, ], + }, + }, + ) + + assert f_obj["id"] == f_ref["id"] + assert f_obj["extensions"]["archive-ext"]["contains_refs"][0] == ad["id"] + + RASTER_IMAGE_EXT = """{ "type": "observed-data", "spec_version": "2.1", @@ -1469,133 +1489,3 @@ def test_objects_deprecation(): }, }, ) - - -def test_deterministic_id_same_extra_prop_vals(): - email_addr_1 = stix2.v21.EmailAddress( - value="john@example.com", - display_name="Johnny Doe", - ) - - email_addr_2 = stix2.v21.EmailAddress( - value="john@example.com", - display_name="Johnny Doe", - ) - - assert email_addr_1.id == email_addr_2.id - - uuid_obj_1 = uuid.UUID(email_addr_1.id[-36:]) - assert uuid_obj_1.variant == uuid.RFC_4122 - assert uuid_obj_1.version == 5 - - uuid_obj_2 = uuid.UUID(email_addr_2.id[-36:]) - assert uuid_obj_2.variant == uuid.RFC_4122 - assert uuid_obj_2.version == 5 - - -def test_deterministic_id_diff_extra_prop_vals(): - email_addr_1 = stix2.v21.EmailAddress( - value="john@example.com", - display_name="Johnny Doe", - ) - - email_addr_2 = stix2.v21.EmailAddress( - value="john@example.com", - display_name="Janey Doe", - ) - - assert email_addr_1.id == email_addr_2.id - - uuid_obj_1 = uuid.UUID(email_addr_1.id[-36:]) - assert uuid_obj_1.variant == uuid.RFC_4122 - assert uuid_obj_1.version == 5 - - uuid_obj_2 = uuid.UUID(email_addr_2.id[-36:]) - assert uuid_obj_2.variant == uuid.RFC_4122 - assert uuid_obj_2.version == 5 - - -def test_deterministic_id_diff_contributing_prop_vals(): - email_addr_1 = stix2.v21.EmailAddress( - value="john@example.com", - display_name="Johnny Doe", - ) - - email_addr_2 = stix2.v21.EmailAddress( - value="jane@example.com", - display_name="Janey Doe", - ) - - assert email_addr_1.id != email_addr_2.id - - uuid_obj_1 = uuid.UUID(email_addr_1.id[-36:]) - assert uuid_obj_1.variant == uuid.RFC_4122 - assert uuid_obj_1.version == 5 - - uuid_obj_2 = uuid.UUID(email_addr_2.id[-36:]) - assert uuid_obj_2.variant == uuid.RFC_4122 - assert uuid_obj_2.version == 5 - - -def test_deterministic_id_no_contributing_props(): - email_msg_1 = stix2.v21.EmailMessage( - is_multipart=False, - ) - - email_msg_2 = stix2.v21.EmailMessage( - is_multipart=False, - ) - - assert email_msg_1.id != email_msg_2.id - - uuid_obj_1 = uuid.UUID(email_msg_1.id[-36:]) - assert uuid_obj_1.variant == uuid.RFC_4122 - assert uuid_obj_1.version == 4 - - uuid_obj_2 = uuid.UUID(email_msg_2.id[-36:]) - assert uuid_obj_2.variant == uuid.RFC_4122 - assert uuid_obj_2.version == 4 - - -def test_id_gen_recursive_dict_conversion_1(): - file_observable = stix2.v21.File( - name="example.exe", - size=68 * 1000, - magic_number_hex="50000000", - hashes={ - "SHA-256": "841a8921140aba50671ebb0770fecc4ee308c4952cfeff8de154ab14eeef4649", - }, - extensions={ - "windows-pebinary-ext": stix2.v21.WindowsPEBinaryExt( - pe_type="exe", - machine_hex="014c", - sections=[ - stix2.v21.WindowsPESection( - name=".data", - size=4096, - entropy=7.980693, - hashes={"SHA-256": "6e3b6f3978e5cd96ba7abee35c24e867b7e64072e2ecb22d0ee7a6e6af6894d0"}, - ), - ], - ), - }, - ) - - assert file_observable.id == "file--ced31cd4-bdcb-537d-aefa-92d291bfc11d" - - -def test_id_gen_recursive_dict_conversion_2(): - wrko = stix2.v21.WindowsRegistryKey( - values=[ - stix2.v21.WindowsRegistryValueType( - name="Foo", - data="qwerty", - ), - stix2.v21.WindowsRegistryValueType( - name="Bar", - data="42", - ), - ], - ) - - assert wrko.id == "windows-registry-key--36594eba-bcc7-5014-9835-0e154264e588" diff --git a/stix2/test/v21/test_versioning.py b/stix2/test/v21/test_versioning.py index bee0c07..adfa7a0 100644 --- a/stix2/test/v21/test_versioning.py +++ b/stix2/test/v21/test_versioning.py @@ -3,7 +3,10 @@ import datetime import pytest import stix2 +import stix2.exceptions import stix2.utils +import stix2.v21 +import stix2.versioning from .constants import CAMPAIGN_MORE_KWARGS @@ -151,7 +154,7 @@ def test_versioning_error_revoke_of_revoked(): def test_making_new_version_dict(): campaign_v1 = CAMPAIGN_MORE_KWARGS - campaign_v2 = stix2.utils.new_version(CAMPAIGN_MORE_KWARGS, name="fred") + campaign_v2 = stix2.versioning.new_version(CAMPAIGN_MORE_KWARGS, name="fred") assert campaign_v1['id'] == campaign_v2['id'] assert campaign_v1['spec_version'] == campaign_v2['spec_version'] @@ -165,7 +168,7 @@ def test_making_new_version_dict(): def test_versioning_error_dict_bad_modified_value(): with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: - stix2.utils.new_version(CAMPAIGN_MORE_KWARGS, modified="2015-04-06T20:03:00.000Z") + stix2.versioning.new_version(CAMPAIGN_MORE_KWARGS, modified="2015-04-06T20:03:00.000Z") assert excinfo.value.cls == dict assert excinfo.value.prop_name == "modified" @@ -181,7 +184,7 @@ def test_versioning_error_dict_no_modified_value(): 'created': "2016-04-06T20:03:00.000Z", 'name': "Green Group Attacks Against Finance", } - campaign_v2 = stix2.utils.new_version(campaign_v1, modified="2017-04-06T20:03:00.000Z") + campaign_v2 = stix2.versioning.new_version(campaign_v1, modified="2017-04-06T20:03:00.000Z") assert str(campaign_v2['modified']) == "2017-04-06T20:03:00.000Z" @@ -189,14 +192,14 @@ def test_versioning_error_dict_no_modified_value(): def test_making_new_version_invalid_cls(): campaign_v1 = "This is a campaign." with pytest.raises(ValueError) as excinfo: - stix2.utils.new_version(campaign_v1, name="fred") + stix2.versioning.new_version(campaign_v1, name="fred") assert 'cannot create new version of object of this type' in str(excinfo.value) def test_revoke_dict(): campaign_v1 = CAMPAIGN_MORE_KWARGS - campaign_v2 = stix2.utils.revoke(campaign_v1) + campaign_v2 = stix2.versioning.revoke(campaign_v1) assert campaign_v1['id'] == campaign_v2['id'] assert campaign_v1['spec_version'] == campaign_v2['spec_version'] @@ -209,12 +212,18 @@ def test_revoke_dict(): assert campaign_v2['revoked'] +def test_revoke_unversionable(): + sco = stix2.v21.File(name="data.txt") + with pytest.raises(ValueError): + sco.revoke() + + def test_versioning_error_revoke_of_revoked_dict(): campaign_v1 = CAMPAIGN_MORE_KWARGS - campaign_v2 = stix2.utils.revoke(campaign_v1) + campaign_v2 = stix2.versioning.revoke(campaign_v1) with pytest.raises(stix2.exceptions.RevokeError) as excinfo: - stix2.utils.revoke(campaign_v2) + stix2.versioning.revoke(campaign_v2) assert excinfo.value.called_by == "revoke" @@ -222,7 +231,7 @@ def test_versioning_error_revoke_of_revoked_dict(): def test_revoke_invalid_cls(): campaign_v1 = "This is a campaign." with pytest.raises(ValueError) as excinfo: - stix2.utils.revoke(campaign_v1) + stix2.versioning.revoke(campaign_v1) assert 'cannot revoke object of this type' in str(excinfo.value) @@ -236,7 +245,7 @@ def test_remove_custom_stix_property(): is_family=False, ) - mal_nc = stix2.utils.remove_custom_stix(mal) + mal_nc = stix2.versioning.remove_custom_stix(mal) assert "x_custom" not in mal_nc assert mal["modified"] < mal_nc["modified"] @@ -254,14 +263,14 @@ def test_remove_custom_stix_object(): animal = Animal(species="lion", animal_class="mammal") - nc = stix2.utils.remove_custom_stix(animal) + nc = stix2.versioning.remove_custom_stix(animal) assert nc is None def test_remove_custom_stix_no_custom(): campaign_v1 = stix2.v21.Campaign(**CAMPAIGN_MORE_KWARGS) - campaign_v2 = stix2.utils.remove_custom_stix(campaign_v1) + campaign_v2 = stix2.versioning.remove_custom_stix(campaign_v1) assert len(campaign_v1.keys()) == len(campaign_v2.keys()) assert campaign_v1.id == campaign_v2.id @@ -294,5 +303,96 @@ def test_fudge_modified(old, candidate_new, expected_new, use_stix21): expected_new, "%Y-%m-%dT%H:%M:%S.%fZ", ) - fudged = stix2.utils._fudge_modified(old_dt, candidate_new_dt, use_stix21) + fudged = stix2.versioning._fudge_modified( + old_dt, candidate_new_dt, use_stix21, + ) assert fudged == expected_new_dt + + +def test_version_unversionable_dict(): + f = { + "type": "file", + "id": "file--4efb5217-e987-4438-9a1b-c800099401df", + "name": "data.txt", + } + + with pytest.raises(ValueError): + stix2.versioning.new_version(f) + + +def test_version_sco_with_custom(): + """ + If we add custom properties named like versioning properties to an object + type which is otherwise unversionable, versioning should start working. + """ + + file_sco_obj = stix2.v21.File( + name="data.txt", + created="1973-11-23T02:31:37Z", + modified="1991-05-13T19:24:57Z", + revoked=False, + allow_custom=True, + ) + + new_file_sco_obj = stix2.versioning.new_version( + file_sco_obj, size=1234, + ) + + assert new_file_sco_obj.size == 1234 + + revoked_obj = stix2.versioning.revoke(new_file_sco_obj) + assert revoked_obj.revoked + + +def test_version_disable_custom(): + m = stix2.v21.Malware( + name="foo", description="Steals your identity!", is_family=False, + x_custom=123, allow_custom=True, + ) + + # Remove the custom property, and disallow custom properties in the + # resulting object. + m2 = stix2.versioning.new_version(m, x_custom=None, allow_custom=False) + assert "x_custom" not in m2 + + # Remove a regular property and leave the custom one, disallow custom + # properties, and make sure we get an error. + with pytest.raises(stix2.exceptions.ExtraPropertiesError): + stix2.versioning.new_version(m, description=None, allow_custom=False) + + +def test_version_enable_custom(): + m = stix2.v21.Malware( + name="foo", description="Steals your identity!", is_family=False, + ) + + # Add a custom property to an object for which it was previously disallowed + m2 = stix2.versioning.new_version(m, x_custom=123, allow_custom=True) + assert "x_custom" in m2 + + # Add a custom property without enabling it, make sure we get an error + with pytest.raises(stix2.exceptions.ExtraPropertiesError): + stix2.versioning.new_version(m, x_custom=123, allow_custom=False) + + +def test_version_propagate_custom(): + m = stix2.v21.Malware( + name="foo", is_family=False, + ) + + # Remember custom-not-allowed setting from original; produce error + with pytest.raises(stix2.exceptions.ExtraPropertiesError): + stix2.versioning.new_version(m, x_custom=123) + + m2 = stix2.versioning.new_version(m, description="Steals your identity!") + assert "description" in m2 + assert m2.description == "Steals your identity!" + + m_custom = stix2.v21.Malware( + name="foo", is_family=False, x_custom=123, allow_custom=True, + ) + + # Remember custom-allowed setting from original; should work + m2_custom = stix2.versioning.new_version(m_custom, x_other_custom="abc") + assert "x_other_custom" in m2_custom + assert m2_custom.x_other_custom == "abc" diff --git a/stix2/utils.py b/stix2/utils.py index 766fd4b..7a8d8cb 100644 --- a/stix2/utils.py +++ b/stix2/utils.py @@ -1,10 +1,5 @@ """Utility functions and classes for the STIX2 library.""" -try: - from collections.abc import Mapping -except ImportError: - from collections import Mapping -import copy import datetime as dt import enum import json @@ -15,20 +10,11 @@ import six import stix2 -from .exceptions import ( - InvalidValueError, RevokeError, UnmodifiablePropertyError, -) - # Sentinel value for properties that should be set to the current time. # We can't use the standard 'default' approach, since if there are multiple # timestamps in a single object, the timestamps will vary by a few microseconds. NOW = object() -# STIX object properties that cannot be modified -STIX_UNMOD_PROPERTIES = ['created', 'created_by_ref', 'id', 'type'] - -TYPE_REGEX = re.compile(r'^\-?[a-z0-9]+(-[a-z0-9]+)*\-?$') -TYPE_21_REGEX = re.compile(r'^([a-z][a-z0-9]*)+(-[a-z0-9]+)*\-?$') PREFIX_21_REGEX = re.compile(r'^[a-z].*') _TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ" @@ -389,121 +375,6 @@ def find_property_index(obj, search_key, search_value): return idx -def _fudge_modified(old_modified, new_modified, use_stix21): - """ - Ensures a new modified timestamp is newer than the old. When they are - too close together, new_modified must be pushed further ahead to ensure - it is distinct and later, after JSON serialization (which may mean it's - actually being pushed a little ways into the future). JSON serialization - can remove precision, which can cause distinct timestamps to accidentally - become equal, if we're not careful. - - :param old_modified: A previous "modified" timestamp, as a datetime object - :param new_modified: A candidate new "modified" timestamp, as a datetime - object - :param use_stix21: Whether to use STIX 2.1+ versioning timestamp precision - rules (boolean). This is important so that we are aware of how - timestamp precision will be truncated, so we know how close together - the timestamps can be, and how far ahead to potentially push the new - one. - :return: A suitable new "modified" timestamp. This may be different from - what was passed in, if it had to be pushed ahead. - """ - if use_stix21: - # 2.1+: we can use full precision - if new_modified <= old_modified: - new_modified = old_modified + dt.timedelta(microseconds=1) - else: - # 2.0: we must use millisecond precision - one_ms = dt.timedelta(milliseconds=1) - if new_modified - old_modified < one_ms: - new_modified = old_modified + one_ms - - return new_modified - - -def new_version(data, **kwargs): - """Create a new version of a STIX object, by modifying properties and - updating the ``modified`` property. - """ - - if not isinstance(data, Mapping): - raise ValueError( - "cannot create new version of object of this type! " - "Try a dictionary or instance of an SDO or SRO class.", - ) - - unchangable_properties = [] - if data.get('revoked'): - raise RevokeError("new_version") - try: - new_obj_inner = copy.deepcopy(data._inner) - except AttributeError: - new_obj_inner = copy.deepcopy(data) - properties_to_change = kwargs.keys() - - # Make sure certain properties aren't trying to change - for prop in STIX_UNMOD_PROPERTIES: - if prop in properties_to_change: - unchangable_properties.append(prop) - if unchangable_properties: - raise UnmodifiablePropertyError(unchangable_properties) - - # Different versioning precision rules in STIX 2.0 vs 2.1, so we need - # to know which rules to apply. - is_21 = "spec_version" in data - precision_constraint = "min" if is_21 else "exact" - - cls = type(data) - if 'modified' not in kwargs: - old_modified = parse_into_datetime( - data["modified"], precision="millisecond", - precision_constraint=precision_constraint, - ) - - new_modified = get_timestamp() - new_modified = _fudge_modified(old_modified, new_modified, is_21) - - kwargs['modified'] = new_modified - - elif 'modified' in data: - old_modified_property = parse_into_datetime( - data.get('modified'), precision='millisecond', - precision_constraint=precision_constraint, - ) - new_modified_property = parse_into_datetime( - kwargs['modified'], precision='millisecond', - precision_constraint=precision_constraint, - ) - if new_modified_property <= old_modified_property: - raise InvalidValueError( - cls, 'modified', - "The new modified datetime cannot be before than or equal to the current modified datetime." - "It cannot be equal, as according to STIX 2 specification, objects that are different " - "but have the same id and modified timestamp do not have defined consumer behavior.", - ) - new_obj_inner.update(kwargs) - # Exclude properties with a value of 'None' in case data is not an instance of a _STIXBase subclass - return cls(**{k: v for k, v in new_obj_inner.items() if v is not None}) - - -def revoke(data): - """Revoke a STIX object. - - Returns: - A new version of the object with ``revoked`` set to ``True``. - """ - if not isinstance(data, Mapping): - raise ValueError( - "cannot revoke object of this type! Try a dictionary " - "or instance of an SDO or SRO class.", - ) - - if data.get('revoked'): - raise RevokeError("revoke") - return new_version(data, revoked=True, allow_custom=True) - - def get_class_hierarchy_names(obj): """Given an object, return the names of the class hierarchy.""" names = [] @@ -512,64 +383,6 @@ def get_class_hierarchy_names(obj): return names -def remove_custom_stix(stix_obj): - """Remove any custom STIX objects or properties. - - Warnings: - This function is a best effort utility, in that it will remove custom - objects and properties based on the type names; i.e. if "x-" prefixes - object types, and "x\\_" prefixes property types. According to the - STIX2 spec, those naming conventions are a SHOULDs not MUSTs, meaning - that valid custom STIX content may ignore those conventions and in - effect render this utility function invalid when used on that STIX - content. - - Args: - stix_obj (dict OR python-stix obj): a single python-stix object - or dict of a STIX object - - Returns: - A new version of the object with any custom content removed - """ - - if stix_obj['type'].startswith('x-'): - # if entire object is custom, discard - return None - - custom_props = [] - for prop in stix_obj.items(): - if prop[0].startswith('x_'): - # for every custom property, record it and set value to None - # (so we can pass it to new_version() and it will be dropped) - custom_props.append((prop[0], None)) - - if custom_props: - # obtain set of object properties that can be transferred - # to a new object version. This is 1)custom props with their - # values set to None, and 2)any properties left that are not - # unmodifiable STIX properties or the "modified" property - - # set of properties that are not supplied to new_version() - # to be used for updating properties. This includes unmodifiable - # properties (properties that new_version() just re-uses from the - # existing STIX object) and the "modified" property. We dont supply the - # "modified" property so that new_version() creates a new datetime - # value for this property - non_supplied_props = STIX_UNMOD_PROPERTIES + ['modified'] - - props = [(prop, stix_obj[prop]) for prop in stix_obj if prop not in non_supplied_props] - - # add to set the custom properties we want to get rid of (with their value=None) - props.extend(custom_props) - - new_obj = new_version(stix_obj, **(dict(props))) - - return new_obj - - else: - return stix_obj - - def get_type_from_id(stix_id): return stix_id.split('--', 1)[0] diff --git a/stix2/v20/__init__.py b/stix2/v20/__init__.py index b77d46c..36d09be 100644 --- a/stix2/v20/__init__.py +++ b/stix2/v20/__init__.py @@ -103,3 +103,33 @@ EXT_MAP = { 'unix-account-ext': UNIXAccountExt, }, } + + +# Ensure star-imports from this module get the right symbols. "base" is a +# known problem, since there are multiple modules with that name and one can +# accidentally overwrite another. +__all__ = """ + Bundle, + + TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomMarking, ExternalReference, + GranularMarking, KillChainPhase, MarkingDefinition, StatementMarking, + TLPMarking, + + URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, + CustomExtension, CustomObservable, Directory, DomainName, EmailAddress, + EmailMessage, EmailMIMEComponent, File, HTTPRequestExt, ICMPExt, + IPv4Address, IPv6Address, MACAddress, Mutex, NetworkTraffic, NTFSExt, + PDFExt, Process, RasterImageExt, SocketExt, Software, TCPExt, + UNIXAccountExt, UserAccount, WindowsPEBinaryExt, + WindowsPEOptionalHeaderType, WindowsPESection, WindowsProcessExt, + WindowsRegistryKey, WindowsRegistryValueType, WindowsServiceExt, + X509Certificate, X509V3ExtenstionsType, + + AttackPattern, Campaign, CourseOfAction, CustomObject, Identity, Indicator, + IntrusionSet, Malware, ObservedData, Report, ThreatActor, Tool, + Vulnerability, + + Relationship, Sighting, + + OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP +""".replace(",", " ").split() diff --git a/stix2/v21/__init__.py b/stix2/v21/__init__.py index eea61dd..77a28ee 100644 --- a/stix2/v21/__init__.py +++ b/stix2/v21/__init__.py @@ -111,3 +111,34 @@ EXT_MAP = { 'unix-account-ext': UNIXAccountExt, }, } + + +# Ensure star-imports from this module get the right symbols. "base" is a +# known problem, since there are multiple modules with that name and one can +# accidentally overwrite another. +__all__ = """ + Bundle, + + TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomMarking, ExternalReference, + GranularMarking, KillChainPhase, LanguageContent, MarkingDefinition, + StatementMarking, TLPMarking, + + URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, + CustomExtension, CustomObservable, Directory, DomainName, EmailAddress, + EmailMessage, EmailMIMEComponent, File, HTTPRequestExt, ICMPExt, + IPv4Address, IPv6Address, MACAddress, Mutex, NetworkTraffic, NTFSExt, + PDFExt, Process, RasterImageExt, SocketExt, Software, TCPExt, + UNIXAccountExt, UserAccount, WindowsPEBinaryExt, + WindowsPEOptionalHeaderType, WindowsPESection, WindowsProcessExt, + WindowsRegistryKey, WindowsRegistryValueType, WindowsServiceExt, + X509Certificate, X509V3ExtenstionsType, + + AttackPattern, Campaign, CourseOfAction, CustomObject, Grouping, Identity, + Indicator, Infrastructure, IntrusionSet, Location, Malware, + MalwareAnalysis, Note, ObservedData, Opinion, Report, ThreatActor, Tool, + Vulnerability, + + Relationship, Sighting, + + OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP +""".replace(",", " ").split() diff --git a/stix2/v21/observables.py b/stix2/v21/observables.py index d73c1cf..a15d5bd 100644 --- a/stix2/v21/observables.py +++ b/stix2/v21/observables.py @@ -14,8 +14,7 @@ from ..properties import ( BinaryProperty, BooleanProperty, DictionaryProperty, EmbeddedObjectProperty, EnumProperty, ExtensionsProperty, FloatProperty, HashesProperty, HexProperty, IDProperty, IntegerProperty, ListProperty, - ObjectReferenceProperty, ReferenceProperty, StringProperty, - TimestampProperty, TypeProperty, + ReferenceProperty, StringProperty, TimestampProperty, TypeProperty, ) from .base import _Extension, _Observable, _STIXBase21 from .common import GranularMarking @@ -144,7 +143,7 @@ class EmailMIMEComponent(_STIXBase21): _properties = OrderedDict([ ('body', StringProperty()), - ('body_raw_ref', ObjectReferenceProperty(valid_types=['artifact', 'file'])), + ('body_raw_ref', ReferenceProperty(valid_types=['artifact', 'file'], spec_version="2.1")), ('content_type', StringProperty()), ('content_disposition', StringProperty()), ]) @@ -201,7 +200,7 @@ class ArchiveExt(_Extension): _type = 'archive-ext' _properties = OrderedDict([ - ('contains_refs', ListProperty(ObjectReferenceProperty(valid_types=['file', 'directory']), required=True)), + ('contains_refs', ListProperty(ReferenceProperty(valid_types=['file', 'directory'], spec_version="2.1"), required=True)), ('comment', StringProperty()), ]) @@ -465,7 +464,7 @@ class HTTPRequestExt(_Extension): ('request_version', StringProperty()), ('request_header', DictionaryProperty(spec_version='2.1')), ('message_body_length', IntegerProperty()), - ('message_body_data_ref', ObjectReferenceProperty(valid_types='artifact')), + ('message_body_data_ref', ReferenceProperty(valid_types='artifact', spec_version="2.1")), ]) @@ -654,7 +653,7 @@ class WindowsServiceExt(_Extension): "SERVICE_SYSTEM_ALERT", ]), ), - ('service_dll_refs', ListProperty(ObjectReferenceProperty(valid_types='file'))), + ('service_dll_refs', ListProperty(ReferenceProperty(valid_types='file', spec_version="2.1"))), ( 'service_type', EnumProperty(allowed=[ "SERVICE_KERNEL_DRIVER", diff --git a/stix2/versioning.py b/stix2/versioning.py new file mode 100644 index 0000000..8db8f7a --- /dev/null +++ b/stix2/versioning.py @@ -0,0 +1,275 @@ +import copy +import datetime as dt +import itertools +import uuid + +import six +from six.moves.collections_abc import Mapping + +import stix2.base +from stix2.utils import get_timestamp, parse_into_datetime +import stix2.v20 + +from .exceptions import ( + InvalidValueError, RevokeError, UnmodifiablePropertyError, +) + +# STIX object properties that cannot be modified +STIX_UNMOD_PROPERTIES = ['created', 'created_by_ref', 'id', 'type'] +_VERSIONING_PROPERTIES = {"created", "modified", "revoked"} + + +def _fudge_modified(old_modified, new_modified, use_stix21): + """ + Ensures a new modified timestamp is newer than the old. When they are + too close together, new_modified must be pushed further ahead to ensure + it is distinct and later, after JSON serialization (which may mean it's + actually being pushed a little ways into the future). JSON serialization + can remove precision, which can cause distinct timestamps to accidentally + become equal, if we're not careful. + + :param old_modified: A previous "modified" timestamp, as a datetime object + :param new_modified: A candidate new "modified" timestamp, as a datetime + object + :param use_stix21: Whether to use STIX 2.1+ versioning timestamp precision + rules (boolean). This is important so that we are aware of how + timestamp precision will be truncated, so we know how close together + the timestamps can be, and how far ahead to potentially push the new + one. + :return: A suitable new "modified" timestamp. This may be different from + what was passed in, if it had to be pushed ahead. + """ + if use_stix21: + # 2.1+: we can use full precision + if new_modified <= old_modified: + new_modified = old_modified + dt.timedelta(microseconds=1) + else: + # 2.0: we must use millisecond precision + one_ms = dt.timedelta(milliseconds=1) + if new_modified - old_modified < one_ms: + new_modified = old_modified + one_ms + + return new_modified + + +def _is_versionable(data): + """ + Determine whether the given object is versionable. This check is done on + the basis of support for three properties for the object type: "created", + "modified", and "revoked". If all three are supported, the object is + versionable; otherwise it is not. Dicts must have a "type" property whose + value is for a registered object type. This is used to determine a + complete set of supported properties for the type. + + Also, detect whether it represents a STIX 2.1 or greater spec version. + + :param data: The object to check. Must be either a stix object, or a dict + with a "type" property. + :return: A 2-tuple of bools: the first is True if the object is versionable + and False if not; the second is True if the object is STIX 2.1+ and + False if not. + """ + + is_versionable = False + is_21 = False + stix_vid = None + + if isinstance(data, Mapping): + + # First, determine spec version. It's easy for our stix2 objects; more + # work for dicts. + is_21 = False + if isinstance(data, stix2.base._STIXBase) and \ + not isinstance(data, stix2.v20._STIXBase20): + # (is_21 means 2.1 or later; try not to be 2.1-specific) + is_21 = True + elif isinstance(data, dict): + stix_vid = stix2.parsing._detect_spec_version(data) + is_21 = stix_vid != "v20" + + # Then, determine versionability. + + if six.PY2: + # dumb python2 compatibility: map.keys() returns a list, not a set! + # six.viewkeys() compatibility function uses dict.viewkeys() on + # python2, which is not a Mapping mixin method, so that doesn't + # work either (for our stix2 objects). + keys = set(data) + else: + keys = data.keys() + + # This should be sufficient for STIX objects; maybe we get lucky with + # dicts here but probably not. + if keys >= _VERSIONING_PROPERTIES: + is_versionable = True + + # Tougher to handle dicts. We need to consider STIX version, map to a + # registered class, and from that get a more complete picture of its + # properties. + elif isinstance(data, dict): + class_maps = stix2.parsing.STIX2_OBJ_MAPS[stix_vid] + obj_type = data["type"] + + if obj_type in class_maps["objects"]: + # Should we bother checking properties for SDOs/SROs? + # They were designed to be versionable. + is_versionable = True + + elif obj_type in class_maps["observables"]: + # but do check SCOs + cls = class_maps["observables"][obj_type] + is_versionable = _VERSIONING_PROPERTIES.issubset( + cls._properties, + ) + + return is_versionable, is_21 + + +def new_version(data, allow_custom=None, **kwargs): + """ + Create a new version of a STIX object, by modifying properties and + updating the ``modified`` property. + + :param data: The object to create a new version of. Maybe a stix2 object + or dict. + :param allow_custom: Whether to allow custom properties on the new object. + If True, allow them (regardless of whether the original had custom + properties); if False disallow them; if None, propagate the preference + from the original object. + :param kwargs: The properties to change. Setting to None requests property + removal. + :return: The new object. + """ + + is_versionable, is_21 = _is_versionable(data) + + if not is_versionable: + raise ValueError( + "cannot create new version of object of this type! " + "Try a dictionary or instance of an SDO or SRO class.", + ) + + if data.get('revoked'): + raise RevokeError("new_version") + try: + new_obj_inner = copy.deepcopy(data._inner) + except AttributeError: + new_obj_inner = copy.deepcopy(data) + + # Make sure certain properties aren't trying to change + # ID contributing properties of 2.1+ SCOs may also not change if a UUIDv5 + # is in use (depending on whether they were used to create it... but they + # probably were). That would imply an ID change, which is not allowed + # across versions. + sco_locked_props = [] + if is_21 and isinstance(data, stix2.base._Observable): + uuid_ = uuid.UUID(data["id"][-36:]) + if uuid_.variant == uuid.RFC_4122 and uuid_.version == 5: + sco_locked_props = data._id_contributing_properties + + unchangable_properties = set() + for prop in itertools.chain(STIX_UNMOD_PROPERTIES, sco_locked_props): + if prop in kwargs: + unchangable_properties.add(prop) + if unchangable_properties: + raise UnmodifiablePropertyError(unchangable_properties) + + # Different versioning precision rules in STIX 2.0 vs 2.1, so we need + # to know which rules to apply. + precision_constraint = "min" if is_21 else "exact" + + cls = type(data) + if 'modified' not in kwargs: + old_modified = parse_into_datetime( + data["modified"], precision="millisecond", + precision_constraint=precision_constraint, + ) + + new_modified = get_timestamp() + new_modified = _fudge_modified(old_modified, new_modified, is_21) + + kwargs['modified'] = new_modified + + elif 'modified' in data: + old_modified_property = parse_into_datetime( + data.get('modified'), precision='millisecond', + precision_constraint=precision_constraint, + ) + new_modified_property = parse_into_datetime( + kwargs['modified'], precision='millisecond', + precision_constraint=precision_constraint, + ) + if new_modified_property <= old_modified_property: + raise InvalidValueError( + cls, 'modified', + "The new modified datetime cannot be before than or equal to the current modified datetime." + "It cannot be equal, as according to STIX 2 specification, objects that are different " + "but have the same id and modified timestamp do not have defined consumer behavior.", + ) + new_obj_inner.update(kwargs) + + # Set allow_custom appropriately if versioning an object. We will ignore + # it for dicts. + if isinstance(data, stix2.base._STIXBase): + if allow_custom is None: + new_obj_inner["allow_custom"] = data._allow_custom + else: + new_obj_inner["allow_custom"] = allow_custom + + # Exclude properties with a value of 'None' in case data is not an instance of a _STIXBase subclass + return cls(**{k: v for k, v in new_obj_inner.items() if v is not None}) + + +def revoke(data): + """Revoke a STIX object. + + Returns: + A new version of the object with ``revoked`` set to ``True``. + """ + if not isinstance(data, Mapping): + raise ValueError( + "cannot revoke object of this type! Try a dictionary " + "or instance of an SDO or SRO class.", + ) + + if data.get('revoked'): + raise RevokeError("revoke") + return new_version(data, revoked=True) + + +def remove_custom_stix(stix_obj): + """Remove any custom STIX objects or properties. + + Warnings: + This function is a best effort utility, in that it will remove custom + objects and properties based on the type names; i.e. if "x-" prefixes + object types, and "x\\_" prefixes property types. According to the + STIX2 spec, those naming conventions are a SHOULDs not MUSTs, meaning + that valid custom STIX content may ignore those conventions and in + effect render this utility function invalid when used on that STIX + content. + + Args: + stix_obj (dict OR python-stix obj): a single python-stix object + or dict of a STIX object + + Returns: + A new version of the object with any custom content removed + """ + + if stix_obj['type'].startswith('x-'): + # if entire object is custom, discard + return None + + custom_props = { + k: None + for k in stix_obj if k.startswith("x_") + } + + if custom_props: + new_obj = new_version(stix_obj, allow_custom=False, **custom_props) + + return new_obj + + else: + return stix_obj