chg: [interoperability] The interoperability is managed directly within the needed properties

pull/1/head
Christian Studer 2022-05-23 11:54:41 +02:00
parent 9a966f45c3
commit 276410537d
No known key found for this signature in database
GPG Key ID: 6BBED1B63A6D639F
3 changed files with 35 additions and 95 deletions

View File

@ -36,7 +36,7 @@ def get_required_properties(properties):
class _STIXBase(collections.abc.Mapping):
"""Base class for STIX object types"""
def _check_property(self, prop_name, prop, kwargs, allow_custom):
def _check_property(self, prop_name, prop, kwargs, allow_custom, interoperability):
if prop_name not in kwargs:
if hasattr(prop, 'default'):
value = prop.default()
@ -46,10 +46,11 @@ class _STIXBase(collections.abc.Mapping):
has_custom = False
if prop_name in kwargs:
arguments = [kwargs[prop_name], allow_custom]
if isinstance(prop, self.__INTEROPERABILITY_types):
arguments.append(interoperability)
try:
kwargs[prop_name], has_custom = prop.clean(
kwargs[prop_name], allow_custom,
)
kwargs[prop_name], has_custom = prop.clean(*arguments)
except InvalidValueError:
# No point in wrapping InvalidValueError in another
# InvalidValueError... so let those propagate.
@ -114,11 +115,16 @@ class _STIXBase(collections.abc.Mapping):
def __init__(self, allow_custom=False, interoperability=False, **kwargs):
cls = self.__class__
self.__interoperability = interoperability
# Use the same timestamp for any auto-generated datetimes
self.__now = get_timestamp()
self.__INTEROPERABILITY_types = (
stix2.properties.EmbeddedObjectProperty, stix2.properties.IDProperty,
stix2.properties.ListProperty, stix2.properties.OpenVocabProperty,
stix2.properties.ReferenceProperty
)
custom_props = kwargs.pop('custom_properties', {})
if custom_props and not isinstance(custom_props, dict):
raise ValueError("'custom_properties' must be a dictionary")
@ -202,7 +208,7 @@ class _STIXBase(collections.abc.Mapping):
prop = defined_properties.get(prop_name)
if prop:
temp_custom = self._check_property(
prop_name, prop, setting_kwargs, allow_custom,
prop_name, prop, setting_kwargs, allow_custom, interoperability
)
has_custom = has_custom or temp_custom
@ -292,8 +298,7 @@ class _STIXBase(collections.abc.Mapping):
if isinstance(self, _Observable):
# Assume: valid references in the original object are still valid in the new version
new_inner['_valid_refs'] = {'*': '*'}
new_inner['interoperability'] = self.__interoperability
return cls(allow_custom=True, **new_inner)
return cls(allow_custom=True, interoperability=False, **new_inner)
def properties_populated(self):
return list(self._inner.keys())
@ -368,26 +373,11 @@ class _STIXBase(collections.abc.Mapping):
class _DomainObject(_STIXBase, _MarkingsMixin):
def __init__(self, *args, **kwargs):
interoperability = kwargs.get('interoperability', False)
self.__interoperability = interoperability
self._properties['id'].interoperability = interoperability
self._properties['created_by_ref'].interoperability = interoperability
if kwargs.get('object_marking_refs'):
self._properties['object_marking_refs'].contained.interoperability = interoperability
super(_DomainObject, self).__init__(*args, **kwargs)
pass
class _RelationshipObject(_STIXBase, _MarkingsMixin):
def __init__(self, *args, **kwargs):
interoperability = kwargs.get('interoperability', False)
self.__interoperability = interoperability
self._properties['id'].interoperability = interoperability
if kwargs.get('created_by_ref'):
self._properties['created_by_ref'].interoperability = interoperability
if kwargs.get('object_marking_refs'):
self._properties['object_marking_refs'].contained.interoperability = interoperability
super(_RelationshipObject, self).__init__(*args, **kwargs)
pass
class _Observable(_STIXBase):
@ -426,8 +416,8 @@ class _Observable(_STIXBase):
if ref_type not in allowed_types:
raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type))
def _check_property(self, prop_name, prop, kwargs, allow_custom):
has_custom = super(_Observable, self)._check_property(prop_name, prop, kwargs, allow_custom)
def _check_property(self, prop_name, prop, kwargs, allow_custom, interoperability):
has_custom = super(_Observable, self)._check_property(prop_name, prop, kwargs, allow_custom, interoperability)
if prop_name in kwargs:
from .properties import ObjectReferenceProperty
@ -516,7 +506,6 @@ def _make_json_serializable(value):
etc. "Convenience" types this library uses as property values are
JSON-serialized to produce a JSON-serializable value. (So you will always
get strings for those.)
>>>>>>> e9d417de2592c0c7367c312ca0fd25dc8f8a9818
The conversion will not affect the passed in value.

View File

@ -42,47 +42,6 @@ def parse(data, allow_custom=False, interoperability=False, version=None):
return obj
def _detect_spec_version(stix_dict):
"""
Given a dict representing a STIX object, try to detect what spec version
it is likely to comply with.
:param stix_dict: A dict with some STIX content. Must at least have a
"type" property.
:return: A string in "vXX" format, where "XX" indicates the spec version,
e.g. "v20", "v21", etc.
"""
obj_type = stix_dict["type"]
if 'spec_version' in stix_dict:
# For STIX 2.0, applies to bundles only.
# For STIX 2.1+, applies to SCOs, SDOs, SROs, and markings only.
v = 'v' + stix_dict['spec_version'].replace('.', '')
elif "id" not in stix_dict:
# Only 2.0 SCOs don't have ID properties
v = "v20"
elif obj_type == 'bundle':
# Bundle without a spec_version property: must be 2.1. But to
# future-proof, use max version over all contained SCOs, with 2.1
# minimum.
v = max(
"v21",
max(
_detect_spec_version(obj) for obj in stix_dict["objects"]
),
)
elif obj_type in STIX2_OBJ_MAPS["v21"]["observables"]:
# Non-bundle object with an ID and without spec_version. Could be a
# 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO...
v = "v21"
else:
# Not a 2.1 SCO; must be a 2.0 object.
v = "v20"
return v
def dict_to_stix2(stix_dict, allow_custom=False, interoperability=False, version=None):
"""convert dictionary to full python-stix2 object
@ -140,7 +99,7 @@ def dict_to_stix2(stix_dict, allow_custom=False, interoperability=False, version
return obj_class(allow_custom=allow_custom, interoperability=interoperability, **stix_dict)
def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
def parse_observable(data, _valid_refs=None, allow_custom=False, interoperability=False, version=None):
"""Deserialize a string or file-like object into a STIX Cyber Observable
object.
@ -185,4 +144,4 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
"use the CustomObservable decorator." % obj['type'],
)
return obj_class(allow_custom=allow_custom, **obj)
return obj_class(allow_custom=allow_custom, interoperability=interoperability, **obj)

View File

@ -21,13 +21,9 @@ from .utils import (
)
from .version import DEFAULT_VERSION
ID_REGEX_interoperability = re.compile(r"[0-9a-fA-F]{8}-"
"[0-9a-fA-F]{4}-"
"[0-9a-fA-F]{4}-"
"[0-9a-fA-F]{4}-"
"[0-9a-fA-F]{12}$")
ID_REGEX_interoperability = re.compile(
r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
)
TYPE_REGEX = re.compile(r'^-?[a-z0-9]+(-[a-z0-9]+)*-?$')
TYPE_21_REGEX = re.compile(r'^([a-z][a-z0-9]*)+([a-z0-9-]+)*-?$')
ERROR_INVALID_ID = (
@ -234,7 +230,7 @@ class ListProperty(Property):
super(ListProperty, self).__init__(**kwargs)
def clean(self, value, allow_custom):
def clean(self, value, allow_custom, interoperability):
try:
iter(value)
except TypeError:
@ -247,7 +243,7 @@ class ListProperty(Property):
has_custom = False
if isinstance(self.contained, Property):
for item in value:
valid, temp_custom = self.contained.clean(item, allow_custom)
valid, temp_custom = self.contained.clean(item, allow_custom, interoperability)
result.append(valid)
has_custom = has_custom or temp_custom
@ -258,7 +254,7 @@ class ListProperty(Property):
elif isinstance(item, collections.abc.Mapping):
# attempt a mapping-like usage...
valid = self.contained(allow_custom=allow_custom, **item)
valid = self.contained(allow_custom=allow_custom, interoperability=interoperability, **item)
else:
raise ValueError(
@ -285,7 +281,7 @@ class StringProperty(Property):
def __init__(self, **kwargs):
super(StringProperty, self).__init__(**kwargs)
def clean(self, value, allow_custom=False):
def clean(self, value, allow_custom=False, interoperability=False):
if not isinstance(value, str):
value = str(value)
return value, False
@ -306,8 +302,7 @@ class IDProperty(Property):
self.spec_version = spec_version
super(IDProperty, self).__init__()
def clean(self, value, allow_custom=False):
interoperability = self.interoperability if hasattr(self, 'interoperability') and self.interoperability else False
def clean(self, value, allow_custom=False, interoperability=False):
_validate_id(value, self.spec_version, self.required_prefix, interoperability)
return value, False
@ -552,12 +547,11 @@ class ReferenceProperty(Property):
super(ReferenceProperty, self).__init__(**kwargs)
def clean(self, value, allow_custom):
def clean(self, value, allow_custom, interoperability):
if isinstance(value, _STIXBase):
value = value.id
value = str(value)
interoperability = self.interoperability if hasattr(self, 'interoperability') and self.interoperability else False
_validate_id(value, self.spec_version, None, interoperability)
obj_type = get_type_from_id(value)
@ -652,7 +646,7 @@ class EmbeddedObjectProperty(Property):
self.type = type
super(EmbeddedObjectProperty, self).__init__(**kwargs)
def clean(self, value, allow_custom):
def clean(self, value, allow_custom, interoperability):
if isinstance(value, dict):
value = self.type(allow_custom=allow_custom, **value)
elif not isinstance(value, self.type):
@ -701,9 +695,9 @@ class OpenVocabProperty(StringProperty):
allowed = [allowed]
self.allowed = allowed
def clean(self, value, allow_custom):
def clean(self, value, allow_custom, interoperability):
cleaned_value, _ = super(OpenVocabProperty, self).clean(
value, allow_custom,
value, allow_custom, interoperability
)
# Disabled: it was decided that enforcing this is too strict (might
@ -829,9 +823,8 @@ class ExtensionsProperty(DictionaryProperty):
# extensions should be pre-registered with the library).
if key.startswith('extension-definition--'):
interoperability = self.interoperability if hasattr(self, 'interoperability') else False
_validate_id(
key, self.spec_version, 'extension-definition--', interoperability
key, self.spec_version, 'extension-definition--',
)
elif allow_custom:
has_custom = True
@ -845,12 +838,11 @@ class ExtensionsProperty(DictionaryProperty):
class STIXObjectProperty(Property):
def __init__(self, spec_version=DEFAULT_VERSION, interoperability=False, *args, **kwargs):
def __init__(self, spec_version=DEFAULT_VERSION, *args, **kwargs):
self.spec_version = spec_version
self.interoperability = interoperability
super(STIXObjectProperty, self).__init__(*args, **kwargs)
def clean(self, value, allow_custom):
def clean(self, value, allow_custom, interoperability):
# Any STIX Object (SDO, SRO, or Marking Definition) can be added to
# a bundle with no further checks.
stix2_classes = {'_DomainObject', '_RelationshipObject', 'MarkingDefinition'}
@ -890,7 +882,7 @@ class STIXObjectProperty(Property):
"containing objects of a different spec version.",
)
parsed_obj = parse(dictified, allow_custom=allow_custom, interoperability=self.interoperability)
parsed_obj = parse(dictified, allow_custom=allow_custom, interoperability=interoperability)
if isinstance(parsed_obj, _STIXBase):
has_custom = parsed_obj.has_custom