Compare commits
3 Commits
d229bd3cd1
...
625576622f
Author | SHA1 | Date |
---|---|---|
Christian Studer | 625576622f | |
Christian Studer | 276410537d | |
Christian Studer | 9a966f45c3 |
|
@ -36,7 +36,7 @@ def get_required_properties(properties):
|
||||||
class _STIXBase(collections.abc.Mapping):
|
class _STIXBase(collections.abc.Mapping):
|
||||||
"""Base class for STIX object types"""
|
"""Base class for STIX object types"""
|
||||||
|
|
||||||
def _check_property(self, prop_name, prop, kwargs, allow_custom):
|
def _check_property(self, prop_name, prop, kwargs, allow_custom, interoperability):
|
||||||
if prop_name not in kwargs:
|
if prop_name not in kwargs:
|
||||||
if hasattr(prop, 'default'):
|
if hasattr(prop, 'default'):
|
||||||
value = prop.default()
|
value = prop.default()
|
||||||
|
@ -46,10 +46,11 @@ class _STIXBase(collections.abc.Mapping):
|
||||||
|
|
||||||
has_custom = False
|
has_custom = False
|
||||||
if prop_name in kwargs:
|
if prop_name in kwargs:
|
||||||
|
arguments = [kwargs[prop_name], allow_custom]
|
||||||
|
if isinstance(prop, self.__INTEROPERABILITY_types):
|
||||||
|
arguments.append(interoperability)
|
||||||
try:
|
try:
|
||||||
kwargs[prop_name], has_custom = prop.clean(
|
kwargs[prop_name], has_custom = prop.clean(*arguments)
|
||||||
kwargs[prop_name], allow_custom,
|
|
||||||
)
|
|
||||||
except InvalidValueError:
|
except InvalidValueError:
|
||||||
# No point in wrapping InvalidValueError in another
|
# No point in wrapping InvalidValueError in another
|
||||||
# InvalidValueError... so let those propagate.
|
# InvalidValueError... so let those propagate.
|
||||||
|
@ -114,11 +115,18 @@ class _STIXBase(collections.abc.Mapping):
|
||||||
|
|
||||||
def __init__(self, allow_custom=False, interoperability=False, **kwargs):
|
def __init__(self, allow_custom=False, interoperability=False, **kwargs):
|
||||||
cls = self.__class__
|
cls = self.__class__
|
||||||
self.__interoperability = interoperability
|
|
||||||
|
|
||||||
# Use the same timestamp for any auto-generated datetimes
|
# Use the same timestamp for any auto-generated datetimes
|
||||||
self.__now = get_timestamp()
|
self.__now = get_timestamp()
|
||||||
|
|
||||||
|
self.__INTEROPERABILITY_types = (
|
||||||
|
stix2.properties.EmbeddedObjectProperty, stix2.properties.EnumProperty,
|
||||||
|
stix2.properties.ExtensionsProperty, stix2.properties.DictionaryProperty,
|
||||||
|
stix2.properties.HashesProperty, stix2.properties.IDProperty,
|
||||||
|
stix2.properties.ListProperty, stix2.properties.OpenVocabProperty,
|
||||||
|
stix2.properties.ReferenceProperty, stix2.properties.SelectorProperty,
|
||||||
|
)
|
||||||
|
|
||||||
custom_props = kwargs.pop('custom_properties', {})
|
custom_props = kwargs.pop('custom_properties', {})
|
||||||
if custom_props and not isinstance(custom_props, dict):
|
if custom_props and not isinstance(custom_props, dict):
|
||||||
raise ValueError("'custom_properties' must be a dictionary")
|
raise ValueError("'custom_properties' must be a dictionary")
|
||||||
|
@ -202,7 +210,7 @@ class _STIXBase(collections.abc.Mapping):
|
||||||
prop = defined_properties.get(prop_name)
|
prop = defined_properties.get(prop_name)
|
||||||
if prop:
|
if prop:
|
||||||
temp_custom = self._check_property(
|
temp_custom = self._check_property(
|
||||||
prop_name, prop, setting_kwargs, allow_custom,
|
prop_name, prop, setting_kwargs, allow_custom, interoperability,
|
||||||
)
|
)
|
||||||
|
|
||||||
has_custom = has_custom or temp_custom
|
has_custom = has_custom or temp_custom
|
||||||
|
@ -292,8 +300,7 @@ class _STIXBase(collections.abc.Mapping):
|
||||||
if isinstance(self, _Observable):
|
if isinstance(self, _Observable):
|
||||||
# Assume: valid references in the original object are still valid in the new version
|
# Assume: valid references in the original object are still valid in the new version
|
||||||
new_inner['_valid_refs'] = {'*': '*'}
|
new_inner['_valid_refs'] = {'*': '*'}
|
||||||
new_inner['interoperability'] = self.__interoperability
|
return cls(allow_custom=True, interoperability=False, **new_inner)
|
||||||
return cls(allow_custom=True, **new_inner)
|
|
||||||
|
|
||||||
def properties_populated(self):
|
def properties_populated(self):
|
||||||
return list(self._inner.keys())
|
return list(self._inner.keys())
|
||||||
|
@ -368,26 +375,11 @@ class _STIXBase(collections.abc.Mapping):
|
||||||
|
|
||||||
|
|
||||||
class _DomainObject(_STIXBase, _MarkingsMixin):
|
class _DomainObject(_STIXBase, _MarkingsMixin):
|
||||||
def __init__(self, *args, **kwargs):
|
pass
|
||||||
interoperability = kwargs.get('interoperability', False)
|
|
||||||
self.__interoperability = interoperability
|
|
||||||
self._properties['id'].interoperability = interoperability
|
|
||||||
self._properties['created_by_ref'].interoperability = interoperability
|
|
||||||
if kwargs.get('object_marking_refs'):
|
|
||||||
self._properties['object_marking_refs'].contained.interoperability = interoperability
|
|
||||||
super(_DomainObject, self).__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
class _RelationshipObject(_STIXBase, _MarkingsMixin):
|
class _RelationshipObject(_STIXBase, _MarkingsMixin):
|
||||||
def __init__(self, *args, **kwargs):
|
pass
|
||||||
interoperability = kwargs.get('interoperability', False)
|
|
||||||
self.__interoperability = interoperability
|
|
||||||
self._properties['id'].interoperability = interoperability
|
|
||||||
if kwargs.get('created_by_ref'):
|
|
||||||
self._properties['created_by_ref'].interoperability = interoperability
|
|
||||||
if kwargs.get('object_marking_refs'):
|
|
||||||
self._properties['object_marking_refs'].contained.interoperability = interoperability
|
|
||||||
super(_RelationshipObject, self).__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
class _Observable(_STIXBase):
|
class _Observable(_STIXBase):
|
||||||
|
@ -426,8 +418,8 @@ class _Observable(_STIXBase):
|
||||||
if ref_type not in allowed_types:
|
if ref_type not in allowed_types:
|
||||||
raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type))
|
raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type))
|
||||||
|
|
||||||
def _check_property(self, prop_name, prop, kwargs, allow_custom):
|
def _check_property(self, prop_name, prop, kwargs, allow_custom, interoperability):
|
||||||
has_custom = super(_Observable, self)._check_property(prop_name, prop, kwargs, allow_custom)
|
has_custom = super(_Observable, self)._check_property(prop_name, prop, kwargs, allow_custom, interoperability)
|
||||||
|
|
||||||
if prop_name in kwargs:
|
if prop_name in kwargs:
|
||||||
from .properties import ObjectReferenceProperty
|
from .properties import ObjectReferenceProperty
|
||||||
|
@ -516,7 +508,6 @@ def _make_json_serializable(value):
|
||||||
etc. "Convenience" types this library uses as property values are
|
etc. "Convenience" types this library uses as property values are
|
||||||
JSON-serialized to produce a JSON-serializable value. (So you will always
|
JSON-serialized to produce a JSON-serializable value. (So you will always
|
||||||
get strings for those.)
|
get strings for those.)
|
||||||
>>>>>>> e9d417de2592c0c7367c312ca0fd25dc8f8a9818
|
|
||||||
|
|
||||||
The conversion will not affect the passed in value.
|
The conversion will not affect the passed in value.
|
||||||
|
|
||||||
|
|
|
@ -42,47 +42,6 @@ def parse(data, allow_custom=False, interoperability=False, version=None):
|
||||||
return obj
|
return obj
|
||||||
|
|
||||||
|
|
||||||
def _detect_spec_version(stix_dict):
|
|
||||||
"""
|
|
||||||
Given a dict representing a STIX object, try to detect what spec version
|
|
||||||
it is likely to comply with.
|
|
||||||
|
|
||||||
:param stix_dict: A dict with some STIX content. Must at least have a
|
|
||||||
"type" property.
|
|
||||||
:return: A string in "vXX" format, where "XX" indicates the spec version,
|
|
||||||
e.g. "v20", "v21", etc.
|
|
||||||
"""
|
|
||||||
|
|
||||||
obj_type = stix_dict["type"]
|
|
||||||
|
|
||||||
if 'spec_version' in stix_dict:
|
|
||||||
# For STIX 2.0, applies to bundles only.
|
|
||||||
# For STIX 2.1+, applies to SCOs, SDOs, SROs, and markings only.
|
|
||||||
v = 'v' + stix_dict['spec_version'].replace('.', '')
|
|
||||||
elif "id" not in stix_dict:
|
|
||||||
# Only 2.0 SCOs don't have ID properties
|
|
||||||
v = "v20"
|
|
||||||
elif obj_type == 'bundle':
|
|
||||||
# Bundle without a spec_version property: must be 2.1. But to
|
|
||||||
# future-proof, use max version over all contained SCOs, with 2.1
|
|
||||||
# minimum.
|
|
||||||
v = max(
|
|
||||||
"v21",
|
|
||||||
max(
|
|
||||||
_detect_spec_version(obj) for obj in stix_dict["objects"]
|
|
||||||
),
|
|
||||||
)
|
|
||||||
elif obj_type in STIX2_OBJ_MAPS["v21"]["observables"]:
|
|
||||||
# Non-bundle object with an ID and without spec_version. Could be a
|
|
||||||
# 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO...
|
|
||||||
v = "v21"
|
|
||||||
else:
|
|
||||||
# Not a 2.1 SCO; must be a 2.0 object.
|
|
||||||
v = "v20"
|
|
||||||
|
|
||||||
return v
|
|
||||||
|
|
||||||
|
|
||||||
def dict_to_stix2(stix_dict, allow_custom=False, interoperability=False, version=None):
|
def dict_to_stix2(stix_dict, allow_custom=False, interoperability=False, version=None):
|
||||||
"""convert dictionary to full python-stix2 object
|
"""convert dictionary to full python-stix2 object
|
||||||
|
|
||||||
|
@ -140,7 +99,7 @@ def dict_to_stix2(stix_dict, allow_custom=False, interoperability=False, version
|
||||||
return obj_class(allow_custom=allow_custom, interoperability=interoperability, **stix_dict)
|
return obj_class(allow_custom=allow_custom, interoperability=interoperability, **stix_dict)
|
||||||
|
|
||||||
|
|
||||||
def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
|
def parse_observable(data, _valid_refs=None, allow_custom=False, interoperability=False, version=None):
|
||||||
"""Deserialize a string or file-like object into a STIX Cyber Observable
|
"""Deserialize a string or file-like object into a STIX Cyber Observable
|
||||||
object.
|
object.
|
||||||
|
|
||||||
|
@ -185,4 +144,4 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
|
||||||
"use the CustomObservable decorator." % obj['type'],
|
"use the CustomObservable decorator." % obj['type'],
|
||||||
)
|
)
|
||||||
|
|
||||||
return obj_class(allow_custom=allow_custom, **obj)
|
return obj_class(allow_custom=allow_custom, interoperability=interoperability, **obj)
|
||||||
|
|
|
@ -21,13 +21,9 @@ from .utils import (
|
||||||
)
|
)
|
||||||
from .version import DEFAULT_VERSION
|
from .version import DEFAULT_VERSION
|
||||||
|
|
||||||
ID_REGEX_interoperability = re.compile(r"[0-9a-fA-F]{8}-"
|
ID_REGEX_interoperability = re.compile(
|
||||||
"[0-9a-fA-F]{4}-"
|
r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$",
|
||||||
"[0-9a-fA-F]{4}-"
|
)
|
||||||
"[0-9a-fA-F]{4}-"
|
|
||||||
"[0-9a-fA-F]{12}$")
|
|
||||||
|
|
||||||
|
|
||||||
TYPE_REGEX = re.compile(r'^-?[a-z0-9]+(-[a-z0-9]+)*-?$')
|
TYPE_REGEX = re.compile(r'^-?[a-z0-9]+(-[a-z0-9]+)*-?$')
|
||||||
TYPE_21_REGEX = re.compile(r'^([a-z][a-z0-9]*)+([a-z0-9-]+)*-?$')
|
TYPE_21_REGEX = re.compile(r'^([a-z][a-z0-9]*)+([a-z0-9-]+)*-?$')
|
||||||
ERROR_INVALID_ID = (
|
ERROR_INVALID_ID = (
|
||||||
|
@ -58,7 +54,7 @@ def _check_uuid(uuid_str, spec_version, interoperability):
|
||||||
return ok
|
return ok
|
||||||
|
|
||||||
|
|
||||||
def _validate_id(id_, spec_version, required_prefix, interoperability):
|
def _validate_id(id_, spec_version, required_prefix, interoperability=False):
|
||||||
"""
|
"""
|
||||||
Check the STIX identifier for correctness, raise an exception if there are
|
Check the STIX identifier for correctness, raise an exception if there are
|
||||||
errors.
|
errors.
|
||||||
|
@ -180,7 +176,7 @@ class Property(object):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _default_clean(self, value, allow_custom=False):
|
def _default_clean(self, value, allow_custom=False, interoperability=False):
|
||||||
if value != self._fixed_value:
|
if value != self._fixed_value:
|
||||||
raise ValueError("must equal '{}'.".format(self._fixed_value))
|
raise ValueError("must equal '{}'.".format(self._fixed_value))
|
||||||
return value, False
|
return value, False
|
||||||
|
@ -234,7 +230,7 @@ class ListProperty(Property):
|
||||||
|
|
||||||
super(ListProperty, self).__init__(**kwargs)
|
super(ListProperty, self).__init__(**kwargs)
|
||||||
|
|
||||||
def clean(self, value, allow_custom):
|
def clean(self, value, allow_custom, interoperability=False):
|
||||||
try:
|
try:
|
||||||
iter(value)
|
iter(value)
|
||||||
except TypeError:
|
except TypeError:
|
||||||
|
@ -247,7 +243,7 @@ class ListProperty(Property):
|
||||||
has_custom = False
|
has_custom = False
|
||||||
if isinstance(self.contained, Property):
|
if isinstance(self.contained, Property):
|
||||||
for item in value:
|
for item in value:
|
||||||
valid, temp_custom = self.contained.clean(item, allow_custom)
|
valid, temp_custom = self.contained.clean(item, allow_custom, interoperability)
|
||||||
result.append(valid)
|
result.append(valid)
|
||||||
has_custom = has_custom or temp_custom
|
has_custom = has_custom or temp_custom
|
||||||
|
|
||||||
|
@ -258,7 +254,7 @@ class ListProperty(Property):
|
||||||
|
|
||||||
elif isinstance(item, collections.abc.Mapping):
|
elif isinstance(item, collections.abc.Mapping):
|
||||||
# attempt a mapping-like usage...
|
# attempt a mapping-like usage...
|
||||||
valid = self.contained(allow_custom=allow_custom, **item)
|
valid = self.contained(allow_custom=allow_custom, interoperability=interoperability, **item)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
|
@ -285,7 +281,7 @@ class StringProperty(Property):
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
super(StringProperty, self).__init__(**kwargs)
|
super(StringProperty, self).__init__(**kwargs)
|
||||||
|
|
||||||
def clean(self, value, allow_custom=False):
|
def clean(self, value, allow_custom=False, interoperability=False):
|
||||||
if not isinstance(value, str):
|
if not isinstance(value, str):
|
||||||
value = str(value)
|
value = str(value)
|
||||||
return value, False
|
return value, False
|
||||||
|
@ -306,8 +302,7 @@ class IDProperty(Property):
|
||||||
self.spec_version = spec_version
|
self.spec_version = spec_version
|
||||||
super(IDProperty, self).__init__()
|
super(IDProperty, self).__init__()
|
||||||
|
|
||||||
def clean(self, value, allow_custom=False):
|
def clean(self, value, allow_custom=False, interoperability=False):
|
||||||
interoperability = self.interoperability if hasattr(self, 'interoperability') and self.interoperability else False
|
|
||||||
_validate_id(value, self.spec_version, self.required_prefix, interoperability)
|
_validate_id(value, self.spec_version, self.required_prefix, interoperability)
|
||||||
return value, False
|
return value, False
|
||||||
|
|
||||||
|
@ -322,7 +317,7 @@ class IntegerProperty(Property):
|
||||||
self.max = max
|
self.max = max
|
||||||
super(IntegerProperty, self).__init__(**kwargs)
|
super(IntegerProperty, self).__init__(**kwargs)
|
||||||
|
|
||||||
def clean(self, value, allow_custom=False):
|
def clean(self, value, allow_custom=False, interoperability=False):
|
||||||
try:
|
try:
|
||||||
value = int(value)
|
value = int(value)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -402,7 +397,7 @@ class DictionaryProperty(Property):
|
||||||
self.spec_version = spec_version
|
self.spec_version = spec_version
|
||||||
super(DictionaryProperty, self).__init__(**kwargs)
|
super(DictionaryProperty, self).__init__(**kwargs)
|
||||||
|
|
||||||
def clean(self, value, allow_custom=False):
|
def clean(self, value, allow_custom=False, interoperability=False):
|
||||||
try:
|
try:
|
||||||
dictified = _get_dict(value)
|
dictified = _get_dict(value)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
|
@ -445,7 +440,7 @@ class HashesProperty(DictionaryProperty):
|
||||||
if alg:
|
if alg:
|
||||||
self.__alg_to_spec_name[alg] = spec_hash_name
|
self.__alg_to_spec_name[alg] = spec_hash_name
|
||||||
|
|
||||||
def clean(self, value, allow_custom):
|
def clean(self, value, allow_custom, interoperability=False):
|
||||||
# ignore the has_custom return value here; there is no customization
|
# ignore the has_custom return value here; there is no customization
|
||||||
# of DictionaryProperties.
|
# of DictionaryProperties.
|
||||||
clean_dict, _ = super().clean(value, allow_custom)
|
clean_dict, _ = super().clean(value, allow_custom)
|
||||||
|
@ -552,12 +547,11 @@ class ReferenceProperty(Property):
|
||||||
|
|
||||||
super(ReferenceProperty, self).__init__(**kwargs)
|
super(ReferenceProperty, self).__init__(**kwargs)
|
||||||
|
|
||||||
def clean(self, value, allow_custom):
|
def clean(self, value, allow_custom, interoperability=False):
|
||||||
if isinstance(value, _STIXBase):
|
if isinstance(value, _STIXBase):
|
||||||
value = value.id
|
value = value.id
|
||||||
value = str(value)
|
value = str(value)
|
||||||
|
|
||||||
interoperability = self.interoperability if hasattr(self, 'interoperability') and self.interoperability else False
|
|
||||||
_validate_id(value, self.spec_version, None, interoperability)
|
_validate_id(value, self.spec_version, None, interoperability)
|
||||||
|
|
||||||
obj_type = get_type_from_id(value)
|
obj_type = get_type_from_id(value)
|
||||||
|
@ -631,7 +625,7 @@ SELECTOR_REGEX = re.compile(r"^([a-z0-9_-]{3,250}(\.(\[\d+\]|[a-z0-9_-]{1,250}))
|
||||||
|
|
||||||
class SelectorProperty(Property):
|
class SelectorProperty(Property):
|
||||||
|
|
||||||
def clean(self, value, allow_custom=False):
|
def clean(self, value, allow_custom=False, interoperability=False):
|
||||||
if not SELECTOR_REGEX.match(value):
|
if not SELECTOR_REGEX.match(value):
|
||||||
raise ValueError("must adhere to selector syntax.")
|
raise ValueError("must adhere to selector syntax.")
|
||||||
return value, False
|
return value, False
|
||||||
|
@ -652,7 +646,7 @@ class EmbeddedObjectProperty(Property):
|
||||||
self.type = type
|
self.type = type
|
||||||
super(EmbeddedObjectProperty, self).__init__(**kwargs)
|
super(EmbeddedObjectProperty, self).__init__(**kwargs)
|
||||||
|
|
||||||
def clean(self, value, allow_custom):
|
def clean(self, value, allow_custom, interoperability=False):
|
||||||
if isinstance(value, dict):
|
if isinstance(value, dict):
|
||||||
value = self.type(allow_custom=allow_custom, **value)
|
value = self.type(allow_custom=allow_custom, **value)
|
||||||
elif not isinstance(value, self.type):
|
elif not isinstance(value, self.type):
|
||||||
|
@ -680,8 +674,8 @@ class EnumProperty(StringProperty):
|
||||||
self.allowed = allowed
|
self.allowed = allowed
|
||||||
super(EnumProperty, self).__init__(**kwargs)
|
super(EnumProperty, self).__init__(**kwargs)
|
||||||
|
|
||||||
def clean(self, value, allow_custom):
|
def clean(self, value, allow_custom, interoperability=False):
|
||||||
cleaned_value, _ = super(EnumProperty, self).clean(value, allow_custom)
|
cleaned_value, _ = super(EnumProperty, self).clean(value, allow_custom, interoperability)
|
||||||
|
|
||||||
if cleaned_value not in self.allowed:
|
if cleaned_value not in self.allowed:
|
||||||
raise ValueError("value '{}' is not valid for this enumeration.".format(cleaned_value))
|
raise ValueError("value '{}' is not valid for this enumeration.".format(cleaned_value))
|
||||||
|
@ -701,9 +695,9 @@ class OpenVocabProperty(StringProperty):
|
||||||
allowed = [allowed]
|
allowed = [allowed]
|
||||||
self.allowed = allowed
|
self.allowed = allowed
|
||||||
|
|
||||||
def clean(self, value, allow_custom):
|
def clean(self, value, allow_custom, interoperability=False):
|
||||||
cleaned_value, _ = super(OpenVocabProperty, self).clean(
|
cleaned_value, _ = super(OpenVocabProperty, self).clean(
|
||||||
value, allow_custom,
|
value, allow_custom, interoperability,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Disabled: it was decided that enforcing this is too strict (might
|
# Disabled: it was decided that enforcing this is too strict (might
|
||||||
|
@ -782,7 +776,7 @@ class ExtensionsProperty(DictionaryProperty):
|
||||||
def __init__(self, spec_version=DEFAULT_VERSION, required=False):
|
def __init__(self, spec_version=DEFAULT_VERSION, required=False):
|
||||||
super(ExtensionsProperty, self).__init__(spec_version=spec_version, required=required)
|
super(ExtensionsProperty, self).__init__(spec_version=spec_version, required=required)
|
||||||
|
|
||||||
def clean(self, value, allow_custom):
|
def clean(self, value, allow_custom, interoperability=False):
|
||||||
try:
|
try:
|
||||||
dictified = _get_dict(value)
|
dictified = _get_dict(value)
|
||||||
# get deep copy since we are going modify the dict and might
|
# get deep copy since we are going modify the dict and might
|
||||||
|
@ -797,7 +791,7 @@ class ExtensionsProperty(DictionaryProperty):
|
||||||
cls = class_for_type(key, self.spec_version, "extensions")
|
cls = class_for_type(key, self.spec_version, "extensions")
|
||||||
if cls:
|
if cls:
|
||||||
if isinstance(subvalue, dict):
|
if isinstance(subvalue, dict):
|
||||||
ext = cls(allow_custom=allow_custom, **subvalue)
|
ext = cls(allow_custom=allow_custom, interoperability=interoperability, **subvalue)
|
||||||
elif isinstance(subvalue, cls):
|
elif isinstance(subvalue, cls):
|
||||||
# If already an instance of the registered class, assume
|
# If already an instance of the registered class, assume
|
||||||
# it's valid
|
# it's valid
|
||||||
|
@ -829,9 +823,8 @@ class ExtensionsProperty(DictionaryProperty):
|
||||||
# extensions should be pre-registered with the library).
|
# extensions should be pre-registered with the library).
|
||||||
|
|
||||||
if key.startswith('extension-definition--'):
|
if key.startswith('extension-definition--'):
|
||||||
interoperability = self.interoperability if hasattr(self, 'interoperability') else False
|
|
||||||
_validate_id(
|
_validate_id(
|
||||||
key, self.spec_version, 'extension-definition--', interoperability
|
key, self.spec_version, 'extension-definition--',
|
||||||
)
|
)
|
||||||
elif allow_custom:
|
elif allow_custom:
|
||||||
has_custom = True
|
has_custom = True
|
||||||
|
@ -845,12 +838,11 @@ class ExtensionsProperty(DictionaryProperty):
|
||||||
|
|
||||||
class STIXObjectProperty(Property):
|
class STIXObjectProperty(Property):
|
||||||
|
|
||||||
def __init__(self, spec_version=DEFAULT_VERSION, interoperability=False, *args, **kwargs):
|
def __init__(self, spec_version=DEFAULT_VERSION, *args, **kwargs):
|
||||||
self.spec_version = spec_version
|
self.spec_version = spec_version
|
||||||
self.interoperability = interoperability
|
|
||||||
super(STIXObjectProperty, self).__init__(*args, **kwargs)
|
super(STIXObjectProperty, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
def clean(self, value, allow_custom):
|
def clean(self, value, allow_custom, interoperability=False):
|
||||||
# Any STIX Object (SDO, SRO, or Marking Definition) can be added to
|
# Any STIX Object (SDO, SRO, or Marking Definition) can be added to
|
||||||
# a bundle with no further checks.
|
# a bundle with no further checks.
|
||||||
stix2_classes = {'_DomainObject', '_RelationshipObject', 'MarkingDefinition'}
|
stix2_classes = {'_DomainObject', '_RelationshipObject', 'MarkingDefinition'}
|
||||||
|
@ -890,7 +882,7 @@ class STIXObjectProperty(Property):
|
||||||
"containing objects of a different spec version.",
|
"containing objects of a different spec version.",
|
||||||
)
|
)
|
||||||
|
|
||||||
parsed_obj = parse(dictified, allow_custom=allow_custom, interoperability=self.interoperability)
|
parsed_obj = parse(dictified, allow_custom=allow_custom, interoperability=interoperability)
|
||||||
|
|
||||||
if isinstance(parsed_obj, _STIXBase):
|
if isinstance(parsed_obj, _STIXBase):
|
||||||
has_custom = parsed_obj.has_custom
|
has_custom = parsed_obj.has_custom
|
||||||
|
|
|
@ -35,14 +35,6 @@ class Bundle(_STIXBase20):
|
||||||
|
|
||||||
kwargs['objects'] = obj_list + kwargs.get('objects', [])
|
kwargs['objects'] = obj_list + kwargs.get('objects', [])
|
||||||
|
|
||||||
allow_custom = kwargs.get('allow_custom', False)
|
|
||||||
self._allow_custom = allow_custom
|
|
||||||
self._properties['objects'].contained.allow_custom = allow_custom
|
|
||||||
interoperability = kwargs.get('interoperability', False)
|
|
||||||
self.__interoperability = interoperability
|
|
||||||
self._properties['id'].interoperability = interoperability
|
|
||||||
self._properties['objects'].contained.interoperability = interoperability
|
|
||||||
|
|
||||||
super(Bundle, self).__init__(**kwargs)
|
super(Bundle, self).__init__(**kwargs)
|
||||||
|
|
||||||
def get_obj(self, obj_uuid):
|
def get_obj(self, obj_uuid):
|
||||||
|
|
|
@ -147,12 +147,6 @@ class MarkingDefinition(_STIXBase20, _MarkingsMixin):
|
||||||
if not isinstance(kwargs['definition'], marking_type):
|
if not isinstance(kwargs['definition'], marking_type):
|
||||||
defn = _get_dict(kwargs['definition'])
|
defn = _get_dict(kwargs['definition'])
|
||||||
kwargs['definition'] = marking_type(**defn)
|
kwargs['definition'] = marking_type(**defn)
|
||||||
interoperability = kwargs.get('interoperability', False)
|
|
||||||
self._properties['id'].interoperability = interoperability
|
|
||||||
if kwargs.get('created_by_ref'):
|
|
||||||
self._properties['created_by_ref'].interoperability = interoperability
|
|
||||||
if kwargs.get('object_marking_refs'):
|
|
||||||
self._properties['object_marking_refs'].contained.interoperability = interoperability
|
|
||||||
|
|
||||||
super(MarkingDefinition, self).__init__(**kwargs)
|
super(MarkingDefinition, self).__init__(**kwargs)
|
||||||
|
|
||||||
|
|
|
@ -248,12 +248,6 @@ class Report(_DomainObject):
|
||||||
('granular_markings', ListProperty(GranularMarking)),
|
('granular_markings', ListProperty(GranularMarking)),
|
||||||
])
|
])
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
self._properties['object_refs'].contained.allow_custom = kwargs.get('allow_custom', False)
|
|
||||||
self._properties['object_refs'].contained.interoperability = kwargs.get('interoperability', False)
|
|
||||||
|
|
||||||
super(Report, self).__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
class ThreatActor(_DomainObject):
|
class ThreatActor(_DomainObject):
|
||||||
"""For more detailed information on this object's properties, see
|
"""For more detailed information on this object's properties, see
|
||||||
|
|
|
@ -48,9 +48,6 @@ class Relationship(_RelationshipObject):
|
||||||
kwargs['relationship_type'] = relationship_type
|
kwargs['relationship_type'] = relationship_type
|
||||||
if target_ref and not kwargs.get('target_ref'):
|
if target_ref and not kwargs.get('target_ref'):
|
||||||
kwargs['target_ref'] = target_ref
|
kwargs['target_ref'] = target_ref
|
||||||
interoperability = kwargs.get('interoperability', False)
|
|
||||||
self._properties['source_ref'].interoperability = interoperability
|
|
||||||
self._properties['target_ref'].interoperability = interoperability
|
|
||||||
|
|
||||||
super(Relationship, self).__init__(**kwargs)
|
super(Relationship, self).__init__(**kwargs)
|
||||||
|
|
||||||
|
@ -86,11 +83,5 @@ class Sighting(_RelationshipObject):
|
||||||
# Allow sighting_of_ref as a positional arg.
|
# Allow sighting_of_ref as a positional arg.
|
||||||
if sighting_of_ref and not kwargs.get('sighting_of_ref'):
|
if sighting_of_ref and not kwargs.get('sighting_of_ref'):
|
||||||
kwargs['sighting_of_ref'] = sighting_of_ref
|
kwargs['sighting_of_ref'] = sighting_of_ref
|
||||||
interoperability = kwargs.get('interoperability', False)
|
|
||||||
self._properties['sighting_of_ref'].interoperability = interoperability
|
|
||||||
if kwargs.get('observed_data_refs'):
|
|
||||||
self._properties['observed_data_refs'].contained.interoperability = interoperability
|
|
||||||
if kwargs.get('where_sighted_refs'):
|
|
||||||
self._properties['where_sighted_refs'].contained.interoperability = interoperability
|
|
||||||
|
|
||||||
super(Sighting, self).__init__(**kwargs)
|
super(Sighting, self).__init__(**kwargs)
|
||||||
|
|
|
@ -12,9 +12,6 @@ class _STIXBase21(_STIXBase):
|
||||||
class _Observable(_Observable, _STIXBase21):
|
class _Observable(_Observable, _STIXBase21):
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
interoperability = kwargs.get('interoperability', False)
|
|
||||||
self.__interoperability = interoperability
|
|
||||||
self._properties['id'].interoperability = interoperability
|
|
||||||
super(_Observable, self).__init__(**kwargs)
|
super(_Observable, self).__init__(**kwargs)
|
||||||
if 'id' not in kwargs:
|
if 'id' not in kwargs:
|
||||||
# Specific to 2.1+ observables: generate a deterministic ID
|
# Specific to 2.1+ observables: generate a deterministic ID
|
||||||
|
|
|
@ -32,14 +32,6 @@ class Bundle(_STIXBase21):
|
||||||
|
|
||||||
kwargs['objects'] = obj_list + kwargs.get('objects', [])
|
kwargs['objects'] = obj_list + kwargs.get('objects', [])
|
||||||
|
|
||||||
allow_custom = kwargs.get('allow_custom', False)
|
|
||||||
self._allow_custom = allow_custom
|
|
||||||
self._properties['objects'].contained.allow_custom = allow_custom
|
|
||||||
interoperability = kwargs.get('interoperability', False)
|
|
||||||
self.__interoperability = interoperability
|
|
||||||
self._properties['id'].interoperability = interoperability
|
|
||||||
self._properties['objects'].contained.interoperability = interoperability
|
|
||||||
|
|
||||||
super(Bundle, self).__init__(**kwargs)
|
super(Bundle, self).__init__(**kwargs)
|
||||||
|
|
||||||
def get_obj(self, obj_uuid):
|
def get_obj(self, obj_uuid):
|
||||||
|
|
|
@ -222,12 +222,6 @@ class MarkingDefinition(_STIXBase21, _MarkingsMixin):
|
||||||
if not isinstance(kwargs['definition'], marking_type):
|
if not isinstance(kwargs['definition'], marking_type):
|
||||||
defn = _get_dict(kwargs['definition'])
|
defn = _get_dict(kwargs['definition'])
|
||||||
kwargs['definition'] = marking_type(**defn)
|
kwargs['definition'] = marking_type(**defn)
|
||||||
interoperability = kwargs.get('interoperability', False)
|
|
||||||
self._properties['id'].interoperability = interoperability
|
|
||||||
if kwargs.get('created_by_ref'):
|
|
||||||
self._properties['created_by_ref'].interoperability = interoperability
|
|
||||||
if kwargs.get('object_marking_refs'):
|
|
||||||
self._properties['object_marking_refs'].contained.interoperability = interoperability
|
|
||||||
|
|
||||||
super(MarkingDefinition, self).__init__(**kwargs)
|
super(MarkingDefinition, self).__init__(**kwargs)
|
||||||
|
|
||||||
|
|
|
@ -698,11 +698,6 @@ class Report(_DomainObject):
|
||||||
('extensions', ExtensionsProperty(spec_version='2.1')),
|
('extensions', ExtensionsProperty(spec_version='2.1')),
|
||||||
])
|
])
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
self._properties['object_refs'].contained.interoperability = kwargs.get('interoperability', False)
|
|
||||||
|
|
||||||
super(Report, self).__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
class ThreatActor(_DomainObject):
|
class ThreatActor(_DomainObject):
|
||||||
"""For more detailed information on this object's properties, see
|
"""For more detailed information on this object's properties, see
|
||||||
|
|
|
@ -55,9 +55,6 @@ class Relationship(_RelationshipObject):
|
||||||
kwargs['relationship_type'] = relationship_type
|
kwargs['relationship_type'] = relationship_type
|
||||||
if target_ref and not kwargs.get('target_ref'):
|
if target_ref and not kwargs.get('target_ref'):
|
||||||
kwargs['target_ref'] = target_ref
|
kwargs['target_ref'] = target_ref
|
||||||
interoperability = kwargs.get('interoperability', False)
|
|
||||||
self._properties['source_ref'].interoperability = interoperability
|
|
||||||
self._properties['target_ref'].interoperability = interoperability
|
|
||||||
|
|
||||||
super(Relationship, self).__init__(**kwargs)
|
super(Relationship, self).__init__(**kwargs)
|
||||||
|
|
||||||
|
@ -108,12 +105,6 @@ class Sighting(_RelationshipObject):
|
||||||
# Allow sighting_of_ref as a positional arg.
|
# Allow sighting_of_ref as a positional arg.
|
||||||
if sighting_of_ref and not kwargs.get('sighting_of_ref'):
|
if sighting_of_ref and not kwargs.get('sighting_of_ref'):
|
||||||
kwargs['sighting_of_ref'] = sighting_of_ref
|
kwargs['sighting_of_ref'] = sighting_of_ref
|
||||||
interoperability = kwargs.get('interoperability', False)
|
|
||||||
self._properties['sighting_of_ref'].interoperability = interoperability
|
|
||||||
if kwargs.get('observed_data_refs'):
|
|
||||||
self._properties['observed_data_refs'].contained.interoperability = interoperability
|
|
||||||
if kwargs.get('where_sighted_refs'):
|
|
||||||
self._properties['where_sighted_refs'].contained.interoperability = interoperability
|
|
||||||
|
|
||||||
super(Sighting, self).__init__(**kwargs)
|
super(Sighting, self).__init__(**kwargs)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue