diff --git a/stix2/base.py b/stix2/base.py index 270354b..0289907 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -36,7 +36,7 @@ def get_required_properties(properties): class _STIXBase(collections.abc.Mapping): """Base class for STIX object types""" - def _check_property(self, prop_name, prop, kwargs, allow_custom): + def _check_property(self, prop_name, prop, kwargs, allow_custom, interoperability): if prop_name not in kwargs: if hasattr(prop, 'default'): value = prop.default() @@ -46,10 +46,11 @@ class _STIXBase(collections.abc.Mapping): has_custom = False if prop_name in kwargs: + arguments = [kwargs[prop_name], allow_custom] + if isinstance(prop, self.__INTEROPERABILITY_types): + arguments.append(interoperability) try: - kwargs[prop_name], has_custom = prop.clean( - kwargs[prop_name], allow_custom, - ) + kwargs[prop_name], has_custom = prop.clean(*arguments) except InvalidValueError: # No point in wrapping InvalidValueError in another # InvalidValueError... so let those propagate. @@ -114,11 +115,16 @@ class _STIXBase(collections.abc.Mapping): def __init__(self, allow_custom=False, interoperability=False, **kwargs): cls = self.__class__ - self.__interoperability = interoperability # Use the same timestamp for any auto-generated datetimes self.__now = get_timestamp() + self.__INTEROPERABILITY_types = ( + stix2.properties.EmbeddedObjectProperty, stix2.properties.IDProperty, + stix2.properties.ListProperty, stix2.properties.OpenVocabProperty, + stix2.properties.ReferenceProperty + ) + custom_props = kwargs.pop('custom_properties', {}) if custom_props and not isinstance(custom_props, dict): raise ValueError("'custom_properties' must be a dictionary") @@ -202,7 +208,7 @@ class _STIXBase(collections.abc.Mapping): prop = defined_properties.get(prop_name) if prop: temp_custom = self._check_property( - prop_name, prop, setting_kwargs, allow_custom, + prop_name, prop, setting_kwargs, allow_custom, interoperability ) has_custom = has_custom or temp_custom @@ -292,8 +298,7 @@ class _STIXBase(collections.abc.Mapping): if isinstance(self, _Observable): # Assume: valid references in the original object are still valid in the new version new_inner['_valid_refs'] = {'*': '*'} - new_inner['interoperability'] = self.__interoperability - return cls(allow_custom=True, **new_inner) + return cls(allow_custom=True, interoperability=False, **new_inner) def properties_populated(self): return list(self._inner.keys()) @@ -368,26 +373,11 @@ class _STIXBase(collections.abc.Mapping): class _DomainObject(_STIXBase, _MarkingsMixin): - def __init__(self, *args, **kwargs): - interoperability = kwargs.get('interoperability', False) - self.__interoperability = interoperability - self._properties['id'].interoperability = interoperability - self._properties['created_by_ref'].interoperability = interoperability - if kwargs.get('object_marking_refs'): - self._properties['object_marking_refs'].contained.interoperability = interoperability - super(_DomainObject, self).__init__(*args, **kwargs) + pass class _RelationshipObject(_STIXBase, _MarkingsMixin): - def __init__(self, *args, **kwargs): - interoperability = kwargs.get('interoperability', False) - self.__interoperability = interoperability - self._properties['id'].interoperability = interoperability - if kwargs.get('created_by_ref'): - self._properties['created_by_ref'].interoperability = interoperability - if kwargs.get('object_marking_refs'): - self._properties['object_marking_refs'].contained.interoperability = interoperability - super(_RelationshipObject, self).__init__(*args, **kwargs) + pass class _Observable(_STIXBase): @@ -426,8 +416,8 @@ class _Observable(_STIXBase): if ref_type not in allowed_types: raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type)) - def _check_property(self, prop_name, prop, kwargs, allow_custom): - has_custom = super(_Observable, self)._check_property(prop_name, prop, kwargs, allow_custom) + def _check_property(self, prop_name, prop, kwargs, allow_custom, interoperability): + has_custom = super(_Observable, self)._check_property(prop_name, prop, kwargs, allow_custom, interoperability) if prop_name in kwargs: from .properties import ObjectReferenceProperty @@ -516,7 +506,6 @@ def _make_json_serializable(value): etc. "Convenience" types this library uses as property values are JSON-serialized to produce a JSON-serializable value. (So you will always get strings for those.) ->>>>>>> e9d417de2592c0c7367c312ca0fd25dc8f8a9818 The conversion will not affect the passed in value. diff --git a/stix2/parsing.py b/stix2/parsing.py index 9737542..382fa5c 100644 --- a/stix2/parsing.py +++ b/stix2/parsing.py @@ -42,47 +42,6 @@ def parse(data, allow_custom=False, interoperability=False, version=None): return obj -def _detect_spec_version(stix_dict): - """ - Given a dict representing a STIX object, try to detect what spec version - it is likely to comply with. - - :param stix_dict: A dict with some STIX content. Must at least have a - "type" property. - :return: A string in "vXX" format, where "XX" indicates the spec version, - e.g. "v20", "v21", etc. - """ - - obj_type = stix_dict["type"] - - if 'spec_version' in stix_dict: - # For STIX 2.0, applies to bundles only. - # For STIX 2.1+, applies to SCOs, SDOs, SROs, and markings only. - v = 'v' + stix_dict['spec_version'].replace('.', '') - elif "id" not in stix_dict: - # Only 2.0 SCOs don't have ID properties - v = "v20" - elif obj_type == 'bundle': - # Bundle without a spec_version property: must be 2.1. But to - # future-proof, use max version over all contained SCOs, with 2.1 - # minimum. - v = max( - "v21", - max( - _detect_spec_version(obj) for obj in stix_dict["objects"] - ), - ) - elif obj_type in STIX2_OBJ_MAPS["v21"]["observables"]: - # Non-bundle object with an ID and without spec_version. Could be a - # 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO... - v = "v21" - else: - # Not a 2.1 SCO; must be a 2.0 object. - v = "v20" - - return v - - def dict_to_stix2(stix_dict, allow_custom=False, interoperability=False, version=None): """convert dictionary to full python-stix2 object @@ -140,7 +99,7 @@ def dict_to_stix2(stix_dict, allow_custom=False, interoperability=False, version return obj_class(allow_custom=allow_custom, interoperability=interoperability, **stix_dict) -def parse_observable(data, _valid_refs=None, allow_custom=False, version=None): +def parse_observable(data, _valid_refs=None, allow_custom=False, interoperability=False, version=None): """Deserialize a string or file-like object into a STIX Cyber Observable object. @@ -185,4 +144,4 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None): "use the CustomObservable decorator." % obj['type'], ) - return obj_class(allow_custom=allow_custom, **obj) + return obj_class(allow_custom=allow_custom, interoperability=interoperability, **obj) diff --git a/stix2/properties.py b/stix2/properties.py index e7a77ef..119cf87 100644 --- a/stix2/properties.py +++ b/stix2/properties.py @@ -21,13 +21,9 @@ from .utils import ( ) from .version import DEFAULT_VERSION -ID_REGEX_interoperability = re.compile(r"[0-9a-fA-F]{8}-" - "[0-9a-fA-F]{4}-" - "[0-9a-fA-F]{4}-" - "[0-9a-fA-F]{4}-" - "[0-9a-fA-F]{12}$") - - +ID_REGEX_interoperability = re.compile( + r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$" +) TYPE_REGEX = re.compile(r'^-?[a-z0-9]+(-[a-z0-9]+)*-?$') TYPE_21_REGEX = re.compile(r'^([a-z][a-z0-9]*)+([a-z0-9-]+)*-?$') ERROR_INVALID_ID = ( @@ -234,7 +230,7 @@ class ListProperty(Property): super(ListProperty, self).__init__(**kwargs) - def clean(self, value, allow_custom): + def clean(self, value, allow_custom, interoperability): try: iter(value) except TypeError: @@ -247,7 +243,7 @@ class ListProperty(Property): has_custom = False if isinstance(self.contained, Property): for item in value: - valid, temp_custom = self.contained.clean(item, allow_custom) + valid, temp_custom = self.contained.clean(item, allow_custom, interoperability) result.append(valid) has_custom = has_custom or temp_custom @@ -258,7 +254,7 @@ class ListProperty(Property): elif isinstance(item, collections.abc.Mapping): # attempt a mapping-like usage... - valid = self.contained(allow_custom=allow_custom, **item) + valid = self.contained(allow_custom=allow_custom, interoperability=interoperability, **item) else: raise ValueError( @@ -285,7 +281,7 @@ class StringProperty(Property): def __init__(self, **kwargs): super(StringProperty, self).__init__(**kwargs) - def clean(self, value, allow_custom=False): + def clean(self, value, allow_custom=False, interoperability=False): if not isinstance(value, str): value = str(value) return value, False @@ -306,8 +302,7 @@ class IDProperty(Property): self.spec_version = spec_version super(IDProperty, self).__init__() - def clean(self, value, allow_custom=False): - interoperability = self.interoperability if hasattr(self, 'interoperability') and self.interoperability else False + def clean(self, value, allow_custom=False, interoperability=False): _validate_id(value, self.spec_version, self.required_prefix, interoperability) return value, False @@ -552,12 +547,11 @@ class ReferenceProperty(Property): super(ReferenceProperty, self).__init__(**kwargs) - def clean(self, value, allow_custom): + def clean(self, value, allow_custom, interoperability): if isinstance(value, _STIXBase): value = value.id value = str(value) - interoperability = self.interoperability if hasattr(self, 'interoperability') and self.interoperability else False _validate_id(value, self.spec_version, None, interoperability) obj_type = get_type_from_id(value) @@ -652,7 +646,7 @@ class EmbeddedObjectProperty(Property): self.type = type super(EmbeddedObjectProperty, self).__init__(**kwargs) - def clean(self, value, allow_custom): + def clean(self, value, allow_custom, interoperability): if isinstance(value, dict): value = self.type(allow_custom=allow_custom, **value) elif not isinstance(value, self.type): @@ -701,9 +695,9 @@ class OpenVocabProperty(StringProperty): allowed = [allowed] self.allowed = allowed - def clean(self, value, allow_custom): + def clean(self, value, allow_custom, interoperability): cleaned_value, _ = super(OpenVocabProperty, self).clean( - value, allow_custom, + value, allow_custom, interoperability ) # Disabled: it was decided that enforcing this is too strict (might @@ -829,9 +823,8 @@ class ExtensionsProperty(DictionaryProperty): # extensions should be pre-registered with the library). if key.startswith('extension-definition--'): - interoperability = self.interoperability if hasattr(self, 'interoperability') else False _validate_id( - key, self.spec_version, 'extension-definition--', interoperability + key, self.spec_version, 'extension-definition--', ) elif allow_custom: has_custom = True @@ -845,12 +838,11 @@ class ExtensionsProperty(DictionaryProperty): class STIXObjectProperty(Property): - def __init__(self, spec_version=DEFAULT_VERSION, interoperability=False, *args, **kwargs): + def __init__(self, spec_version=DEFAULT_VERSION, *args, **kwargs): self.spec_version = spec_version - self.interoperability = interoperability super(STIXObjectProperty, self).__init__(*args, **kwargs) - def clean(self, value, allow_custom): + def clean(self, value, allow_custom, interoperability): # Any STIX Object (SDO, SRO, or Marking Definition) can be added to # a bundle with no further checks. stix2_classes = {'_DomainObject', '_RelationshipObject', 'MarkingDefinition'} @@ -890,7 +882,7 @@ class STIXObjectProperty(Property): "containing objects of a different spec version.", ) - parsed_obj = parse(dictified, allow_custom=allow_custom, interoperability=self.interoperability) + parsed_obj = parse(dictified, allow_custom=allow_custom, interoperability=interoperability) if isinstance(parsed_obj, _STIXBase): has_custom = parsed_obj.has_custom