Merge pull request #164 from oasis-open/157-allow-generic-custom-observables

Allow generic custom observables and custom observable extensions
stix2.0
Chris Lenk 2018-04-13 12:38:02 -04:00 committed by GitHub
commit 58be98104f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 73 additions and 14 deletions

View File

@ -6,9 +6,9 @@ import datetime as dt
import simplejson as json
from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError,
ExtraPropertiesError, ImmutableError,
InvalidObjRefError, InvalidValueError,
from .exceptions import (AtLeastOnePropertyError, CustomContentError,
DependentPropertiesError, ExtraPropertiesError,
ImmutableError, InvalidObjRefError, InvalidValueError,
MissingPropertiesError,
MutuallyExclusivePropertiesError)
from .markings.utils import validate
@ -61,6 +61,8 @@ class _STIXBase(collections.Mapping):
try:
kwargs[prop_name] = prop.clean(kwargs[prop_name])
except ValueError as exc:
if self.__allow_custom and isinstance(exc, CustomContentError):
return
raise InvalidValueError(self.__class__, prop_name, reason=str(exc))
# interproperty constraint methods
@ -97,6 +99,7 @@ class _STIXBase(collections.Mapping):
def __init__(self, allow_custom=False, **kwargs):
cls = self.__class__
self.__allow_custom = allow_custom
# Use the same timestamp for any auto-generated datetimes
self.__now = get_timestamp()

View File

@ -163,6 +163,13 @@ class ParseError(STIXError, ValueError):
super(ParseError, self).__init__(msg)
class CustomContentError(STIXError, ValueError):
"""Custom STIX Content (SDO, Observable, Extension, etc.) detected."""
def __init__(self, msg):
super(CustomContentError, self).__init__(msg)
class InvalidSelectorError(STIXError, AssertionError):
"""Granular Marking selector violation. The selector must resolve into an existing STIX object property."""

View File

@ -363,6 +363,7 @@ def test_parse_custom_observable_object():
}"""
nt = stix2.parse_observable(nt_string, [])
assert isinstance(nt, stix2.core._STIXBase)
assert nt.property1 == 'something'
@ -372,10 +373,46 @@ def test_parse_unregistered_custom_observable_object():
"property1": "something"
}"""
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
with pytest.raises(stix2.exceptions.CustomContentError) as excinfo:
stix2.parse_observable(nt_string)
assert "Can't parse unknown observable type" in str(excinfo.value)
parsed_custom = stix2.parse_observable(nt_string, allow_custom=True)
assert parsed_custom['property1'] == 'something'
with pytest.raises(AttributeError) as excinfo:
assert parsed_custom.property1 == 'something'
assert not isinstance(parsed_custom, stix2.core._STIXBase)
def test_parse_unregistered_custom_observable_object_with_no_type():
nt_string = """{
"property1": "something"
}"""
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
stix2.parse_observable(nt_string, allow_custom=True)
assert "Can't parse observable with no 'type' property" in str(excinfo.value)
def test_parse_observed_data_with_custom_observable():
input_str = """{
"type": "observed-data",
"id": "observed-data--dc20c4ca-a2a3-4090-a5d5-9558c3af4758",
"created": "2016-04-06T19:58:16.000Z",
"modified": "2016-04-06T19:58:16.000Z",
"first_observed": "2015-12-21T19:00:00Z",
"last_observed": "2015-12-21T19:00:00Z",
"number_observed": 1,
"objects": {
"0": {
"type": "x-foobar-observable",
"property1": "something"
}
}
}"""
parsed = stix2.parse(input_str, allow_custom=True)
assert parsed.objects['0']['property1'] == 'something'
def test_parse_invalid_custom_observable_object():
nt_string = """{
@ -591,7 +628,11 @@ def test_parse_observable_with_unregistered_custom_extension():
with pytest.raises(ValueError) as excinfo:
stix2.parse_observable(input_str)
assert "Can't parse Unknown extension type" in str(excinfo.value)
assert "Can't parse unknown extension type" in str(excinfo.value)
parsed_ob = stix2.parse_observable(input_str, allow_custom=True)
assert parsed_ob['extensions']['x-foobar-ext']['property1'] == 'foo'
assert not isinstance(parsed_ob['extensions']['x-foobar-ext'], stix2.core._STIXBase)
def test_register_custom_object():

View File

@ -8,8 +8,8 @@ Observable and do not have a ``_type`` attribute.
from collections import OrderedDict
from ..base import _Extension, _Observable, _STIXBase
from ..exceptions import (AtLeastOnePropertyError, DependentPropertiesError,
ParseError)
from ..exceptions import (AtLeastOnePropertyError, CustomContentError,
DependentPropertiesError, ParseError)
from ..properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
EmbeddedObjectProperty, EnumProperty, FloatProperty,
HashesProperty, HexProperty, IntegerProperty,
@ -67,7 +67,7 @@ class ExtensionsProperty(DictionaryProperty):
else:
raise ValueError("Cannot determine extension type.")
else:
raise ValueError("The key used in the extensions dictionary is not an extension type name")
raise CustomContentError("Can't parse unknown extension type: {}".format(key))
else:
raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type)
return dictified
@ -923,15 +923,23 @@ def parse_observable(data, _valid_refs=None, allow_custom=False):
try:
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError:
raise ParseError("Can't parse unknown observable type '%s'! For custom observables, "
"use the CustomObservable decorator." % obj['type'])
if allow_custom:
# flag allows for unknown custom objects too, but will not
# be parsed into STIX observable object, just returned as is
return obj
raise CustomContentError("Can't parse unknown observable type '%s'! For custom observables, "
"use the CustomObservable decorator." % obj['type'])
if 'extensions' in obj and obj['type'] in EXT_MAP:
for name, ext in obj['extensions'].items():
if name not in EXT_MAP[obj['type']]:
raise ParseError("Can't parse Unknown extension type '%s' for observable type '%s'!" % (name, obj['type']))
ext_class = EXT_MAP[obj['type']][name]
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
try:
ext_class = EXT_MAP[obj['type']][name]
except KeyError:
if not allow_custom:
raise CustomContentError("Can't parse unknown extension type '%s'"
"for observable type '%s'!" % (name, obj['type']))
else: # extension was found
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
return obj_class(allow_custom=allow_custom, **obj)