diff --git a/stix2/__init__.py b/stix2/__init__.py index f3dfd20..392e947 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -95,46 +95,70 @@ EXT_MAP = { } -def parse(data): - """Deserialize a string or file-like object into a STIX object""" +def parse(data, allow_custom=False): + """Deserialize a string or file-like object into a STIX object. + + Args: + data: The STIX 2 string to be parsed. + allow_custom (bool): Whether to allow custom properties or not. Default: False. + + Returns: + An instantiated Python STIX object. + """ obj = get_dict(data) if 'type' not in obj: - # TODO parse external references, kill chain phases, and granular markings - pass - else: - try: - obj_class = OBJ_MAP[obj['type']] - except KeyError: - # TODO handle custom objects - raise ValueError("Can't parse unknown object type '%s'!" % obj['type']) - return obj_class(**obj) + raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(obj)) - return obj + try: + obj_class = OBJ_MAP[obj['type']] + except KeyError: + raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % obj['type']) + return obj_class(allow_custom=allow_custom, **obj) -def parse_observable(data, _valid_refs): - """Deserialize a string or file-like object into a STIX Cyber Observable - object. +def parse_observable(data, _valid_refs=[], allow_custom=False): + """Deserialize a string or file-like object into a STIX Cyber Observable object. + + Args: + data: The STIX 2 string to be parsed. + _valid_refs: A list of object references valid for the scope of the object being parsed. + allow_custom: Whether to allow custom properties or not. Default: False. + + Returns: + An instantiated Python STIX Cyber Observable object. """ obj = get_dict(data) obj['_valid_refs'] = _valid_refs if 'type' not in obj: - raise ValueError("'type' is a required property!") + raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(obj)) try: obj_class = OBJ_MAP_OBSERVABLE[obj['type']] except KeyError: - # TODO handle custom observable objects - raise ValueError("Can't parse unknown object type '%s'!" % obj['type']) + raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom observables, use the CustomObservable decorator." % obj['type']) if 'extensions' in obj and obj['type'] in EXT_MAP: for name, ext in obj['extensions'].items(): if name not in EXT_MAP[obj['type']]: - raise ValueError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type'])) + raise exceptions.ParseError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type'])) ext_class = EXT_MAP[obj['type']][name] - obj['extensions'][name] = ext_class(**obj['extensions'][name]) + obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name]) - return obj_class(**obj) + return obj_class(allow_custom=allow_custom, **obj) + + +def _register_type(new_type): + """Register a custom STIX Object type. + """ + + OBJ_MAP[new_type._type] = new_type + + +def _register_observable(new_observable): + """Register a custom STIX Cyber Observable type. + """ + + OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable diff --git a/stix2/base.py b/stix2/base.py index c40c9e6..cb17f11 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -83,20 +83,26 @@ class _STIXBase(collections.Mapping): # TODO: check selectors pass - def __init__(self, **kwargs): + def __init__(self, allow_custom=False, **kwargs): cls = self.__class__ # Use the same timestamp for any auto-generated datetimes self.__now = get_timestamp() # Detect any keyword arguments not allowed for a specific type - extra_kwargs = list(set(kwargs) - set(cls._properties)) - if extra_kwargs: - raise ExtraPropertiesError(cls, extra_kwargs) + custom_props = kwargs.pop('custom_properties', {}) + if custom_props and not isinstance(custom_props, dict): + raise ValueError("'custom_properties' must be a dictionary") + if not allow_custom: + extra_kwargs = list(set(kwargs) - set(cls._properties)) + if extra_kwargs: + raise ExtraPropertiesError(cls, extra_kwargs) # Remove any keyword arguments whose value is None setting_kwargs = {} - for prop_name, prop_value in kwargs.items(): + props = kwargs.copy() + props.update(custom_props) + for prop_name, prop_value in props.items(): if prop_value is not None: setting_kwargs[prop_name] = prop_value diff --git a/stix2/exceptions.py b/stix2/exceptions.py index 3043047..ef47dd0 100644 --- a/stix2/exceptions.py +++ b/stix2/exceptions.py @@ -150,3 +150,10 @@ class RevokeError(STIXError, ValueError): return "Cannot revoke an already revoked object." else: return "Cannot create a new version of a revoked object." + + +class ParseError(STIXError, ValueError): + """Could not parse object""" + + def __init__(self, msg): + super(ParseError, self).__init__(msg) diff --git a/stix2/observables.py b/stix2/observables.py index 9c1b368..086dc45 100644 --- a/stix2/observables.py +++ b/stix2/observables.py @@ -5,6 +5,8 @@ embedded in Email Message objects, inherit from _STIXBase instead of Observable and do not have a '_type' attribute. """ +import stix2 + from .base import _Extension, _Observable, _STIXBase from .exceptions import AtLeastOnePropertyError, DependentPropertiesError from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty, @@ -586,3 +588,27 @@ class X509Certificate(_Observable): 'subject_public_key_exponent': IntegerProperty(), 'x509_v3_extensions': EmbeddedObjectProperty(type=X509V3ExtenstionsType), } + + +def CustomObservable(type='x-custom-observable', properties={}): + """Custom STIX Cyber Observable type decorator + + """ + + def custom_builder(cls): + + class _Custom(cls, _Observable): + _type = type + _properties = { + 'type': TypeProperty(_type), + } + _properties.update(properties) + + def __init__(self, **kwargs): + _Observable.__init__(self, **kwargs) + cls.__init__(self, **kwargs) + + stix2._register_observable(_Custom) + return _Custom + + return custom_builder diff --git a/stix2/sdo.py b/stix2/sdo.py index 73bb34d..1ec3b21 100644 --- a/stix2/sdo.py +++ b/stix2/sdo.py @@ -1,5 +1,7 @@ """STIX 2.0 Domain Objects""" +import stix2 + from .base import _STIXBase from .common import COMMON_PROPERTIES from .other import KillChainPhase @@ -190,3 +192,51 @@ class Vulnerability(_STIXBase): 'name': StringProperty(required=True), 'description': StringProperty(), }) + + +def CustomObject(type='x-custom-type', properties={}): + """Custom STIX Object type decorator + + Example 1: + + @CustomObject('x-type-name', { + 'property1': StringProperty(required=True), + 'property2': IntegerProperty(), + }) + class MyNewObjectType(): + pass + + Supply an __init__() function to add any special validations to the custom + type. Don't call super().__init() though - doing so will cause an error. + + Example 2: + + @CustomObject('x-type-name', { + 'property1': StringProperty(required=True), + 'property2': IntegerProperty(), + }) + class MyNewObjectType(): + def __init__(self, property2=None, **kwargs): + if property2 and property2 < 10: + raise ValueError("'property2' is too small.") + """ + + def custom_builder(cls): + + class _Custom(cls, _STIXBase): + _type = type + _properties = COMMON_PROPERTIES.copy() + _properties.update({ + 'id': IDProperty(_type), + 'type': TypeProperty(_type), + }) + _properties.update(properties) + + def __init__(self, **kwargs): + _STIXBase.__init__(self, **kwargs) + cls.__init__(self, **kwargs) + + stix2._register_type(_Custom) + return _Custom + + return custom_builder diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py new file mode 100644 index 0000000..1a816bd --- /dev/null +++ b/stix2/test/test_custom.py @@ -0,0 +1,168 @@ +import pytest + +import stix2 + + +def test_identity_custom_property(): + with pytest.raises(ValueError): + stix2.Identity( + id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c", + created="2015-12-21T19:59:11Z", + modified="2015-12-21T19:59:11Z", + name="John Smith", + identity_class="individual", + custom_properties="foobar", + ) + + identity = stix2.Identity( + id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c", + created="2015-12-21T19:59:11Z", + modified="2015-12-21T19:59:11Z", + name="John Smith", + identity_class="individual", + custom_properties={ + "foo": "bar", + }, + ) + + assert identity.foo == "bar" + + +def test_identity_custom_property_invalid(): + with pytest.raises(stix2.exceptions.ExtraPropertiesError): + stix2.Identity( + id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c", + created="2015-12-21T19:59:11Z", + modified="2015-12-21T19:59:11Z", + name="John Smith", + identity_class="individual", + x_foo="bar", + ) + + +def test_identity_custom_property_allowed(): + identity = stix2.Identity( + id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c", + created="2015-12-21T19:59:11Z", + modified="2015-12-21T19:59:11Z", + name="John Smith", + identity_class="individual", + x_foo="bar", + allow_custom=True, + ) + assert identity.x_foo == "bar" + + +@pytest.mark.parametrize("data", [ + """{ + "type": "identity", + "id": "identity--311b2d2d-f010-5473-83ec-1edf84858f4c", + "created": "2015-12-21T19:59:11Z", + "modified": "2015-12-21T19:59:11Z", + "name": "John Smith", + "identity_class": "individual", + "foo": "bar" + }""", +]) +def test_parse_identity_custom_property(data): + with pytest.raises(stix2.exceptions.ExtraPropertiesError): + identity = stix2.parse(data) + + identity = stix2.parse(data, allow_custom=True) + assert identity.foo == "bar" + + +@stix2.sdo.CustomObject('x-new-type', { + 'property1': stix2.properties.StringProperty(required=True), + 'property2': stix2.properties.IntegerProperty(), +}) +class NewType(): + def __init__(self, property2=None, **kwargs): + if property2 and property2 < 10: + raise ValueError("'property2' is too small.") + + +def test_custom_object_type(): + nt = NewType(property1='something') + assert nt.property1 == 'something' + + with pytest.raises(stix2.exceptions.MissingPropertiesError): + NewType(property2=42) + + with pytest.raises(ValueError): + NewType(property1='something', property2=4) + + +def test_parse_custom_object_type(): + nt_string = """{ + "type": "x-new-type", + "created": "2015-12-21T19:59:11Z", + "property1": "something" + }""" + + nt = stix2.parse(nt_string) + assert nt.property1 == 'something' + + +@stix2.observables.CustomObservable('x-new-observable', { + 'property1': stix2.properties.StringProperty(required=True), + 'property2': stix2.properties.IntegerProperty(), +}) +class NewObservable(): + def __init__(self, property2=None, **kwargs): + if property2 and property2 < 10: + raise ValueError("'property2' is too small.") + + +def test_custom_observable_object(): + no = NewObservable(property1='something') + assert no.property1 == 'something' + + with pytest.raises(stix2.exceptions.MissingPropertiesError): + NewObservable(property2=42) + + with pytest.raises(ValueError): + NewObservable(property1='something', property2=4) + + +def test_parse_custom_observable_object(): + nt_string = """{ + "type": "x-new-observable", + "property1": "something" + }""" + + nt = stix2.parse_observable(nt_string) + assert nt.property1 == 'something' + + +def test_observable_custom_property(): + with pytest.raises(ValueError): + NewObservable( + property1='something', + custom_properties="foobar", + ) + + no = NewObservable( + property1='something', + custom_properties={ + "foo": "bar", + }, + ) + assert no.foo == "bar" + + +def test_observable_custom_property_invalid(): + with pytest.raises(stix2.exceptions.ExtraPropertiesError): + NewObservable( + property1='something', + x_foo="bar", + ) + + +def test_observable_custom_property_allowed(): + no = NewObservable( + property1='something', + x_foo="bar", + allow_custom=True, + ) + assert no.x_foo == "bar" diff --git a/stix2/test/test_identity.py b/stix2/test/test_identity.py index 3952d25..b2c166c 100644 --- a/stix2/test/test_identity.py +++ b/stix2/test/test_identity.py @@ -19,7 +19,7 @@ EXPECTED = """{ def test_identity_example(): - report = stix2.Identity( + identity = stix2.Identity( id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c", created="2015-12-21T19:59:11Z", modified="2015-12-21T19:59:11Z", @@ -27,7 +27,7 @@ def test_identity_example(): identity_class="individual", ) - assert str(report) == EXPECTED + assert str(identity) == EXPECTED @pytest.mark.parametrize("data", [ @@ -50,4 +50,16 @@ def test_parse_identity(data): assert identity.modified == dt.datetime(2015, 12, 21, 19, 59, 11, tzinfo=pytz.utc) assert identity.name == "John Smith" + +def test_parse_no_type(): + with pytest.raises(stix2.exceptions.ParseError): + stix2.parse(""" + { + "id": "identity--311b2d2d-f010-5473-83ec-1edf84858f4c", + "created": "2015-12-21T19:59:11Z", + "modified": "2015-12-21T19:59:11Z", + "name": "John Smith", + "identity_class": "individual" + }""") + # TODO: Add other examples