From b633fd37856ccd15257d7d6bae3223411a90e9ef Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Tue, 10 Apr 2018 12:54:27 -0400 Subject: [PATCH 1/4] WIP: Allow custom observables, extensions --- stix2/test/test_custom.py | 31 +++++++++++++++++++++++++++++++ stix2/v20/observables.py | 15 +++++++++++---- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index cc8b32b..a50819b 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -363,6 +363,7 @@ def test_parse_custom_observable_object(): }""" nt = stix2.parse_observable(nt_string, []) + assert isinstance(nt, stix2.core._STIXBase) assert nt.property1 == 'something' @@ -376,6 +377,32 @@ def test_parse_unregistered_custom_observable_object(): stix2.parse_observable(nt_string) assert "Can't parse unknown observable type" in str(excinfo.value) + parsed_custom = stix2.parse_observable(nt_string, allow_custom=True) + assert parsed_custom['property1'] == 'something' + with pytest.raises(AttributeError) as excinfo: + assert parsed_custom.property1 == 'something' + assert not isinstance(parsed_custom, stix2.core._STIXBase) + + +def test_parse_observed_data_with_custom_observable(): + input_str = """{ + "type": "observed-data", + "id": "observed-data--dc20c4ca-a2a3-4090-a5d5-9558c3af4758", + "created": "2016-04-06T19:58:16.000Z", + "modified": "2016-04-06T19:58:16.000Z", + "first_observed": "2015-12-21T19:00:00Z", + "last_observed": "2015-12-21T19:00:00Z", + "number_observed": 1, + "objects": { + "0": { + "type": "x-foobar-observable", + "property1": "something" + } + } + }""" + parsed = stix2.parse(input_str, allow_custom=True) + assert parsed.objects['0']['property1'] == 'something' + def test_parse_invalid_custom_observable_object(): nt_string = """{ @@ -585,6 +612,10 @@ def test_parse_observable_with_unregistered_custom_extension(): stix2.parse_observable(input_str) assert "Can't parse Unknown extension type" in str(excinfo.value) + parsed_ob = stix2.parse_observable(input_str, allow_custom=True) + assert parsed_ob['extensions']['x-foobar-ext']['property1'] == 'foo' + assert not isinstance(parsed_ob['extensions']['x-foobar-ext'], stix2.core._STIXBase) + def test_register_custom_object(): # Not the way to register custom object. diff --git a/stix2/v20/observables.py b/stix2/v20/observables.py index 83600b0..4449527 100644 --- a/stix2/v20/observables.py +++ b/stix2/v20/observables.py @@ -923,15 +923,22 @@ def parse_observable(data, _valid_refs=None, allow_custom=False): try: obj_class = OBJ_MAP_OBSERVABLE[obj['type']] except KeyError: + if allow_custom: + # flag allows for unknown custom objects too, but will not + # be parsed into STIX observable object, just returned as is + return obj raise ParseError("Can't parse unknown observable type '%s'! For custom observables, " "use the CustomObservable decorator." % obj['type']) if 'extensions' in obj and obj['type'] in EXT_MAP: for name, ext in obj['extensions'].items(): - if name not in EXT_MAP[obj['type']]: - raise ParseError("Can't parse Unknown extension type '%s' for observable type '%s'!" % (name, obj['type'])) - ext_class = EXT_MAP[obj['type']][name] - obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name]) + try: + ext_class = EXT_MAP[obj['type']][name] + except KeyError: + if not allow_custom: + raise ParseError("Can't parse Unknown extension type '%s' for observable type '%s'!" % (name, obj['type'])) + else: # extension was found + obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name]) return obj_class(allow_custom=allow_custom, **obj) From 9ef5b395a8b702eae048f4688e7b8d16eda61795 Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Thu, 12 Apr 2018 14:20:24 -0400 Subject: [PATCH 2/4] Fix allowing custom observables and extensions --- stix2/base.py | 16 +++++++++++----- stix2/test/test_custom.py | 17 +++++++++++++++++ stix2/v20/observables.py | 4 ++-- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/stix2/base.py b/stix2/base.py index 898f489..7ca4740 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -10,7 +10,7 @@ from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError, ExtraPropertiesError, ImmutableError, InvalidObjRefError, InvalidValueError, MissingPropertiesError, - MutuallyExclusivePropertiesError) + MutuallyExclusivePropertiesError, ParseError) from .markings.utils import validate from .utils import NOW, find_property_index, format_datetime, get_timestamp from .utils import new_version as _new_version @@ -49,7 +49,7 @@ class _STIXBase(collections.Mapping): return all_properties - def _check_property(self, prop_name, prop, kwargs): + def _check_property(self, prop_name, prop, kwargs, allow_custom=False): if prop_name not in kwargs: if hasattr(prop, 'default'): value = prop.default() @@ -61,6 +61,8 @@ class _STIXBase(collections.Mapping): try: kwargs[prop_name] = prop.clean(kwargs[prop_name]) except ValueError as exc: + if allow_custom and isinstance(exc, ParseError): + return raise InvalidValueError(self.__class__, prop_name, reason=str(exc)) # interproperty constraint methods @@ -125,7 +127,11 @@ class _STIXBase(collections.Mapping): raise MissingPropertiesError(cls, missing_kwargs) for prop_name, prop_metadata in cls._properties.items(): - self._check_property(prop_name, prop_metadata, setting_kwargs) + try: + self._check_property(prop_name, prop_metadata, setting_kwargs, allow_custom) + except ParseError as err: + if not allow_custom: + raise err self._inner = setting_kwargs @@ -244,8 +250,8 @@ class _Observable(_STIXBase): if ref_type not in allowed_types: raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type)) - def _check_property(self, prop_name, prop, kwargs): - super(_Observable, self)._check_property(prop_name, prop, kwargs) + def _check_property(self, prop_name, prop, kwargs, allow_custom=False): + super(_Observable, self)._check_property(prop_name, prop, kwargs, allow_custom) if prop_name not in kwargs: return diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index a50819b..d46acd4 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -617,6 +617,23 @@ def test_parse_observable_with_unregistered_custom_extension(): assert not isinstance(parsed_ob['extensions']['x-foobar-ext'], stix2.core._STIXBase) +def test_parse_observable_with_unregistered_custom_extension_dict(): + input_dict = { + "type": "domain-name", + "value": "example.com", + "extensions": { + "x-foobar-ext": { + "property1": "foo", + "property2": 12 + } + } + } + + with pytest.raises(ValueError) as excinfo: + stix2.v20.observables.DomainName(**input_dict) + assert "Can't parse unknown extension type" in str(excinfo.value) + + def test_register_custom_object(): # Not the way to register custom object. class CustomObject2(object): diff --git a/stix2/v20/observables.py b/stix2/v20/observables.py index 4449527..7dc7c02 100644 --- a/stix2/v20/observables.py +++ b/stix2/v20/observables.py @@ -67,7 +67,7 @@ class ExtensionsProperty(DictionaryProperty): else: raise ValueError("Cannot determine extension type.") else: - raise ValueError("The key used in the extensions dictionary is not an extension type name") + raise ParseError("Can't parse unknown extension type: {}".format(key)) else: raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type) return dictified @@ -936,7 +936,7 @@ def parse_observable(data, _valid_refs=None, allow_custom=False): ext_class = EXT_MAP[obj['type']][name] except KeyError: if not allow_custom: - raise ParseError("Can't parse Unknown extension type '%s' for observable type '%s'!" % (name, obj['type'])) + raise ParseError("Can't parse unknown extension type '%s' for observable type '%s'!" % (name, obj['type'])) else: # extension was found obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name]) From 91376586d4e311f9e24c686b70de7a247772df7d Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Thu, 12 Apr 2018 16:33:08 -0400 Subject: [PATCH 3/4] Simplify allowing custom observables/extensions --- stix2/base.py | 15 ++++++--------- stix2/test/test_custom.py | 19 +------------------ 2 files changed, 7 insertions(+), 27 deletions(-) diff --git a/stix2/base.py b/stix2/base.py index 7ca4740..05afe3f 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -49,7 +49,7 @@ class _STIXBase(collections.Mapping): return all_properties - def _check_property(self, prop_name, prop, kwargs, allow_custom=False): + def _check_property(self, prop_name, prop, kwargs): if prop_name not in kwargs: if hasattr(prop, 'default'): value = prop.default() @@ -61,7 +61,7 @@ class _STIXBase(collections.Mapping): try: kwargs[prop_name] = prop.clean(kwargs[prop_name]) except ValueError as exc: - if allow_custom and isinstance(exc, ParseError): + if self.__allow_custom and isinstance(exc, ParseError): return raise InvalidValueError(self.__class__, prop_name, reason=str(exc)) @@ -99,6 +99,7 @@ class _STIXBase(collections.Mapping): def __init__(self, allow_custom=False, **kwargs): cls = self.__class__ + self.__allow_custom = allow_custom # Use the same timestamp for any auto-generated datetimes self.__now = get_timestamp() @@ -127,11 +128,7 @@ class _STIXBase(collections.Mapping): raise MissingPropertiesError(cls, missing_kwargs) for prop_name, prop_metadata in cls._properties.items(): - try: - self._check_property(prop_name, prop_metadata, setting_kwargs, allow_custom) - except ParseError as err: - if not allow_custom: - raise err + self._check_property(prop_name, prop_metadata, setting_kwargs) self._inner = setting_kwargs @@ -250,8 +247,8 @@ class _Observable(_STIXBase): if ref_type not in allowed_types: raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type)) - def _check_property(self, prop_name, prop, kwargs, allow_custom=False): - super(_Observable, self)._check_property(prop_name, prop, kwargs, allow_custom) + def _check_property(self, prop_name, prop, kwargs): + super(_Observable, self)._check_property(prop_name, prop, kwargs) if prop_name not in kwargs: return diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index d46acd4..8da2a7a 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -610,30 +610,13 @@ def test_parse_observable_with_unregistered_custom_extension(): with pytest.raises(ValueError) as excinfo: stix2.parse_observable(input_str) - assert "Can't parse Unknown extension type" in str(excinfo.value) + assert "Can't parse unknown extension type" in str(excinfo.value) parsed_ob = stix2.parse_observable(input_str, allow_custom=True) assert parsed_ob['extensions']['x-foobar-ext']['property1'] == 'foo' assert not isinstance(parsed_ob['extensions']['x-foobar-ext'], stix2.core._STIXBase) -def test_parse_observable_with_unregistered_custom_extension_dict(): - input_dict = { - "type": "domain-name", - "value": "example.com", - "extensions": { - "x-foobar-ext": { - "property1": "foo", - "property2": 12 - } - } - } - - with pytest.raises(ValueError) as excinfo: - stix2.v20.observables.DomainName(**input_dict) - assert "Can't parse unknown extension type" in str(excinfo.value) - - def test_register_custom_object(): # Not the way to register custom object. class CustomObject2(object): From fc6a33b23e2a7131249b89eb5a1b8394d328ca26 Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Fri, 13 Apr 2018 11:18:56 -0400 Subject: [PATCH 4/4] Disallow missing 'type' property with allow_custom There was a bug where if you allowed custom content the library would parse an object without the required 'type' property. --- stix2/base.py | 10 +++++----- stix2/exceptions.py | 7 +++++++ stix2/test/test_custom.py | 12 +++++++++++- stix2/v20/observables.py | 13 +++++++------ 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/stix2/base.py b/stix2/base.py index 05afe3f..3219007 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -6,11 +6,11 @@ import datetime as dt import simplejson as json -from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError, - ExtraPropertiesError, ImmutableError, - InvalidObjRefError, InvalidValueError, +from .exceptions import (AtLeastOnePropertyError, CustomContentError, + DependentPropertiesError, ExtraPropertiesError, + ImmutableError, InvalidObjRefError, InvalidValueError, MissingPropertiesError, - MutuallyExclusivePropertiesError, ParseError) + MutuallyExclusivePropertiesError) from .markings.utils import validate from .utils import NOW, find_property_index, format_datetime, get_timestamp from .utils import new_version as _new_version @@ -61,7 +61,7 @@ class _STIXBase(collections.Mapping): try: kwargs[prop_name] = prop.clean(kwargs[prop_name]) except ValueError as exc: - if self.__allow_custom and isinstance(exc, ParseError): + if self.__allow_custom and isinstance(exc, CustomContentError): return raise InvalidValueError(self.__class__, prop_name, reason=str(exc)) diff --git a/stix2/exceptions.py b/stix2/exceptions.py index 841a8e9..79c5a81 100644 --- a/stix2/exceptions.py +++ b/stix2/exceptions.py @@ -163,6 +163,13 @@ class ParseError(STIXError, ValueError): super(ParseError, self).__init__(msg) +class CustomContentError(STIXError, ValueError): + """Custom STIX Content (SDO, Observable, Extension, etc.) detected.""" + + def __init__(self, msg): + super(CustomContentError, self).__init__(msg) + + class InvalidSelectorError(STIXError, AssertionError): """Granular Marking selector violation. The selector must resolve into an existing STIX object property.""" diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index 8da2a7a..d655e5d 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -373,7 +373,7 @@ def test_parse_unregistered_custom_observable_object(): "property1": "something" }""" - with pytest.raises(stix2.exceptions.ParseError) as excinfo: + with pytest.raises(stix2.exceptions.CustomContentError) as excinfo: stix2.parse_observable(nt_string) assert "Can't parse unknown observable type" in str(excinfo.value) @@ -384,6 +384,16 @@ def test_parse_unregistered_custom_observable_object(): assert not isinstance(parsed_custom, stix2.core._STIXBase) +def test_parse_unregistered_custom_observable_object_with_no_type(): + nt_string = """{ + "property1": "something" + }""" + + with pytest.raises(stix2.exceptions.ParseError) as excinfo: + stix2.parse_observable(nt_string, allow_custom=True) + assert "Can't parse observable with no 'type' property" in str(excinfo.value) + + def test_parse_observed_data_with_custom_observable(): input_str = """{ "type": "observed-data", diff --git a/stix2/v20/observables.py b/stix2/v20/observables.py index 7dc7c02..b5ffe49 100644 --- a/stix2/v20/observables.py +++ b/stix2/v20/observables.py @@ -8,8 +8,8 @@ Observable and do not have a ``_type`` attribute. from collections import OrderedDict from ..base import _Extension, _Observable, _STIXBase -from ..exceptions import (AtLeastOnePropertyError, DependentPropertiesError, - ParseError) +from ..exceptions import (AtLeastOnePropertyError, CustomContentError, + DependentPropertiesError, ParseError) from ..properties import (BinaryProperty, BooleanProperty, DictionaryProperty, EmbeddedObjectProperty, EnumProperty, FloatProperty, HashesProperty, HexProperty, IntegerProperty, @@ -67,7 +67,7 @@ class ExtensionsProperty(DictionaryProperty): else: raise ValueError("Cannot determine extension type.") else: - raise ParseError("Can't parse unknown extension type: {}".format(key)) + raise CustomContentError("Can't parse unknown extension type: {}".format(key)) else: raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type) return dictified @@ -927,8 +927,8 @@ def parse_observable(data, _valid_refs=None, allow_custom=False): # flag allows for unknown custom objects too, but will not # be parsed into STIX observable object, just returned as is return obj - raise ParseError("Can't parse unknown observable type '%s'! For custom observables, " - "use the CustomObservable decorator." % obj['type']) + raise CustomContentError("Can't parse unknown observable type '%s'! For custom observables, " + "use the CustomObservable decorator." % obj['type']) if 'extensions' in obj and obj['type'] in EXT_MAP: for name, ext in obj['extensions'].items(): @@ -936,7 +936,8 @@ def parse_observable(data, _valid_refs=None, allow_custom=False): ext_class = EXT_MAP[obj['type']][name] except KeyError: if not allow_custom: - raise ParseError("Can't parse unknown extension type '%s' for observable type '%s'!" % (name, obj['type'])) + raise CustomContentError("Can't parse unknown extension type '%s'" + "for observable type '%s'!" % (name, obj['type'])) else: # extension was found obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])