Revamp deterministic ID generation code to fix bugs.

pull/1/head
Michael Chisholm 2020-06-01 20:24:22 -04:00
parent 41525f9be0
commit 6c2c4781e7
1 changed file with 130 additions and 70 deletions

View File

@ -334,24 +334,21 @@ class _Observable(_STIXBase):
def __init__(self, **kwargs): def __init__(self, **kwargs):
# the constructor might be called independently of an observed data object # the constructor might be called independently of an observed data object
self._STIXBase__valid_refs = kwargs.pop('_valid_refs', []) self._STIXBase__valid_refs = kwargs.pop('_valid_refs', [])
self._allow_custom = kwargs.get('allow_custom', False)
self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False) self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False)
try:
# Since `spec_version` is optional, this is how we check for a 2.1 SCO
self._id_contributing_properties
if 'id' not in kwargs:
possible_id = self._generate_id(kwargs)
if possible_id is not None:
kwargs['id'] = possible_id
except AttributeError:
# End up here if handling a 2.0 SCO, and don't need to do anything further
pass
super(_Observable, self).__init__(**kwargs) super(_Observable, self).__init__(**kwargs)
if 'id' not in kwargs and not isinstance(self, stix2.v20._Observable):
# Specific to 2.1+ observables: generate a deterministic ID
id_ = self._generate_id()
# Spec says fall back to UUIDv4 if no contributing properties were
# given. That's what already happened (the following is actually
# overwriting the default uuidv4), so nothing to do here.
if id_ is not None:
# Can't assign to self (we're immutable), so slip the ID in
# more sneakily.
self._inner["id"] = id_
def _check_ref(self, ref, prop, prop_name): def _check_ref(self, ref, prop, prop_name):
""" """
Only for checking `*_ref` or `*_refs` properties in spec_version 2.0 Only for checking `*_ref` or `*_refs` properties in spec_version 2.0
@ -396,42 +393,50 @@ class _Observable(_STIXBase):
for ref in kwargs[prop_name]: for ref in kwargs[prop_name]:
self._check_ref(ref, prop, prop_name) self._check_ref(ref, prop, prop_name)
def _generate_id(self, kwargs): def _generate_id(self):
required_prefix = self._type + "--" """
Generate a UUIDv5 for this observable, using its "ID contributing
properties".
properties_to_use = self._id_contributing_properties :return: The ID, or None if no ID contributing properties are set
if properties_to_use: """
streamlined_object = {}
if "hashes" in kwargs and "hashes" in properties_to_use: id_ = None
possible_hash = _choose_one_hash(kwargs["hashes"]) json_serializable_object = {}
if possible_hash:
streamlined_object["hashes"] = possible_hash for key in self._id_contributing_properties:
for key in properties_to_use:
if key != "hashes" and key in kwargs: if key in self:
if isinstance(kwargs[key], dict) or isinstance(kwargs[key], _STIXBase): obj_value = self[key]
temp_deep_copy = copy.deepcopy(dict(kwargs[key]))
_recursive_stix_to_dict(temp_deep_copy) if key == "hashes":
streamlined_object[key] = temp_deep_copy possible_hash = _choose_one_hash(obj_value)
elif isinstance(kwargs[key], list): if possible_hash:
temp_deep_copy = copy.deepcopy(kwargs[key]) serializable_value = possible_hash
_recursive_stix_list_to_dict(temp_deep_copy)
streamlined_object[key] = temp_deep_copy
else:
streamlined_object[key] = kwargs[key]
if streamlined_object:
data = canonicalize(streamlined_object, utf8=False)
# The situation is complicated w.r.t. python 2/3 behavior, so
# I'd rather not rely on particular exceptions being raised to
# determine what to do. Better to just check the python version
# directly.
if six.PY3:
return required_prefix + six.text_type(uuid.uuid5(SCO_DET_ID_NAMESPACE, data))
else: else:
return required_prefix + six.text_type(uuid.uuid5(SCO_DET_ID_NAMESPACE, data.encode("utf-8"))) serializable_value = _make_json_serializable(obj_value)
# We return None if there are no values specified for any of the id-contributing-properties json_serializable_object[key] = serializable_value
return None
if json_serializable_object:
data = canonicalize(json_serializable_object, utf8=False)
# The situation is complicated w.r.t. python 2/3 behavior, so
# I'd rather not rely on particular exceptions being raised to
# determine what to do. Better to just check the python version
# directly.
if six.PY3:
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, data)
else:
uuid_ = uuid.uuid5(
SCO_DET_ID_NAMESPACE, data.encode("utf-8")
)
id_ = "{}--{}".format(self._type, six.text_type(uuid_))
return id_
class _Extension(_STIXBase): class _Extension(_STIXBase):
@ -455,35 +460,90 @@ def _choose_one_hash(hash_dict):
if k is not None: if k is not None:
return {k: hash_dict[k]} return {k: hash_dict[k]}
return None
def _cls_init(cls, obj, kwargs): def _cls_init(cls, obj, kwargs):
if getattr(cls, '__init__', object.__init__) is not object.__init__: if getattr(cls, '__init__', object.__init__) is not object.__init__:
cls.__init__(obj, **kwargs) cls.__init__(obj, **kwargs)
def _recursive_stix_to_dict(input_dict): def _make_json_serializable(value):
for key in input_dict: """
if isinstance(input_dict[key], dict): Make the given value JSON-serializable; required for the JSON canonicalizer
_recursive_stix_to_dict(input_dict[key]) to work. This recurses into lists/dicts, converts stix objects to dicts,
elif isinstance(input_dict[key], _STIXBase): etc. "Convenience" types this library uses as property values are
input_dict[key] = dict(input_dict[key]) JSON-serialized to produce a JSON-serializable value. (So you will always
get strings for those.)
# There may still be nested _STIXBase objects The conversion will not affect the passed in value.
_recursive_stix_to_dict(input_dict[key])
elif isinstance(input_dict[key], list): :param value: The value to make JSON-serializable.
_recursive_stix_list_to_dict(input_dict[key]) :return: The JSON-serializable value.
else: :raises ValueError: If value is None (since nulls are not allowed in STIX
pass objects).
"""
if value is None:
raise ValueError("Illegal null value found in a STIX object")
json_value = value # default assumption
if isinstance(value, Mapping):
json_value = {
k: _make_json_serializable(v)
for k, v in value.items()
}
elif isinstance(value, list):
json_value = [
_make_json_serializable(v)
for v in value
]
elif not isinstance(value, (int, float, six.string_types, bool)):
# If a "simple" value which is not already JSON-serializable,
# JSON-serialize to a string and use that as our JSON-serializable
# value. This applies to our datetime objects currently (timestamp
# properties), and could apply to any other "convenience" types this
# library uses for property values in the future.
json_value = json.dumps(value, ensure_ascii=False, cls=STIXJSONEncoder)
# If it looks like a string literal was output, strip off the quotes.
# Otherwise, a second pair will be added when it's canonicalized. Also
# to be extra safe, we need to unescape.
if len(json_value) >= 2 and \
json_value[0] == '"' and json_value[-1] == '"':
json_value = _un_json_escape(json_value[1:-1])
return json_value
def _recursive_stix_list_to_dict(input_list): def _un_json_escape(json_string):
for i in range(len(input_list)): """
if isinstance(input_list[i], _STIXBase): Removes JSON string literal escapes. We should undo these things Python's
input_list[i] = dict(input_list[i]) serializer does, so we can ensure they're done canonically. The
elif isinstance(input_list[i], dict): canonicalizer should be in charge of everything, as much as is feasible.
pass
elif isinstance(input_list[i], list): :param json_string: String literal output of Python's JSON serializer,
_recursive_stix_list_to_dict(input_list[i]) minus the surrounding quotes.
else: :return: The unescaped string
continue """
_recursive_stix_to_dict(input_list[i])
# I don't think I should need to worry about the unicode escapes (\uXXXX)
# since I use ensure_ascii=False when generating it. I will just fix all
# the other escapes, e.g. \n, \r, etc.
#
# This list is taken from RFC7159 section 7:
# https://tools.ietf.org/html/rfc7159.html#section-7
result = json_string\
.replace(r"\"", "\"")\
.replace(r"\/", "/")\
.replace(r"\b", "\b")\
.replace(r"\f", "\f")\
.replace(r"\n", "\n")\
.replace(r"\r", "\r")\
.replace(r"\t", "\t")\
.replace(r"\\", "\\") # Must do this one last!
return result