Merge pull request #402 from chisholm/fix_deterministic_ids

Fix deterministic ids
pull/1/head
Chris Lenk 2020-06-08 07:51:45 -04:00 committed by GitHub
commit 6faf6b9fa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 480 additions and 200 deletions

View File

@ -334,24 +334,21 @@ class _Observable(_STIXBase):
def __init__(self, **kwargs):
# NOTE(review): this span is a rendered diff hunk, so the pre-change body
# (the `try`/`except AttributeError` block before super().__init__) and the
# post-change body (the `if 'id' not in kwargs ...` block after it) both
# appear here, one after the other.
# the constructor might be called independently of an observed data object
self._STIXBase__valid_refs = kwargs.pop('_valid_refs', [])
# Record the caller's allow_custom choice (default False) on the object and
# on its 'extensions' property.
self._allow_custom = kwargs.get('allow_custom', False)
self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False)
# Pre-change logic: derive the ID from the raw kwargs before the base
# constructor runs, and hand it on as kwargs['id'].
try:
# Since `spec_version` is optional, this is how we check for a 2.1 SCO
self._id_contributing_properties
if 'id' not in kwargs:
possible_id = self._generate_id(kwargs)
if possible_id is not None:
kwargs['id'] = possible_id
except AttributeError:
# End up here if handling a 2.0 SCO, and don't need to do anything further
pass
super(_Observable, self).__init__(**kwargs)
# Post-change logic: derive the ID from the constructed object (no kwargs
# argument), only when the caller did not supply an 'id' and the object is
# not a stix2.v20._Observable.
if 'id' not in kwargs and not isinstance(self, stix2.v20._Observable):
# Specific to 2.1+ observables: generate a deterministic ID
id_ = self._generate_id()
# Spec says fall back to UUIDv4 if no contributing properties were
# given. That's what already happened (the following is actually
# overwriting the default uuidv4), so nothing to do here.
if id_ is not None:
# Can't assign to self (we're immutable), so slip the ID in
# more sneakily.
self._inner["id"] = id_
def _check_ref(self, ref, prop, prop_name):
"""
Only for checking `*_ref` or `*_refs` properties in spec_version 2.0
@ -396,42 +393,53 @@ class _Observable(_STIXBase):
for ref in kwargs[prop_name]:
self._check_ref(ref, prop, prop_name)
def _generate_id(self, kwargs):
# NOTE(review): rendered diff hunk -- lines of the removed
# `_generate_id(self, kwargs)` (which read values from the raw kwargs) and of
# the added `_generate_id(self)` (which reads values from the object itself)
# are interleaved from here down to the final `return id_`.
required_prefix = self._type + "--"
def _generate_id(self):
"""
Generate a UUIDv5 for this observable, using its "ID contributing
properties".
properties_to_use = self._id_contributing_properties
if properties_to_use:
streamlined_object = {}
if "hashes" in kwargs and "hashes" in properties_to_use:
possible_hash = _choose_one_hash(kwargs["hashes"])
if possible_hash:
streamlined_object["hashes"] = possible_hash
for key in properties_to_use:
if key != "hashes" and key in kwargs:
if isinstance(kwargs[key], dict) or isinstance(kwargs[key], _STIXBase):
temp_deep_copy = copy.deepcopy(dict(kwargs[key]))
_recursive_stix_to_dict(temp_deep_copy)
streamlined_object[key] = temp_deep_copy
elif isinstance(kwargs[key], list):
temp_deep_copy = copy.deepcopy(kwargs[key])
_recursive_stix_list_to_dict(temp_deep_copy)
streamlined_object[key] = temp_deep_copy
else:
streamlined_object[key] = kwargs[key]
if streamlined_object:
data = canonicalize(streamlined_object, utf8=False)
:return: The ID, or None if no ID contributing properties are set
"""
id_ = None
json_serializable_object = {}
for key in self._id_contributing_properties:
if key in self:
obj_value = self[key]
if key == "hashes":
serializable_value = _choose_one_hash(obj_value)
if serializable_value is None:
raise InvalidValueError(
self, key, "No hashes given",
)
# The situation is complicated w.r.t. python 2/3 behavior, so
# I'd rather not rely on particular exceptions being raised to
# determine what to do. Better to just check the python version
# directly.
if six.PY3:
return required_prefix + six.text_type(uuid.uuid5(SCO_DET_ID_NAMESPACE, data))
else:
return required_prefix + six.text_type(uuid.uuid5(SCO_DET_ID_NAMESPACE, data.encode("utf-8")))
serializable_value = _make_json_serializable(obj_value)
# We return None if there are no values specified for any of the id-contributing-properties
return None
json_serializable_object[key] = serializable_value
if json_serializable_object:
data = canonicalize(json_serializable_object, utf8=False)
# The situation is complicated w.r.t. python 2/3 behavior, so
# I'd rather not rely on particular exceptions being raised to
# determine what to do. Better to just check the python version
# directly.
if six.PY3:
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, data)
else:
uuid_ = uuid.uuid5(
SCO_DET_ID_NAMESPACE, data.encode("utf-8"),
)
id_ = "{}--{}".format(self._type, six.text_type(uuid_))
return id_
class _Extension(_STIXBase):
@ -455,35 +463,100 @@ def _choose_one_hash(hash_dict):
if k is not None:
return {k: hash_dict[k]}
return None
def _cls_init(cls, obj, kwargs):
    """
    Run cls's own initializer on obj, unless cls only has the default
    object.__init__.

    :param cls: The class whose __init__ should be applied.
    :param obj: The instance to initialize.
    :param kwargs: Dict of keyword arguments to pass to the initializer.
    """
    initializer = getattr(cls, '__init__', object.__init__)
    if initializer is object.__init__:
        # Nothing class-specific to run.
        return

    cls.__init__(obj, **kwargs)
def _recursive_stix_to_dict(input_dict):
# NOTE(review): rendered diff hunk -- the removed `_recursive_stix_to_dict`
# (this def, plus the `elif ... list` / `else: pass` lines further down) is
# interleaved with the added `_make_json_serializable` and its docstring.
for key in input_dict:
if isinstance(input_dict[key], dict):
_recursive_stix_to_dict(input_dict[key])
elif isinstance(input_dict[key], _STIXBase):
input_dict[key] = dict(input_dict[key])
def _make_json_serializable(value):
"""
Make the given value JSON-serializable; required for the JSON canonicalizer
to work. This recurses into lists/dicts, converts stix objects to dicts,
etc. "Convenience" types this library uses as property values are
JSON-serialized to produce a JSON-serializable value. (So you will always
get strings for those.)
# There may still be nested _STIXBase objects
_recursive_stix_to_dict(input_dict[key])
elif isinstance(input_dict[key], list):
_recursive_stix_list_to_dict(input_dict[key])
else:
pass
The conversion will not affect the passed in value.
:param value: The value to make JSON-serializable.
:return: The JSON-serializable value.
:raises ValueError: If value is None (since nulls are not allowed in STIX
objects).
"""
if value is None:
raise ValueError("Illegal null value found in a STIX object")
json_value = value # default assumption
if isinstance(value, Mapping):
json_value = {
k: _make_json_serializable(v)
for k, v in value.items()
}
elif isinstance(value, list):
json_value = [
_make_json_serializable(v)
for v in value
]
elif not isinstance(value, (int, float, six.string_types, bool)):
# If a "simple" value which is not already JSON-serializable,
# JSON-serialize to a string and use that as our JSON-serializable
# value. This applies to our datetime objects currently (timestamp
# properties), and could apply to any other "convenience" types this
# library uses for property values in the future.
json_value = json.dumps(value, ensure_ascii=False, cls=STIXJSONEncoder)
# If it looks like a string literal was output, strip off the quotes.
# Otherwise, a second pair will be added when it's canonicalized. Also
# to be extra safe, we need to unescape.
if len(json_value) >= 2 and \
json_value[0] == '"' and json_value[-1] == '"':
json_value = _un_json_escape(json_value[1:-1])
return json_value
def _recursive_stix_list_to_dict(input_list):
    """
    Convert, in place, every STIX object nested inside a list to a plain dict.

    Nested lists are handled by recursing into this function; dicts
    (including the dicts produced by converting STIX objects) are handed to
    `_recursive_stix_to_dict`.  All other element types are left untouched.

    :param input_list: The list to convert; it is modified in place.
    """
    for i, item in enumerate(input_list):
        if isinstance(item, _STIXBase):
            item = input_list[i] = dict(item)
        elif isinstance(item, list):
            # A nested list is fully handled by the recursive call.  It must
            # not fall through to _recursive_stix_to_dict(), which indexes
            # its argument by the values it iterates over and therefore
            # fails on a non-empty list of non-integers.
            _recursive_stix_list_to_dict(item)
            continue
        elif not isinstance(item, dict):
            continue

        # item is a dict here; there may still be nested _STIXBase objects.
        _recursive_stix_to_dict(item)
# Matches one JSON string escape: either a "\uXXXX" unicode escape, or a
# backslash followed by any single (non-newline) character.
_JSON_ESCAPE_RE = re.compile(r"\\(u[0-9a-fA-F]{4}|.)")

# With ensure_ascii=False, Python's serializer does not emit \uXXXX escapes
# for non-ASCII text, but it still emits them for control characters below
# U+0020 which have no short form (e.g. "\u0001").  Those are decoded
# generically; the short escapes (\n, \r, etc.) are mapped explicitly here.
#
# This list is taken from RFC8259 section 7:
#   https://tools.ietf.org/html/rfc8259#section-7
# Maps the second char of a "\X" style escape, to a replacement char
_JSON_ESCAPE_MAP = {
    '"': '"',
    "\\": "\\",
    "/": "/",
    "b": "\b",
    "f": "\f",
    "n": "\n",
    "r": "\r",
    "t": "\t",
}


def _un_json_escape(json_string):
    """
    Removes JSON string literal escapes.  We should undo these things Python's
    serializer does, so we can ensure they're done canonically.  The
    canonicalizer should be in charge of everything, as much as is feasible.

    Both the short escapes of RFC 8259 section 7 and "\\uXXXX" escapes are
    undone.  Each "\\uXXXX" is decoded on its own; surrogate pairs are not
    combined.

    :param json_string: String literal output of Python's JSON serializer,
        minus the surrounding quotes.
    :return: The unescaped string
    :raises ValueError: If a backslash is followed by a character which does
        not start a recognized JSON escape.
    """

    def replace(m):
        escape = m.group(1)

        if len(escape) == 5:
            # "uXXXX": the matched text is pure ASCII, and Python's
            # "unicode_escape" codec decodes \uXXXX the same way JSON does.
            return m.group(0).encode("ascii").decode("unicode_escape")

        replacement = _JSON_ESCAPE_MAP.get(escape)
        if replacement is None:
            raise ValueError("Unrecognized JSON escape: " + m.group(0))

        return replacement

    return _JSON_ESCAPE_RE.sub(replace, json_string)

View File

@ -0,0 +1,337 @@
from collections import OrderedDict
import datetime
import uuid
import pytest
import six
import stix2.base
import stix2.canonicalization.Canonicalize
import stix2.exceptions
from stix2.properties import (
BooleanProperty, DictionaryProperty, EmbeddedObjectProperty,
ExtensionsProperty, FloatProperty, HashesProperty, IDProperty,
IntegerProperty, ListProperty, StringProperty, TimestampProperty,
TypeProperty,
)
import stix2.v21
# Namespace UUID which _make_uuid5() passes to uuid.uuid5() when computing the
# expected deterministic IDs in these tests.
SCO_DET_ID_NAMESPACE = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7")
def _uuid_from_id(id_):
    """
    Parse the UUID out of an ID string: everything after the first "--".

    :param id_: An ID string of the form "<type>--<uuid>"
    :return: The uuid.UUID object
    """
    uuid_start = id_.index("--") + 2
    return uuid.UUID(id_[uuid_start:])
def _make_uuid5(name):
    """
    Make a STIX 2.1+ compliant UUIDv5 from a "name".

    :param name: The text to hash within SCO_DET_ID_NAMESPACE
    :return: The uuid.UUID object
    """
    # On Python 2 the name is UTF-8 encoded first; on Python 3 it is used
    # as-is.
    uuid_name = name if six.PY3 else name.encode("utf-8")
    return uuid.uuid5(SCO_DET_ID_NAMESPACE, uuid_name)
def test_no_contrib_props_defined():
    """An SCO type with an empty contributing-property list gets a UUIDv4."""

    class SomeSCO(stix2.v21._Observable):
        _type = "some-sco"
        _properties = OrderedDict([
            ('type', TypeProperty(_type, spec_version='2.1')),
            ('id', IDProperty(_type, spec_version='2.1')),
            (
                'extensions',
                ExtensionsProperty(spec_version='2.1', enclosing_type=_type),
            ),
        ])
        _id_contributing_properties = []

    generated = _uuid_from_id(SomeSCO()["id"])

    assert generated.variant == uuid.RFC_4122
    assert generated.version == 4
def test_json_compatible_prop_values():
    """
    With JSON-compatible contributing values, the ID must equal the UUIDv5
    of the canonicalized input dict.
    """

    class SomeSCO(stix2.v21._Observable):
        _type = "some-sco"
        _properties = OrderedDict([
            ('type', TypeProperty(_type, spec_version='2.1')),
            ('id', IDProperty(_type, spec_version='2.1')),
            (
                'extensions',
                ExtensionsProperty(spec_version='2.1', enclosing_type=_type),
            ),
            ('string', StringProperty()),
            ('int', IntegerProperty()),
            ('float', FloatProperty()),
            ('bool', BooleanProperty()),
            ('list', ListProperty(IntegerProperty())),
            ('dict', DictionaryProperty(spec_version="2.1")),
        ])
        _id_contributing_properties = [
            'string', 'int', 'float', 'bool', 'list', 'dict',
        ]

    contributing_values = {
        "string": "abc",
        "int": 1,
        "float": 1.5,
        "bool": True,
        "list": [1, 2, 3],
        "dict": {"a": 1, "b": [2], "c": "three"},
    }

    sco = SomeSCO(**contributing_values)

    canonical_json = stix2.canonicalization.Canonicalize.canonicalize(
        contributing_values, utf8=False,
    )

    assert _uuid_from_id(sco["id"]) == _make_uuid5(canonical_json)
def test_json_incompatible_timestamp_value():
    """
    A datetime contributing value must yield the same ID as its expected
    string form, "1987-01-02T03:04:05.6789Z".
    """

    class SomeSCO(stix2.v21._Observable):
        _type = "some-sco"
        _properties = OrderedDict([
            ('type', TypeProperty(_type, spec_version='2.1')),
            ('id', IDProperty(_type, spec_version='2.1')),
            (
                'extensions',
                ExtensionsProperty(spec_version='2.1', enclosing_type=_type),
            ),
            ('timestamp', TimestampProperty()),
        ])
        _id_contributing_properties = ['timestamp']

    sco = SomeSCO(
        timestamp=datetime.datetime(1987, 1, 2, 3, 4, 5, 678900),
    )

    expected_contribution = {"timestamp": "1987-01-02T03:04:05.6789Z"}
    canonical_json = stix2.canonicalization.Canonicalize.canonicalize(
        expected_contribution, utf8=False,
    )

    assert _uuid_from_id(sco["id"]) == _make_uuid5(canonical_json)
def test_embedded_object():
    """
    An embedded STIX object used as a contributing value must yield the same
    ID as the equivalent plain dict.
    """

    class SubObj(stix2.base._STIXBase):
        _type = "sub-object"
        _properties = OrderedDict([
            ('value', StringProperty()),
        ])

    class SomeSCO(stix2.v21._Observable):
        _type = "some-sco"
        _properties = OrderedDict([
            ('type', TypeProperty(_type, spec_version='2.1')),
            ('id', IDProperty(_type, spec_version='2.1')),
            (
                'extensions',
                ExtensionsProperty(spec_version='2.1', enclosing_type=_type),
            ),
            ('sub_obj', EmbeddedObjectProperty(type=SubObj)),
        ])
        _id_contributing_properties = ['sub_obj']

    embedded = SubObj(value="foo")
    sco = SomeSCO(sub_obj=embedded)

    expected_contribution = {"sub_obj": {"value": "foo"}}
    canonical_json = stix2.canonicalization.Canonicalize.canonicalize(
        expected_contribution, utf8=False,
    )

    assert _uuid_from_id(sco["id"]) == _make_uuid5(canonical_json)
def test_empty_hash():
    """An empty 'hashes' contributing value must raise InvalidValueError."""

    class SomeSCO(stix2.v21._Observable):
        _type = "some-sco"
        _properties = OrderedDict([
            ('type', TypeProperty(_type, spec_version='2.1')),
            ('id', IDProperty(_type, spec_version='2.1')),
            (
                'extensions',
                ExtensionsProperty(spec_version='2.1', enclosing_type=_type),
            ),
            ('hashes', HashesProperty()),
        ])
        _id_contributing_properties = ['hashes']

    no_hashes = {}
    with pytest.raises(stix2.exceptions.InvalidValueError):
        SomeSCO(hashes=no_hashes)
@pytest.mark.parametrize(
    "json_escaped, expected_unescaped",
    [
        ("", ""),
        ("a", "a"),
        (r"\n", "\n"),
        (r"\n\r\b\t\\\/\"", "\n\r\b\t\\/\""),
        (r"\\n", r"\n"),
        (r"\\\n", "\\\n"),
    ],
)
def test_json_unescaping(json_escaped, expected_unescaped):
    """Each JSON-escaped input must unescape to the paired expected text."""
    assert stix2.base._un_json_escape(json_escaped) == expected_unescaped
def test_json_unescaping_bad_escape():
    """An unrecognized escape sequence must raise ValueError."""
    invalid_escape = r"\x"
    with pytest.raises(ValueError):
        stix2.base._un_json_escape(invalid_escape)
def test_deterministic_id_same_extra_prop_vals():
    """Two EmailAddress objects built from equal values share a UUIDv5 ID."""
    first = stix2.v21.EmailAddress(
        value="john@example.com",
        display_name="Johnny Doe",
    )
    second = stix2.v21.EmailAddress(
        value="john@example.com",
        display_name="Johnny Doe",
    )

    assert first.id == second.id

    for email_addr in (first, second):
        parsed = uuid.UUID(email_addr.id[-36:])
        assert parsed.variant == uuid.RFC_4122
        assert parsed.version == 5
def test_deterministic_id_diff_extra_prop_vals():
    """A differing display_name alone must not change the UUIDv5 ID."""
    first = stix2.v21.EmailAddress(
        value="john@example.com",
        display_name="Johnny Doe",
    )
    second = stix2.v21.EmailAddress(
        value="john@example.com",
        display_name="Janey Doe",
    )

    assert first.id == second.id

    for email_addr in (first, second):
        parsed = uuid.UUID(email_addr.id[-36:])
        assert parsed.variant == uuid.RFC_4122
        assert parsed.version == 5
def test_deterministic_id_diff_contributing_prop_vals():
    """A differing 'value' must produce different UUIDv5 IDs."""
    first = stix2.v21.EmailAddress(
        value="john@example.com",
        display_name="Johnny Doe",
    )
    second = stix2.v21.EmailAddress(
        value="jane@example.com",
        display_name="Janey Doe",
    )

    assert first.id != second.id

    for email_addr in (first, second):
        parsed = uuid.UUID(email_addr.id[-36:])
        assert parsed.variant == uuid.RFC_4122
        assert parsed.version == 5
def test_deterministic_id_no_contributing_props():
    """
    EmailMessage objects given only is_multipart get distinct UUIDv4 IDs.
    """
    first = stix2.v21.EmailMessage(is_multipart=False)
    second = stix2.v21.EmailMessage(is_multipart=False)

    assert first.id != second.id

    for email_msg in (first, second):
        parsed = uuid.UUID(email_msg.id[-36:])
        assert parsed.variant == uuid.RFC_4122
        assert parsed.version == 4
def test_id_gen_recursive_dict_conversion_1():
    """
    A File carrying a nested PE binary extension gets the expected fixed ID.
    """
    pe_section = stix2.v21.WindowsPESection(
        name=".data",
        size=4096,
        entropy=7.980693,
        hashes={"SHA-256": "6e3b6f3978e5cd96ba7abee35c24e867b7e64072e2ecb22d0ee7a6e6af6894d0"},
    )
    pe_binary_ext = stix2.v21.WindowsPEBinaryExt(
        pe_type="exe",
        machine_hex="014c",
        sections=[pe_section],
    )

    file_observable = stix2.v21.File(
        name="example.exe",
        size=68 * 1000,
        magic_number_hex="50000000",
        hashes={
            "SHA-256": "841a8921140aba50671ebb0770fecc4ee308c4952cfeff8de154ab14eeef4649",
        },
        extensions={"windows-pebinary-ext": pe_binary_ext},
    )

    expected_id = "file--ced31cd4-bdcb-537d-aefa-92d291bfc11d"
    assert file_observable.id == expected_id
def test_id_gen_recursive_dict_conversion_2():
    """
    A WindowsRegistryKey holding a list of value objects gets the expected
    fixed ID.
    """
    registry_values = [
        stix2.v21.WindowsRegistryValueType(name="Foo", data="qwerty"),
        stix2.v21.WindowsRegistryValueType(name="Bar", data="42"),
    ]

    wrko = stix2.v21.WindowsRegistryKey(values=registry_values)

    expected_id = "windows-registry-key--36594eba-bcc7-5014-9835-0e154264e588"
    assert wrko.id == expected_id

View File

@ -1469,133 +1469,3 @@ def test_objects_deprecation():
},
},
)
def test_deterministic_id_same_extra_prop_vals():
    """Two EmailAddress objects built from equal values share a UUIDv5 ID."""
    common_props = {
        "value": "john@example.com",
        "display_name": "Johnny Doe",
    }
    email_addr_1 = stix2.v21.EmailAddress(**common_props)
    email_addr_2 = stix2.v21.EmailAddress(**common_props)

    assert email_addr_1.id == email_addr_2.id

    for email_addr in (email_addr_1, email_addr_2):
        uuid_obj = uuid.UUID(email_addr.id[-36:])
        assert uuid_obj.variant == uuid.RFC_4122
        assert uuid_obj.version == 5
def test_deterministic_id_diff_extra_prop_vals():
    """A differing display_name alone must not change the UUIDv5 ID."""
    email_addr_1 = stix2.v21.EmailAddress(
        value="john@example.com", display_name="Johnny Doe",
    )
    email_addr_2 = stix2.v21.EmailAddress(
        value="john@example.com", display_name="Janey Doe",
    )

    assert email_addr_1.id == email_addr_2.id

    for email_addr in (email_addr_1, email_addr_2):
        uuid_obj = uuid.UUID(email_addr.id[-36:])
        assert uuid_obj.variant == uuid.RFC_4122
        assert uuid_obj.version == 5
def test_deterministic_id_diff_contributing_prop_vals():
    """A differing 'value' must produce different UUIDv5 IDs."""
    email_addr_1 = stix2.v21.EmailAddress(
        value="john@example.com", display_name="Johnny Doe",
    )
    email_addr_2 = stix2.v21.EmailAddress(
        value="jane@example.com", display_name="Janey Doe",
    )

    assert email_addr_1.id != email_addr_2.id

    for email_addr in (email_addr_1, email_addr_2):
        uuid_obj = uuid.UUID(email_addr.id[-36:])
        assert uuid_obj.variant == uuid.RFC_4122
        assert uuid_obj.version == 5
def test_deterministic_id_no_contributing_props():
    """
    EmailMessage objects given only is_multipart get distinct UUIDv4 IDs.
    """
    email_msg_1 = stix2.v21.EmailMessage(is_multipart=False)
    email_msg_2 = stix2.v21.EmailMessage(is_multipart=False)

    assert email_msg_1.id != email_msg_2.id

    for email_msg in (email_msg_1, email_msg_2):
        uuid_obj = uuid.UUID(email_msg.id[-36:])
        assert uuid_obj.variant == uuid.RFC_4122
        assert uuid_obj.version == 4
def test_id_gen_recursive_dict_conversion_1():
    """
    A File carrying a nested PE binary extension gets the expected fixed ID.
    """
    data_section = stix2.v21.WindowsPESection(
        name=".data",
        size=4096,
        entropy=7.980693,
        hashes={"SHA-256": "6e3b6f3978e5cd96ba7abee35c24e867b7e64072e2ecb22d0ee7a6e6af6894d0"},
    )
    extensions = {
        "windows-pebinary-ext": stix2.v21.WindowsPEBinaryExt(
            pe_type="exe",
            machine_hex="014c",
            sections=[data_section],
        ),
    }

    file_observable = stix2.v21.File(
        name="example.exe",
        size=68 * 1000,
        magic_number_hex="50000000",
        hashes={
            "SHA-256": "841a8921140aba50671ebb0770fecc4ee308c4952cfeff8de154ab14eeef4649",
        },
        extensions=extensions,
    )

    assert file_observable.id == "file--ced31cd4-bdcb-537d-aefa-92d291bfc11d"
def test_id_gen_recursive_dict_conversion_2():
    """
    A WindowsRegistryKey holding a list of value objects gets the expected
    fixed ID.
    """
    foo_value = stix2.v21.WindowsRegistryValueType(name="Foo", data="qwerty")
    bar_value = stix2.v21.WindowsRegistryValueType(name="Bar", data="42")

    wrko = stix2.v21.WindowsRegistryKey(values=[foo_value, bar_value])

    assert wrko.id == "windows-registry-key--36594eba-bcc7-5014-9835-0e154264e588"