diff --git a/stix2/base.py b/stix2/base.py index 60dd78a..2e7f026 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -216,18 +216,14 @@ class _Observable(_STIXBase): try: allowed_types = prop.contained.valid_types except AttributeError: - try: - allowed_types = prop.valid_types - except AttributeError: - raise ValueError("'%s' is named like an object reference property but " - "is not an ObjectReferenceProperty or a ListProperty " - "containing ObjectReferenceProperty." % prop_name) + allowed_types = prop.valid_types + + try: + ref_type = self._STIXBase__valid_refs[ref] + except TypeError: + raise ValueError("'%s' must be created with _valid_refs as a dict, not a list." % self.__class__.__name__) if allowed_types: - try: - ref_type = self._STIXBase__valid_refs[ref] - except TypeError: - raise ValueError("'%s' must be created with _valid_refs as a dict, not a list." % self.__class__.__name__) if ref_type not in allowed_types: raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type)) diff --git a/stix2/exceptions.py b/stix2/exceptions.py index 5a9e7b2..8a33afa 100644 --- a/stix2/exceptions.py +++ b/stix2/exceptions.py @@ -121,7 +121,7 @@ class DependentPropertiesError(STIXError, TypeError): def __str__(self): msg = "The property dependencies for {0}: ({1}) are not met." return msg.format(self.cls.__name__, - ", ".join(name for x in self.dependencies for name in x)) + ", ".join(x for x, y in self.dependencies)) class AtLeastOnePropertyError(STIXError, TypeError): diff --git a/stix2/observables.py b/stix2/observables.py index 6c82e99..b9dcf7f 100644 --- a/stix2/observables.py +++ b/stix2/observables.py @@ -29,16 +29,13 @@ class ObservableProperty(Property): except ValueError: raise ValueError("The observable property must contain a dictionary") if dictified == {}: - raise ValueError("The dictionary property must contain a non-empty dictionary") + raise ValueError("The observable property must contain a non-empty dictionary") valid_refs = dict((k, v['type']) for (k, v) in dictified.items()) # from .__init__ import parse_observable # avoid circular import for key, obj in dictified.items(): parsed_obj = parse_observable(obj, valid_refs) - if not issubclass(type(parsed_obj), _Observable): - raise ValueError("Objects in an observable property must be " - "Cyber Observable Objects") dictified[key] = parsed_obj return dictified @@ -58,7 +55,7 @@ class ExtensionsProperty(DictionaryProperty): except ValueError: raise ValueError("The extensions property must contain a dictionary") if dictified == {}: - raise ValueError("The dictionary property must contain a non-empty dictionary") + raise ValueError("The extensions property must contain a non-empty dictionary") if self.enclosing_type in EXT_MAP: specific_type_map = EXT_MAP[self.enclosing_type] @@ -74,7 +71,7 @@ class ExtensionsProperty(DictionaryProperty): else: raise ValueError("The key used in the extensions dictionary is not an extension type name") else: - raise ValueError("The enclosing type has no extensions defined") + raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type) return dictified @@ -87,6 +84,7 @@ class Artifact(_Observable): ('payload_bin', BinaryProperty()), ('url', StringProperty()), ('hashes', HashesProperty()), + ('extensions', ExtensionsProperty(enclosing_type=_type)), ]) def _check_object_constraints(self): @@ -103,6 +101,7 @@ class AutonomousSystem(_Observable): ('number', IntegerProperty()), ('name', StringProperty()), ('rir', StringProperty()), + ('extensions', ExtensionsProperty(enclosing_type=_type)), ]) @@ -118,6 +117,7 @@ class Directory(_Observable): ('modified', TimestampProperty()), ('accessed', TimestampProperty()), ('contains_refs', ListProperty(ObjectReferenceProperty(valid_types=['file', 'directory']))), + ('extensions', ExtensionsProperty(enclosing_type=_type)), ]) @@ -128,6 +128,7 @@ class DomainName(_Observable): ('type', TypeProperty(_type)), ('value', StringProperty(required=True)), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name']))), + ('extensions', ExtensionsProperty(enclosing_type=_type)), ]) @@ -139,6 +140,8 @@ class EmailAddress(_Observable): ('value', StringProperty(required=True)), ('display_name', StringProperty()), ('belongs_to_ref', ObjectReferenceProperty(valid_types='user-account')), + ('extensions', ExtensionsProperty(enclosing_type=_type)), + ]) @@ -175,6 +178,7 @@ class EmailMessage(_Observable): ('body', StringProperty()), ('body_multipart', ListProperty(EmbeddedObjectProperty(type=EmailMIMEComponent))), ('raw_email_ref', ObjectReferenceProperty(valid_types='artifact')), + ('extensions', ExtensionsProperty(enclosing_type=_type)), ]) def _check_object_constraints(self): @@ -186,6 +190,7 @@ class EmailMessage(_Observable): class ArchiveExt(_Extension): + _type = 'archive-ext' _properties = OrderedDict() _properties.update([ ('contains_refs', ListProperty(ObjectReferenceProperty(valid_types='file'), required=True)), @@ -204,6 +209,7 @@ class AlternateDataStream(_STIXBase): class NTFSExt(_Extension): + _type = 'ntfs-ext' _properties = OrderedDict() _properties.update([ ('sid', StringProperty()), @@ -212,6 +218,7 @@ class NTFSExt(_Extension): class PDFExt(_Extension): + _type = 'pdf-ext' _properties = OrderedDict() _properties.update([ ('version', StringProperty()), @@ -223,6 +230,7 @@ class PDFExt(_Extension): class RasterImageExt(_Extension): + _type = 'raster-image-ext' _properties = OrderedDict() _properties.update([ ('image_height', IntegerProperty()), @@ -285,6 +293,7 @@ class WindowsPESection(_STIXBase): class WindowsPEBinaryExt(_Extension): + _type = 'windows-pebinary-ext' _properties = OrderedDict() _properties.update([ ('pe_type', StringProperty(required=True)), # open_vocab @@ -340,6 +349,7 @@ class IPv4Address(_Observable): ('value', StringProperty(required=True)), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))), ('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))), + ('extensions', ExtensionsProperty(enclosing_type=_type)), ]) @@ -351,6 +361,7 @@ class IPv6Address(_Observable): ('value', StringProperty(required=True)), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))), ('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))), + ('extensions', ExtensionsProperty(enclosing_type=_type)), ]) @@ -360,6 +371,7 @@ class MACAddress(_Observable): _properties.update([ ('type', TypeProperty(_type)), ('value', StringProperty(required=True)), + ('extensions', ExtensionsProperty(enclosing_type=_type)), ]) @@ -369,10 +381,12 @@ class Mutex(_Observable): _properties.update([ ('type', TypeProperty(_type)), ('name', StringProperty()), + ('extensions', ExtensionsProperty(enclosing_type=_type)), ]) class HTTPRequestExt(_Extension): + _type = 'http-request-ext' _properties = OrderedDict() _properties.update([ ('request_method', StringProperty(required=True)), @@ -385,6 +399,7 @@ class HTTPRequestExt(_Extension): class ICMPExt(_Extension): + _type = 'icmp-ext' _properties = OrderedDict() _properties.update([ ('icmp_type_hex', HexProperty(required=True)), @@ -393,6 +408,7 @@ class ICMPExt(_Extension): class SocketExt(_Extension): + _type = 'socket-ext' _properties = OrderedDict() _properties.update([ ('address_family', EnumProperty([ @@ -427,6 +443,7 @@ class SocketExt(_Extension): class TCPExt(_Extension): + _type = 'tcp-ext' _properties = OrderedDict() _properties.update([ ('src_flags_hex', HexProperty()), @@ -465,6 +482,7 @@ class NetworkTraffic(_Observable): class WindowsProcessExt(_Extension): + _type = 'windows-process-ext' _properties = OrderedDict() _properties.update([ ('aslr_enabled', BooleanProperty()), @@ -477,6 +495,7 @@ class WindowsProcessExt(_Extension): class WindowsServiceExt(_Extension): + _type = 'windows-service-ext' _properties = OrderedDict() _properties.update([ ('service_name', StringProperty(required=True)), @@ -556,6 +575,7 @@ class Software(_Observable): ('languages', ListProperty(StringProperty)), ('vendor', StringProperty()), ('version', StringProperty()), + ('extensions', ExtensionsProperty(enclosing_type=_type)), ]) @@ -565,10 +585,12 @@ class URL(_Observable): _properties.update([ ('type', TypeProperty(_type)), ('value', StringProperty(required=True)), + ('extensions', ExtensionsProperty(enclosing_type=_type)), ]) class UNIXAccountExt(_Extension): + _type = 'unix-account-ext' _properties = OrderedDict() _properties.update([ ('gid', IntegerProperty()), @@ -635,6 +657,7 @@ class WindowsRegistryKey(_Observable): ('modified', TimestampProperty()), ('creator_user_ref', ObjectReferenceProperty(valid_types='user-account')), ('number_of_subkeys', IntegerProperty()), + ('extensions', ExtensionsProperty(enclosing_type=_type)), ]) @property @@ -684,6 +707,7 @@ class X509Certificate(_Observable): ('subject_public_key_modulus', StringProperty()), ('subject_public_key_exponent', IntegerProperty()), ('x509_v3_extensions', EmbeddedObjectProperty(type=X509V3ExtenstionsType)), + ('extensions', ExtensionsProperty(enclosing_type=_type)), ]) @@ -708,39 +732,32 @@ OBJ_MAP_OBSERVABLE = { 'x509-certificate': X509Certificate, } -EXT_MAP_FILE = { - 'archive-ext': ArchiveExt, - 'ntfs-ext': NTFSExt, - 'pdf-ext': PDFExt, - 'raster-image-ext': RasterImageExt, - 'windows-pebinary-ext': WindowsPEBinaryExt -} - -EXT_MAP_NETWORK_TRAFFIC = { - 'http-request-ext': HTTPRequestExt, - 'icmp-ext': ICMPExt, - 'socket-ext': SocketExt, - 'tcp-ext': TCPExt, -} - -EXT_MAP_PROCESS = { - 'windows-process-ext': WindowsProcessExt, - 'windows-service-ext': WindowsServiceExt, -} - -EXT_MAP_USER_ACCOUNT = { - 'unix-account-ext': UNIXAccountExt, -} EXT_MAP = { - 'file': EXT_MAP_FILE, - 'network-traffic': EXT_MAP_NETWORK_TRAFFIC, - 'process': EXT_MAP_PROCESS, - 'user-account': EXT_MAP_USER_ACCOUNT, + 'file': { + 'archive-ext': ArchiveExt, + 'ntfs-ext': NTFSExt, + 'pdf-ext': PDFExt, + 'raster-image-ext': RasterImageExt, + 'windows-pebinary-ext': WindowsPEBinaryExt + }, + 'network-traffic': { + 'http-request-ext': HTTPRequestExt, + 'icmp-ext': ICMPExt, + 'socket-ext': SocketExt, + 'tcp-ext': TCPExt, + }, + 'process': { + 'windows-process-ext': WindowsProcessExt, + 'windows-service-ext': WindowsServiceExt, + }, + 'user-account': { + 'unix-account-ext': UNIXAccountExt, + }, } -def parse_observable(data, _valid_refs, allow_custom=False): +def parse_observable(data, _valid_refs=None, allow_custom=False): """Deserialize a string or file-like object into a STIX Cyber Observable object. @@ -756,19 +773,20 @@ def parse_observable(data, _valid_refs, allow_custom=False): """ obj = get_dict(data) - obj['_valid_refs'] = _valid_refs + obj['_valid_refs'] = _valid_refs or [] if 'type' not in obj: - raise ParseError("Can't parse object with no 'type' property: %s" % str(obj)) + raise ParseError("Can't parse observable with no 'type' property: %s" % str(obj)) try: obj_class = OBJ_MAP_OBSERVABLE[obj['type']] except KeyError: - raise ParseError("Can't parse unknown object type '%s'! For custom observables, use the CustomObservable decorator." % obj['type']) + raise ParseError("Can't parse unknown observable type '%s'! For custom observables, " + "use the CustomObservable decorator." % obj['type']) if 'extensions' in obj and obj['type'] in EXT_MAP: for name, ext in obj['extensions'].items(): if name not in EXT_MAP[obj['type']]: - raise ParseError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type'])) + raise ParseError("Can't parse Unknown extension type '%s' for observable type '%s'!" % (name, obj['type'])) ext_class = EXT_MAP[obj['type']][name] obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name]) @@ -807,6 +825,16 @@ def CustomObservable(type='x-custom-observable', properties=None): if not properties or not isinstance(properties, list): raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]") + # Check properties ending in "_ref/s" are ObjectReferenceProperties + for prop_name, prop in properties: + if prop_name.endswith('_ref') and not isinstance(prop, ObjectReferenceProperty): + raise ValueError("'%s' is named like an object reference property but " + "is not an ObjectReferenceProperty." % prop_name) + elif (prop_name.endswith('_refs') and (not isinstance(prop, ListProperty) + or not isinstance(prop.contained, ObjectReferenceProperty))): + raise ValueError("'%s' is named like an object reference list property but " + "is not a ListProperty containing ObjectReferenceProperty." % prop_name) + _properties.update(properties) def __init__(self, **kwargs): @@ -817,3 +845,50 @@ def CustomObservable(type='x-custom-observable', properties=None): return _Custom return custom_builder + + +def _register_extension(observable, new_extension): + """Register a custom extension to a STIX Cyber Observable type. + """ + + try: + observable_type = observable._type + except AttributeError: + raise ValueError("Unknown observable type. Custom observables must be " + "created with the @CustomObservable decorator.") + + try: + EXT_MAP[observable_type][new_extension._type] = new_extension + except KeyError: + if observable_type not in OBJ_MAP_OBSERVABLE: + raise ValueError("Unknown observable type '%s'. Custom observables " + "must be created with the @CustomObservable decorator." + % observable_type) + else: + EXT_MAP[observable_type] = {new_extension._type: new_extension} + + +def CustomExtension(observable=None, type='x-custom-observable', properties={}): + """Decorator for custom extensions to STIX Cyber Observables + """ + + if not observable or not issubclass(observable, _Observable): + raise ValueError("'observable' must be a valid Observable class!") + + def custom_builder(cls): + + class _Custom(cls, _Extension): + _type = type + _properties = { + 'extensions': ExtensionsProperty(enclosing_type=_type), + } + _properties.update(properties) + + def __init__(self, **kwargs): + _Extension.__init__(self, **kwargs) + cls.__init__(self, **kwargs) + + _register_extension(observable, _Custom) + return _Custom + + return custom_builder diff --git a/stix2/test/test_bundle.py b/stix2/test/test_bundle.py index 6f56d85..d70f63a 100644 --- a/stix2/test/test_bundle.py +++ b/stix2/test/test_bundle.py @@ -118,6 +118,20 @@ def test_create_bundle_with_arg_listarg_and_kwarg(indicator, malware, relationsh assert str(bundle) == EXPECTED_BUNDLE +def test_create_bundle_invalid(indicator, malware, relationship): + with pytest.raises(ValueError) as excinfo: + stix2.Bundle(objects=[1]) + assert excinfo.value.reason == "This property may only contain a dictionary or object" + + with pytest.raises(ValueError) as excinfo: + stix2.Bundle(objects=[{}]) + assert excinfo.value.reason == "This property may only contain a non-empty dictionary or object" + + with pytest.raises(ValueError) as excinfo: + stix2.Bundle(objects=[{'type': 'bundle'}]) + assert excinfo.value.reason == 'This property may not contain a Bundle object' + + def test_parse_bundle(): bundle = stix2.parse(EXPECTED_BUNDLE) diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index f25044e..a71650b 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -6,7 +6,7 @@ from .constants import FAKE_TIME def test_identity_custom_property(): - with pytest.raises(ValueError): + with pytest.raises(ValueError) as excinfo: stix2.Identity( id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c", created="2015-12-21T19:59:11Z", @@ -15,6 +15,7 @@ def test_identity_custom_property(): identity_class="individual", custom_properties="foobar", ) + assert str(excinfo.value) == "'custom_properties' must be a dictionary" identity = stix2.Identity( id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c", @@ -31,7 +32,7 @@ def test_identity_custom_property(): def test_identity_custom_property_invalid(): - with pytest.raises(stix2.exceptions.ExtraPropertiesError): + with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo: stix2.Identity( id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c", created="2015-12-21T19:59:11Z", @@ -40,6 +41,9 @@ def test_identity_custom_property_invalid(): identity_class="individual", x_foo="bar", ) + assert excinfo.value.cls == stix2.Identity + assert excinfo.value.properties == ['x_foo'] + assert "Unexpected properties for" in str(excinfo.value) def test_identity_custom_property_allowed(): @@ -67,8 +71,11 @@ def test_identity_custom_property_allowed(): }""", ]) def test_parse_identity_custom_property(data): - with pytest.raises(stix2.exceptions.ExtraPropertiesError): + with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo: identity = stix2.parse(data) + assert excinfo.value.cls == stix2.Identity + assert excinfo.value.properties == ['foo'] + assert "Unexpected properties for" in str(excinfo.value) identity = stix2.parse(data, allow_custom=True) assert identity.foo == "bar" @@ -88,11 +95,13 @@ def test_custom_object_type(): nt = NewType(property1='something') assert nt.property1 == 'something' - with pytest.raises(stix2.exceptions.MissingPropertiesError): + with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo: NewType(property2=42) + assert "No values for required properties" in str(excinfo.value) - with pytest.raises(ValueError): + with pytest.raises(ValueError) as excinfo: NewType(property1='something', property2=4) + assert "'property2' is too small." in str(excinfo.value) def test_parse_custom_object_type(): @@ -106,12 +115,25 @@ def test_parse_custom_object_type(): assert nt.property1 == 'something' +def test_parse_unregistered_custom_object_type(): + nt_string = """{ + "type": "x-foobar-observable", + "created": "2015-12-21T19:59:11Z", + "property1": "something" + }""" + + with pytest.raises(stix2.exceptions.ParseError) as excinfo: + stix2.parse(nt_string) + assert "Can't parse unknown object type" in str(excinfo.value) + assert "use the CustomObject decorator." in str(excinfo.value) + + @stix2.observables.CustomObservable('x-new-observable', [ ('property1', stix2.properties.StringProperty(required=True)), ('property2', stix2.properties.IntegerProperty()), ('x_property3', stix2.properties.BooleanProperty()), ]) -class NewObservable(object): +class NewObservable(): def __init__(self, property2=None, **kwargs): if property2 and property2 < 10: raise ValueError("'property2' is too small.") @@ -121,11 +143,59 @@ def test_custom_observable_object(): no = NewObservable(property1='something') assert no.property1 == 'something' - with pytest.raises(stix2.exceptions.MissingPropertiesError): + with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo: NewObservable(property2=42) + assert excinfo.value.properties == ['property1'] + assert "No values for required properties" in str(excinfo.value) - with pytest.raises(ValueError): + with pytest.raises(ValueError) as excinfo: NewObservable(property1='something', property2=4) + assert "'property2' is too small." in str(excinfo.value) + + +def test_custom_observable_object_invalid_ref_property(): + with pytest.raises(ValueError) as excinfo: + @stix2.observables.CustomObservable('x-new-obs', [ + ('property_ref', stix2.properties.StringProperty()), + ]) + class NewObs(): + pass + assert "is named like an object reference property but is not an ObjectReferenceProperty" in str(excinfo.value) + + +def test_custom_observable_object_invalid_refs_property(): + with pytest.raises(ValueError) as excinfo: + @stix2.observables.CustomObservable('x-new-obs', [ + ('property_refs', stix2.properties.StringProperty()), + ]) + class NewObs(): + pass + assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value) + + +def test_custom_observable_object_invalid_refs_list_property(): + with pytest.raises(ValueError) as excinfo: + @stix2.observables.CustomObservable('x-new-obs', [ + ('property_refs', stix2.properties.ListProperty(stix2.properties.StringProperty)), + ]) + class NewObs(): + pass + assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value) + + +def test_custom_observable_object_invalid_valid_refs(): + @stix2.observables.CustomObservable('x-new-obs', [ + ('property1', stix2.properties.StringProperty(required=True)), + ('property_ref', stix2.properties.ObjectReferenceProperty(valid_types='email-addr')), + ]) + class NewObs(): + pass + + with pytest.raises(Exception) as excinfo: + NewObs(_valid_refs=['1'], + property1='something', + property_ref='1') + assert "must be created with _valid_refs as a dict, not a list" in str(excinfo.value) def test_custom_no_properties_raises_exception(): @@ -154,12 +224,34 @@ def test_parse_custom_observable_object(): assert nt.property1 == 'something' +def test_parse_unregistered_custom_observable_object(): + nt_string = """{ + "type": "x-foobar-observable", + "property1": "something" + }""" + + with pytest.raises(stix2.exceptions.ParseError) as excinfo: + stix2.parse_observable(nt_string) + assert "Can't parse unknown observable type" in str(excinfo.value) + + +def test_parse_invalid_custom_observable_object(): + nt_string = """{ + "property1": "something" + }""" + + with pytest.raises(stix2.exceptions.ParseError) as excinfo: + stix2.parse_observable(nt_string) + assert "Can't parse observable with no 'type' property" in str(excinfo.value) + + def test_observable_custom_property(): - with pytest.raises(ValueError): + with pytest.raises(ValueError) as excinfo: NewObservable( property1='something', custom_properties="foobar", ) + assert "'custom_properties' must be a dictionary" in str(excinfo.value) no = NewObservable( property1='something', @@ -171,11 +263,13 @@ def test_observable_custom_property(): def test_observable_custom_property_invalid(): - with pytest.raises(stix2.exceptions.ExtraPropertiesError): + with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo: NewObservable( property1='something', x_foo="bar", ) + assert excinfo.value.properties == ['x_foo'] + assert "Unexpected properties for" in str(excinfo.value) def test_observable_custom_property_allowed(): @@ -197,3 +291,107 @@ def test_observed_data_with_custom_observable_object(): allow_custom=True, ) assert ob_data.objects['0'].property1 == 'something' + + +@stix2.observables.CustomExtension(stix2.DomainName, 'x-new-ext', { + 'property1': stix2.properties.StringProperty(required=True), + 'property2': stix2.properties.IntegerProperty(), +}) +class NewExtension(): + def __init__(self, property2=None, **kwargs): + if property2 and property2 < 10: + raise ValueError("'property2' is too small.") + + +def test_custom_extension(): + ext = NewExtension(property1='something') + assert ext.property1 == 'something' + + with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo: + NewExtension(property2=42) + assert excinfo.value.properties == ['property1'] + assert str(excinfo.value) == "No values for required properties for _Custom: (property1)." + + with pytest.raises(ValueError) as excinfo: + NewExtension(property1='something', property2=4) + assert str(excinfo.value) == "'property2' is too small." + + +def test_custom_extension_wrong_observable_type(): + ext = NewExtension(property1='something') + with pytest.raises(ValueError) as excinfo: + stix2.File(name="abc.txt", + extensions={ + "ntfs-ext": ext, + }) + + assert 'Cannot determine extension type' in excinfo.value.reason + + +def test_custom_extension_invalid_observable(): + # These extensions are being applied to improperly-created Observables. + # The Observable classes should have been created with the CustomObservable decorator. + class Foo(object): + pass + with pytest.raises(ValueError) as excinfo: + @stix2.observables.CustomExtension(Foo, 'x-new-ext', { + 'property1': stix2.properties.StringProperty(required=True), + }) + class FooExtension(): + pass # pragma: no cover + assert str(excinfo.value) == "'observable' must be a valid Observable class!" + + class Bar(stix2.observables._Observable): + pass + with pytest.raises(ValueError) as excinfo: + @stix2.observables.CustomExtension(Bar, 'x-new-ext', { + 'property1': stix2.properties.StringProperty(required=True), + }) + class BarExtension(): + pass + assert "Unknown observable type" in str(excinfo.value) + assert "Custom observables must be created with the @CustomObservable decorator." in str(excinfo.value) + + class Baz(stix2.observables._Observable): + _type = 'Baz' + with pytest.raises(ValueError) as excinfo: + @stix2.observables.CustomExtension(Baz, 'x-new-ext', { + 'property1': stix2.properties.StringProperty(required=True), + }) + class BazExtension(): + pass + assert "Unknown observable type" in str(excinfo.value) + assert "Custom observables must be created with the @CustomObservable decorator." in str(excinfo.value) + + +def test_parse_observable_with_custom_extension(): + input_str = """{ + "type": "domain-name", + "value": "example.com", + "extensions": { + "x-new-ext": { + "property1": "foo", + "property2": 12 + } + } + }""" + + parsed = stix2.parse_observable(input_str) + assert parsed.extensions['x-new-ext'].property2 == 12 + + +def test_parse_observable_with_unregistered_custom_extension(): + input_str = """{ + "type": "domain-name", + "value": "example.com", + "extensions": { + "x-foobar-ext": { + "property1": "foo", + "property2": 12 + } + } + }""" + + with pytest.raises(ValueError) as excinfo: + stix2.parse_observable(input_str) + assert "Can't parse Unknown extension type" in str(excinfo.value) diff --git a/stix2/test/test_observed_data.py b/stix2/test/test_observed_data.py index fdc66ee..0a12b3e 100644 --- a/stix2/test/test_observed_data.py +++ b/stix2/test/test_observed_data.py @@ -127,6 +127,42 @@ def test_observed_data_example_with_bad_refs(): assert excinfo.value.reason == "Invalid object reference for 'Directory:contains_refs': '2' is not a valid object in local scope" +def test_observed_data_example_with_non_dictionary(): + with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: + stix2.ObservedData( + id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T19:58:16.000Z", + modified="2016-04-06T19:58:16.000Z", + first_observed="2015-12-21T19:00:00Z", + last_observed="2015-12-21T19:00:00Z", + number_observed=50, + objects="file: foo.exe", + ) + + assert excinfo.value.cls == stix2.ObservedData + assert excinfo.value.prop_name == "objects" + assert 'must contain a dictionary' in excinfo.value.reason + + +def test_observed_data_example_with_empty_dictionary(): + with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: + stix2.ObservedData( + id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T19:58:16.000Z", + modified="2016-04-06T19:58:16.000Z", + first_observed="2015-12-21T19:00:00Z", + last_observed="2015-12-21T19:00:00Z", + number_observed=50, + objects={}, + ) + + assert excinfo.value.cls == stix2.ObservedData + assert excinfo.value.prop_name == "objects" + assert 'must contain a non-empty dictionary' in excinfo.value.reason + + @pytest.mark.parametrize("data", [ EXPECTED, { @@ -206,7 +242,7 @@ def test_artifact_example_dependency_error(): stix2.Artifact(url="http://example.com/sirvizio.exe") assert excinfo.value.dependencies == [("hashes", "url")] - assert str(excinfo.value) == "The property dependencies for Artifact: (hashes, url) are not met." + assert str(excinfo.value) == "The property dependencies for Artifact: (hashes) are not met." @pytest.mark.parametrize("data", [ @@ -419,6 +455,8 @@ def test_parse_email_message_with_at_least_one_error(data): assert excinfo.value.cls == stix2.EmailMIMEComponent assert excinfo.value.properties == ["body", "body_raw_ref"] + assert "At least one of the" in str(excinfo.value) + assert "must be populated" in str(excinfo.value) @pytest.mark.parametrize("data", [ @@ -558,7 +596,7 @@ def test_artifact_mutual_exclusion_error(): assert excinfo.value.cls == stix2.Artifact assert excinfo.value.properties == ["payload_bin", "url"] - assert str(excinfo.value) == "The (payload_bin, url) properties for Artifact are mutually exclusive." + assert 'are mutually exclusive' in str(excinfo.value) def test_directory_example(): @@ -804,6 +842,8 @@ def test_file_example_encryption_error(): assert excinfo.value.cls == stix2.File assert excinfo.value.dependencies == [("is_encrypted", "encryption_algorithm")] + assert "property dependencies" in str(excinfo.value) + assert "are not met" in str(excinfo.value) with pytest.raises(stix2.exceptions.DependentPropertiesError) as excinfo: stix2.File(name="qwerty.dll", diff --git a/stix2/test/test_properties.py b/stix2/test/test_properties.py index db1c143..7d03b9e 100644 --- a/stix2/test/test_properties.py +++ b/stix2/test/test_properties.py @@ -5,10 +5,10 @@ from stix2.exceptions import AtLeastOnePropertyError, DictionaryKeyError from stix2.observables import EmailMIMEComponent, ExtensionsProperty from stix2.properties import (BinaryProperty, BooleanProperty, DictionaryProperty, EmbeddedObjectProperty, - EnumProperty, HashesProperty, HexProperty, - IDProperty, IntegerProperty, ListProperty, - Property, ReferenceProperty, StringProperty, - TimestampProperty, TypeProperty) + EnumProperty, FloatProperty, HashesProperty, + HexProperty, IDProperty, IntegerProperty, + ListProperty, Property, ReferenceProperty, + StringProperty, TimestampProperty, TypeProperty) from .constants import FAKE_TIME @@ -119,6 +119,27 @@ def test_integer_property_invalid(value): int_prop.clean(value) +@pytest.mark.parametrize("value", [ + 2, + -1, + 3.14, + False, +]) +def test_float_property_valid(value): + int_prop = FloatProperty() + assert int_prop.clean(value) is not None + + +@pytest.mark.parametrize("value", [ + "something", + StringProperty(), +]) +def test_float_property_invalid(value): + int_prop = FloatProperty() + with pytest.raises(ValueError): + int_prop.clean(value) + + @pytest.mark.parametrize("value", [ True, False, @@ -215,7 +236,7 @@ def test_dictionary_property_valid(d): [{'Hey!': 'something'}, "Invalid dictionary key Hey!: (contains characters other thanlowercase a-z, " "uppercase A-Z, numerals 0-9, hyphen (-), or underscore (_))."], ]) -def test_dictionary_property_invalid(d): +def test_dictionary_property_invalid_key(d): dict_prop = DictionaryProperty() with pytest.raises(DictionaryKeyError) as excinfo: @@ -224,6 +245,26 @@ def test_dictionary_property_invalid(d): assert str(excinfo.value) == d[1] +@pytest.mark.parametrize("d", [ + ({}, "The dictionary property must contain a non-empty dictionary"), + # TODO: This error message could be made more helpful. The error is caused + # because `json.loads()` doesn't like the *single* quotes around the key + # name, even though they are valid in a Python dictionary. While technically + # accurate (a string is not a dictionary), if we want to be able to load + # string-encoded "dictionaries" that are, we need a better error message + # or an alternative to `json.loads()` ... and preferably *not* `eval()`. :-) + # Changing the following to `'{"description": "something"}'` does not cause + # any ValueError to be raised. + ("{'description': 'something'}", "The dictionary property must contain a dictionary"), +]) +def test_dictionary_property_invalid(d): + dict_prop = DictionaryProperty() + + with pytest.raises(ValueError) as excinfo: + dict_prop.clean(d[0]) + assert str(excinfo.value) == d[1] + + @pytest.mark.parametrize("value", [ {"sha256": "6db12788c37247f2316052e142f42f4b259d6561751e5f401a1ae2a6df9c674b"}, [('MD5', '2dfb1bcc980200c6706feee399d41b3f'), ('RIPEMD-160', 'b3a8cd8a27c90af79b3c81754f267780f443dfef')], @@ -257,10 +298,18 @@ def test_embedded_property(): emb_prop.clean("string") -def test_enum_property(): - enum_prop = EnumProperty(['a', 'b', 'c']) +@pytest.mark.parametrize("value", [ + ['a', 'b', 'c'], + ('a', 'b', 'c'), + 'b', +]) +def test_enum_property_valid(value): + enum_prop = EnumProperty(value) assert enum_prop.clean('b') + +def test_enum_property_invalid(): + enum_prop = EnumProperty(['a', 'b', 'c']) with pytest.raises(ValueError): enum_prop.clean('z') diff --git a/stix2/test/test_versioning.py b/stix2/test/test_versioning.py index 8506d16..453abc0 100644 --- a/stix2/test/test_versioning.py +++ b/stix2/test/test_versioning.py @@ -169,6 +169,7 @@ def test_versioning_error_new_version_of_revoked(): with pytest.raises(stix2.exceptions.RevokeError) as excinfo: campaign_v2.new_version(name="barney") + assert str(excinfo.value) == "Cannot create a new version of a revoked object." assert excinfo.value.called_by == "new_version" assert str(excinfo.value) == "Cannot create a new version of a revoked object." @@ -188,6 +189,7 @@ def test_versioning_error_revoke_of_revoked(): with pytest.raises(stix2.exceptions.RevokeError) as excinfo: campaign_v2.revoke() + assert str(excinfo.value) == "Cannot revoke an already revoked object." assert excinfo.value.called_by == "revoke" assert str(excinfo.value) == "Cannot revoke an already revoked object."