Merge pull request #47 from oasis-open/custom
Add support for custom extensions on Cyber Observablesstix2.1
commit
7bc6a60388
|
@ -205,18 +205,14 @@ class _Observable(_STIXBase):
|
|||
try:
|
||||
allowed_types = prop.contained.valid_types
|
||||
except AttributeError:
|
||||
try:
|
||||
allowed_types = prop.valid_types
|
||||
except AttributeError:
|
||||
raise ValueError("'%s' is named like an object reference property but "
|
||||
"is not an ObjectReferenceProperty or a ListProperty "
|
||||
"containing ObjectReferenceProperty." % prop_name)
|
||||
allowed_types = prop.valid_types
|
||||
|
||||
try:
|
||||
ref_type = self._STIXBase__valid_refs[ref]
|
||||
except TypeError:
|
||||
raise ValueError("'%s' must be created with _valid_refs as a dict, not a list." % self.__class__.__name__)
|
||||
|
||||
if allowed_types:
|
||||
try:
|
||||
ref_type = self._STIXBase__valid_refs[ref]
|
||||
except TypeError:
|
||||
raise ValueError("'%s' must be created with _valid_refs as a dict, not a list." % self.__class__.__name__)
|
||||
if ref_type not in allowed_types:
|
||||
raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type))
|
||||
|
||||
|
|
|
@ -121,7 +121,7 @@ class DependentPropertiesError(STIXError, TypeError):
|
|||
def __str__(self):
|
||||
msg = "The property dependencies for {0}: ({1}) are not met."
|
||||
return msg.format(self.cls.__name__,
|
||||
", ".join(x for x in self.dependencies))
|
||||
", ".join(x for x, y in self.dependencies))
|
||||
|
||||
|
||||
class AtLeastOnePropertyError(STIXError, TypeError):
|
||||
|
|
|
@ -24,16 +24,13 @@ class ObservableProperty(Property):
|
|||
except ValueError:
|
||||
raise ValueError("The observable property must contain a dictionary")
|
||||
if dictified == {}:
|
||||
raise ValueError("The dictionary property must contain a non-empty dictionary")
|
||||
raise ValueError("The observable property must contain a non-empty dictionary")
|
||||
|
||||
valid_refs = dict((k, v['type']) for (k, v) in dictified.items())
|
||||
|
||||
# from .__init__ import parse_observable # avoid circular import
|
||||
for key, obj in dictified.items():
|
||||
parsed_obj = parse_observable(obj, valid_refs)
|
||||
if not issubclass(type(parsed_obj), _Observable):
|
||||
raise ValueError("Objects in an observable property must be "
|
||||
"Cyber Observable Objects")
|
||||
dictified[key] = parsed_obj
|
||||
|
||||
return dictified
|
||||
|
@ -53,7 +50,7 @@ class ExtensionsProperty(DictionaryProperty):
|
|||
except ValueError:
|
||||
raise ValueError("The extensions property must contain a dictionary")
|
||||
if dictified == {}:
|
||||
raise ValueError("The dictionary property must contain a non-empty dictionary")
|
||||
raise ValueError("The extensions property must contain a non-empty dictionary")
|
||||
|
||||
if self.enclosing_type in EXT_MAP:
|
||||
specific_type_map = EXT_MAP[self.enclosing_type]
|
||||
|
@ -69,7 +66,7 @@ class ExtensionsProperty(DictionaryProperty):
|
|||
else:
|
||||
raise ValueError("The key used in the extensions dictionary is not an extension type name")
|
||||
else:
|
||||
raise ValueError("The enclosing type has no extensions defined")
|
||||
raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type)
|
||||
return dictified
|
||||
|
||||
|
||||
|
@ -77,6 +74,7 @@ class Artifact(_Observable):
|
|||
_type = 'artifact'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'mime_type': StringProperty(),
|
||||
'payload_bin': BinaryProperty(),
|
||||
'url': StringProperty(),
|
||||
|
@ -93,6 +91,7 @@ class AutonomousSystem(_Observable):
|
|||
_type = 'autonomous-system'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'number': IntegerProperty(),
|
||||
'name': StringProperty(),
|
||||
'rir': StringProperty(),
|
||||
|
@ -103,6 +102,7 @@ class Directory(_Observable):
|
|||
_type = 'directory'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'path': StringProperty(required=True),
|
||||
'path_enc': StringProperty(),
|
||||
# these are not the created/modified timestamps of the object itself
|
||||
|
@ -117,6 +117,7 @@ class DomainName(_Observable):
|
|||
_type = 'domain-name'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'value': StringProperty(required=True),
|
||||
'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name'])),
|
||||
}
|
||||
|
@ -126,6 +127,7 @@ class EmailAddress(_Observable):
|
|||
_type = 'email-addr'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'value': StringProperty(required=True),
|
||||
'display_name': StringProperty(),
|
||||
'belongs_to_ref': ObjectReferenceProperty(valid_types='user-account'),
|
||||
|
@ -149,6 +151,7 @@ class EmailMessage(_Observable):
|
|||
_type = 'email-message'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'is_multipart': BooleanProperty(required=True),
|
||||
'date': TimestampProperty(),
|
||||
'content_type': StringProperty(),
|
||||
|
@ -174,6 +177,7 @@ class EmailMessage(_Observable):
|
|||
|
||||
|
||||
class ArchiveExt(_Extension):
|
||||
_type = 'archive-ext'
|
||||
_properties = {
|
||||
'contains_refs': ListProperty(ObjectReferenceProperty(valid_types='file'), required=True),
|
||||
'version': StringProperty(),
|
||||
|
@ -190,6 +194,7 @@ class AlternateDataStream(_STIXBase):
|
|||
|
||||
|
||||
class NTFSExt(_Extension):
|
||||
_type = 'ntfs-ext'
|
||||
_properties = {
|
||||
'sid': StringProperty(),
|
||||
'alternate_data_streams': ListProperty(EmbeddedObjectProperty(type=AlternateDataStream)),
|
||||
|
@ -197,6 +202,7 @@ class NTFSExt(_Extension):
|
|||
|
||||
|
||||
class PDFExt(_Extension):
|
||||
_type = 'pdf-ext'
|
||||
_properties = {
|
||||
'version': StringProperty(),
|
||||
'is_optimized': BooleanProperty(),
|
||||
|
@ -207,6 +213,7 @@ class PDFExt(_Extension):
|
|||
|
||||
|
||||
class RasterImageExt(_Extension):
|
||||
_type = 'raster-image-ext'
|
||||
_properties = {
|
||||
'image_height': IntegerProperty(),
|
||||
'image_weight': IntegerProperty(),
|
||||
|
@ -266,6 +273,7 @@ class WindowsPESection(_STIXBase):
|
|||
|
||||
|
||||
class WindowsPEBinaryExt(_Extension):
|
||||
_type = 'windows-pebinary-ext'
|
||||
_properties = {
|
||||
'pe_type': StringProperty(required=True), # open_vocab
|
||||
'imphash': StringProperty(),
|
||||
|
@ -315,6 +323,7 @@ class IPv4Address(_Observable):
|
|||
_type = 'ipv4-addr'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'value': StringProperty(required=True),
|
||||
'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')),
|
||||
'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')),
|
||||
|
@ -325,6 +334,7 @@ class IPv6Address(_Observable):
|
|||
_type = 'ipv6-addr'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'value': StringProperty(required=True),
|
||||
'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')),
|
||||
'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')),
|
||||
|
@ -335,6 +345,7 @@ class MACAddress(_Observable):
|
|||
_type = 'mac-addr'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'value': StringProperty(required=True),
|
||||
}
|
||||
|
||||
|
@ -343,11 +354,13 @@ class Mutex(_Observable):
|
|||
_type = 'mutex'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'name': StringProperty(),
|
||||
}
|
||||
|
||||
|
||||
class HTTPRequestExt(_Extension):
|
||||
_type = 'http-request-ext'
|
||||
_properties = {
|
||||
'request_method': StringProperty(required=True),
|
||||
'request_value': StringProperty(required=True),
|
||||
|
@ -359,6 +372,7 @@ class HTTPRequestExt(_Extension):
|
|||
|
||||
|
||||
class ICMPExt(_Extension):
|
||||
_type = 'icmp-ext'
|
||||
_properties = {
|
||||
'icmp_type_hex': HexProperty(required=True),
|
||||
'icmp_code_hex': HexProperty(required=True),
|
||||
|
@ -366,6 +380,7 @@ class ICMPExt(_Extension):
|
|||
|
||||
|
||||
class SocketExt(_Extension):
|
||||
_type = 'socket-ext'
|
||||
_properties = {
|
||||
'address_family': EnumProperty([
|
||||
"AF_UNSPEC",
|
||||
|
@ -399,6 +414,7 @@ class SocketExt(_Extension):
|
|||
|
||||
|
||||
class TCPExt(_Extension):
|
||||
_type = 'tcp-ext'
|
||||
_properties = {
|
||||
'src_flags_hex': HexProperty(),
|
||||
'dst_flags_hex': HexProperty(),
|
||||
|
@ -435,6 +451,7 @@ class NetworkTraffic(_Observable):
|
|||
|
||||
|
||||
class WindowsProcessExt(_Extension):
|
||||
_type = 'windows-process-ext'
|
||||
_properties = {
|
||||
'aslr_enabled': BooleanProperty(),
|
||||
'dep_enabled': BooleanProperty(),
|
||||
|
@ -446,6 +463,7 @@ class WindowsProcessExt(_Extension):
|
|||
|
||||
|
||||
class WindowsServiceExt(_Extension):
|
||||
_type = 'windows-service-ext'
|
||||
_properties = {
|
||||
'service_name': StringProperty(required=True),
|
||||
'descriptions': ListProperty(StringProperty),
|
||||
|
@ -517,6 +535,7 @@ class Software(_Observable):
|
|||
_type = 'software'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'name': StringProperty(required=True),
|
||||
'cpe': StringProperty(),
|
||||
'languages': ListProperty(StringProperty),
|
||||
|
@ -529,11 +548,13 @@ class URL(_Observable):
|
|||
_type = 'url'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'value': StringProperty(required=True),
|
||||
}
|
||||
|
||||
|
||||
class UNIXAccountExt(_Extension):
|
||||
_type = 'unix-account-ext'
|
||||
_properties = {
|
||||
'gid': IntegerProperty(),
|
||||
'groups': ListProperty(StringProperty),
|
||||
|
@ -590,6 +611,7 @@ class WindowsRegistryKey(_Observable):
|
|||
_type = 'windows-registry-key'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'key': StringProperty(required=True),
|
||||
'values': ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType)),
|
||||
# this is not the modified timestamps of the object itself
|
||||
|
@ -630,6 +652,7 @@ class X509Certificate(_Observable):
|
|||
_type = 'x509-certificate'
|
||||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
'is_self_signed': BooleanProperty(),
|
||||
'hashes': HashesProperty(),
|
||||
'version': StringProperty(),
|
||||
|
@ -667,36 +690,28 @@ OBJ_MAP_OBSERVABLE = {
|
|||
'x509-certificate': X509Certificate,
|
||||
}
|
||||
|
||||
EXT_MAP_FILE = {
|
||||
'archive-ext': ArchiveExt,
|
||||
'ntfs-ext': NTFSExt,
|
||||
'pdf-ext': PDFExt,
|
||||
'raster-image-ext': RasterImageExt,
|
||||
'windows-pebinary-ext': WindowsPEBinaryExt
|
||||
}
|
||||
|
||||
EXT_MAP_NETWORK_TRAFFIC = {
|
||||
'http-request-ext': HTTPRequestExt,
|
||||
'icmp-ext': ICMPExt,
|
||||
'socket-ext': SocketExt,
|
||||
'tcp-ext': TCPExt,
|
||||
}
|
||||
|
||||
EXT_MAP_PROCESS = {
|
||||
'windows-process-ext': WindowsProcessExt,
|
||||
'windows-service-ext': WindowsServiceExt,
|
||||
}
|
||||
|
||||
EXT_MAP_USER_ACCOUNT = {
|
||||
'unix-account-ext': UNIXAccountExt,
|
||||
}
|
||||
|
||||
EXT_MAP = {
|
||||
'file': EXT_MAP_FILE,
|
||||
'network-traffic': EXT_MAP_NETWORK_TRAFFIC,
|
||||
'process': EXT_MAP_PROCESS,
|
||||
'user-account': EXT_MAP_USER_ACCOUNT,
|
||||
|
||||
'file': {
|
||||
'archive-ext': ArchiveExt,
|
||||
'ntfs-ext': NTFSExt,
|
||||
'pdf-ext': PDFExt,
|
||||
'raster-image-ext': RasterImageExt,
|
||||
'windows-pebinary-ext': WindowsPEBinaryExt
|
||||
},
|
||||
'network-traffic': {
|
||||
'http-request-ext': HTTPRequestExt,
|
||||
'icmp-ext': ICMPExt,
|
||||
'socket-ext': SocketExt,
|
||||
'tcp-ext': TCPExt,
|
||||
},
|
||||
'process': {
|
||||
'windows-process-ext': WindowsProcessExt,
|
||||
'windows-service-ext': WindowsServiceExt,
|
||||
},
|
||||
'user-account': {
|
||||
'unix-account-ext': UNIXAccountExt,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
|
@ -716,16 +731,17 @@ def parse_observable(data, _valid_refs=[], allow_custom=False):
|
|||
obj['_valid_refs'] = _valid_refs
|
||||
|
||||
if 'type' not in obj:
|
||||
raise ParseError("Can't parse object with no 'type' property: %s" % str(obj))
|
||||
raise ParseError("Can't parse observable with no 'type' property: %s" % str(obj))
|
||||
try:
|
||||
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
|
||||
except KeyError:
|
||||
raise ParseError("Can't parse unknown object type '%s'! For custom observables, use the CustomObservable decorator." % obj['type'])
|
||||
raise ParseError("Can't parse unknown observable type '%s'! For custom observables, "
|
||||
"use the CustomObservable decorator." % obj['type'])
|
||||
|
||||
if 'extensions' in obj and obj['type'] in EXT_MAP:
|
||||
for name, ext in obj['extensions'].items():
|
||||
if name not in EXT_MAP[obj['type']]:
|
||||
raise ParseError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type']))
|
||||
raise ParseError("Can't parse Unknown extension type '%s' for observable type '%s'!" % (name, obj['type']))
|
||||
ext_class = EXT_MAP[obj['type']][name]
|
||||
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
|
||||
|
||||
|
@ -750,6 +766,16 @@ def CustomObservable(type='x-custom-observable', properties={}):
|
|||
_properties = {
|
||||
'type': TypeProperty(_type),
|
||||
}
|
||||
# Check properties ending in "_ref/s" are ObjectReferenceProperties
|
||||
for prop_name, prop in properties.items():
|
||||
if prop_name.endswith('_ref') and not isinstance(prop, ObjectReferenceProperty):
|
||||
raise ValueError("'%s' is named like an object reference property but "
|
||||
"is not an ObjectReferenceProperty." % prop_name)
|
||||
elif (prop_name.endswith('_refs') and (not isinstance(prop, ListProperty)
|
||||
or not isinstance(prop.contained, ObjectReferenceProperty))):
|
||||
raise ValueError("'%s' is named like an object reference list property but "
|
||||
"is not a ListProperty containing ObjectReferenceProperty." % prop_name)
|
||||
|
||||
_properties.update(properties)
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
|
@ -760,3 +786,50 @@ def CustomObservable(type='x-custom-observable', properties={}):
|
|||
return _Custom
|
||||
|
||||
return custom_builder
|
||||
|
||||
|
||||
def _register_extension(observable, new_extension):
|
||||
"""Register a custom extension to a STIX Cyber Observable type.
|
||||
"""
|
||||
|
||||
try:
|
||||
observable_type = observable._type
|
||||
except AttributeError:
|
||||
raise ValueError("Unknown observable type. Custom observables must be "
|
||||
"created with the @CustomObservable decorator.")
|
||||
|
||||
try:
|
||||
EXT_MAP[observable_type][new_extension._type] = new_extension
|
||||
except KeyError:
|
||||
if observable_type not in OBJ_MAP_OBSERVABLE:
|
||||
raise ValueError("Unknown observable type '%s'. Custom observables "
|
||||
"must be created with the @CustomObservable decorator."
|
||||
% observable_type)
|
||||
else:
|
||||
EXT_MAP[observable_type] = {new_extension._type: new_extension}
|
||||
|
||||
|
||||
def CustomExtension(observable=None, type='x-custom-observable', properties={}):
|
||||
"""Decorator for custom extensions to STIX Cyber Observables
|
||||
"""
|
||||
|
||||
if not observable or not issubclass(observable, _Observable):
|
||||
raise ValueError("'observable' must be a valid Observable class!")
|
||||
|
||||
def custom_builder(cls):
|
||||
|
||||
class _Custom(cls, _Extension):
|
||||
_type = type
|
||||
_properties = {
|
||||
'extensions': ExtensionsProperty(enclosing_type=_type),
|
||||
}
|
||||
_properties.update(properties)
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
_Extension.__init__(self, **kwargs)
|
||||
cls.__init__(self, **kwargs)
|
||||
|
||||
_register_extension(observable, _Custom)
|
||||
return _Custom
|
||||
|
||||
return custom_builder
|
||||
|
|
|
@ -118,6 +118,20 @@ def test_create_bundle_with_arg_listarg_and_kwarg(indicator, malware, relationsh
|
|||
assert str(bundle) == EXPECTED_BUNDLE
|
||||
|
||||
|
||||
def test_create_bundle_invalid(indicator, malware, relationship):
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
stix2.Bundle(objects=[1])
|
||||
assert excinfo.value.reason == "This property may only contain a dictionary or object"
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
stix2.Bundle(objects=[{}])
|
||||
assert excinfo.value.reason == "This property may only contain a non-empty dictionary or object"
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
stix2.Bundle(objects=[{'type': 'bundle'}])
|
||||
assert excinfo.value.reason == 'This property may not contain a Bundle object'
|
||||
|
||||
|
||||
def test_parse_bundle():
|
||||
bundle = stix2.parse(EXPECTED_BUNDLE)
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ from .constants import FAKE_TIME
|
|||
|
||||
|
||||
def test_identity_custom_property():
|
||||
with pytest.raises(ValueError):
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
stix2.Identity(
|
||||
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
|
||||
created="2015-12-21T19:59:11Z",
|
||||
|
@ -15,6 +15,7 @@ def test_identity_custom_property():
|
|||
identity_class="individual",
|
||||
custom_properties="foobar",
|
||||
)
|
||||
assert str(excinfo.value) == "'custom_properties' must be a dictionary"
|
||||
|
||||
identity = stix2.Identity(
|
||||
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
|
||||
|
@ -31,7 +32,7 @@ def test_identity_custom_property():
|
|||
|
||||
|
||||
def test_identity_custom_property_invalid():
|
||||
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
|
||||
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
|
||||
stix2.Identity(
|
||||
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
|
||||
created="2015-12-21T19:59:11Z",
|
||||
|
@ -40,6 +41,9 @@ def test_identity_custom_property_invalid():
|
|||
identity_class="individual",
|
||||
x_foo="bar",
|
||||
)
|
||||
assert excinfo.value.cls == stix2.Identity
|
||||
assert excinfo.value.properties == ['x_foo']
|
||||
assert "Unexpected properties for" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_identity_custom_property_allowed():
|
||||
|
@ -67,8 +71,11 @@ def test_identity_custom_property_allowed():
|
|||
}""",
|
||||
])
|
||||
def test_parse_identity_custom_property(data):
|
||||
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
|
||||
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
|
||||
identity = stix2.parse(data)
|
||||
assert excinfo.value.cls == stix2.Identity
|
||||
assert excinfo.value.properties == ['foo']
|
||||
assert "Unexpected properties for" in str(excinfo.value)
|
||||
|
||||
identity = stix2.parse(data, allow_custom=True)
|
||||
assert identity.foo == "bar"
|
||||
|
@ -88,11 +95,13 @@ def test_custom_object_type():
|
|||
nt = NewType(property1='something')
|
||||
assert nt.property1 == 'something'
|
||||
|
||||
with pytest.raises(stix2.exceptions.MissingPropertiesError):
|
||||
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
|
||||
NewType(property2=42)
|
||||
assert "No values for required properties" in str(excinfo.value)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
NewType(property1='something', property2=4)
|
||||
assert "'property2' is too small." in str(excinfo.value)
|
||||
|
||||
|
||||
def test_parse_custom_object_type():
|
||||
|
@ -106,6 +115,19 @@ def test_parse_custom_object_type():
|
|||
assert nt.property1 == 'something'
|
||||
|
||||
|
||||
def test_parse_unregistered_custom_object_type():
|
||||
nt_string = """{
|
||||
"type": "x-foobar-observable",
|
||||
"created": "2015-12-21T19:59:11Z",
|
||||
"property1": "something"
|
||||
}"""
|
||||
|
||||
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
|
||||
stix2.parse(nt_string)
|
||||
assert "Can't parse unknown object type" in str(excinfo.value)
|
||||
assert "use the CustomObject decorator." in str(excinfo.value)
|
||||
|
||||
|
||||
@stix2.observables.CustomObservable('x-new-observable', {
|
||||
'property1': stix2.properties.StringProperty(required=True),
|
||||
'property2': stix2.properties.IntegerProperty(),
|
||||
|
@ -120,11 +142,59 @@ def test_custom_observable_object():
|
|||
no = NewObservable(property1='something')
|
||||
assert no.property1 == 'something'
|
||||
|
||||
with pytest.raises(stix2.exceptions.MissingPropertiesError):
|
||||
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
|
||||
NewObservable(property2=42)
|
||||
assert excinfo.value.properties == ['property1']
|
||||
assert "No values for required properties" in str(excinfo.value)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
NewObservable(property1='something', property2=4)
|
||||
assert "'property2' is too small." in str(excinfo.value)
|
||||
|
||||
|
||||
def test_custom_observable_object_invalid_ref_property():
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
@stix2.observables.CustomObservable('x-new-obs', {
|
||||
'property_ref': stix2.properties.StringProperty(),
|
||||
})
|
||||
class NewObs():
|
||||
pass
|
||||
assert "is named like an object reference property but is not an ObjectReferenceProperty" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_custom_observable_object_invalid_refs_property():
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
@stix2.observables.CustomObservable('x-new-obs', {
|
||||
'property_refs': stix2.properties.StringProperty(),
|
||||
})
|
||||
class NewObs():
|
||||
pass
|
||||
assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_custom_observable_object_invalid_refs_list_property():
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
@stix2.observables.CustomObservable('x-new-obs', {
|
||||
'property_refs': stix2.properties.ListProperty(stix2.properties.StringProperty),
|
||||
})
|
||||
class NewObs():
|
||||
pass
|
||||
assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_custom_observable_object_invalid_valid_refs():
|
||||
@stix2.observables.CustomObservable('x-new-obs', {
|
||||
'property1': stix2.properties.StringProperty(required=True),
|
||||
'property_ref': stix2.properties.ObjectReferenceProperty(valid_types='email-addr'),
|
||||
})
|
||||
class NewObs():
|
||||
pass
|
||||
|
||||
with pytest.raises(Exception) as excinfo:
|
||||
NewObs(_valid_refs=['1'],
|
||||
property1='something',
|
||||
property_ref='1')
|
||||
assert "must be created with _valid_refs as a dict, not a list" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_parse_custom_observable_object():
|
||||
|
@ -137,12 +207,34 @@ def test_parse_custom_observable_object():
|
|||
assert nt.property1 == 'something'
|
||||
|
||||
|
||||
def test_parse_unregistered_custom_observable_object():
|
||||
nt_string = """{
|
||||
"type": "x-foobar-observable",
|
||||
"property1": "something"
|
||||
}"""
|
||||
|
||||
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
|
||||
stix2.parse_observable(nt_string)
|
||||
assert "Can't parse unknown observable type" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_parse_invalid_custom_observable_object():
|
||||
nt_string = """{
|
||||
"property1": "something"
|
||||
}"""
|
||||
|
||||
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
|
||||
stix2.parse_observable(nt_string)
|
||||
assert "Can't parse observable with no 'type' property" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_observable_custom_property():
|
||||
with pytest.raises(ValueError):
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
NewObservable(
|
||||
property1='something',
|
||||
custom_properties="foobar",
|
||||
)
|
||||
assert "'custom_properties' must be a dictionary" in str(excinfo.value)
|
||||
|
||||
no = NewObservable(
|
||||
property1='something',
|
||||
|
@ -154,11 +246,13 @@ def test_observable_custom_property():
|
|||
|
||||
|
||||
def test_observable_custom_property_invalid():
|
||||
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
|
||||
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
|
||||
NewObservable(
|
||||
property1='something',
|
||||
x_foo="bar",
|
||||
)
|
||||
assert excinfo.value.properties == ['x_foo']
|
||||
assert "Unexpected properties for" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_observable_custom_property_allowed():
|
||||
|
@ -180,3 +274,107 @@ def test_observed_data_with_custom_observable_object():
|
|||
allow_custom=True,
|
||||
)
|
||||
assert ob_data.objects['0'].property1 == 'something'
|
||||
|
||||
|
||||
@stix2.observables.CustomExtension(stix2.DomainName, 'x-new-ext', {
|
||||
'property1': stix2.properties.StringProperty(required=True),
|
||||
'property2': stix2.properties.IntegerProperty(),
|
||||
})
|
||||
class NewExtension():
|
||||
def __init__(self, property2=None, **kwargs):
|
||||
if property2 and property2 < 10:
|
||||
raise ValueError("'property2' is too small.")
|
||||
|
||||
|
||||
def test_custom_extension():
|
||||
ext = NewExtension(property1='something')
|
||||
assert ext.property1 == 'something'
|
||||
|
||||
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
|
||||
NewExtension(property2=42)
|
||||
assert excinfo.value.properties == ['property1']
|
||||
assert str(excinfo.value) == "No values for required properties for _Custom: (property1)."
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
NewExtension(property1='something', property2=4)
|
||||
assert str(excinfo.value) == "'property2' is too small."
|
||||
|
||||
|
||||
def test_custom_extension_wrong_observable_type():
|
||||
ext = NewExtension(property1='something')
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
stix2.File(name="abc.txt",
|
||||
extensions={
|
||||
"ntfs-ext": ext,
|
||||
})
|
||||
|
||||
assert 'Cannot determine extension type' in excinfo.value.reason
|
||||
|
||||
|
||||
def test_custom_extension_invalid_observable():
|
||||
# These extensions are being applied to improperly-created Observables.
|
||||
# The Observable classes should have been created with the CustomObservable decorator.
|
||||
class Foo(object):
|
||||
pass
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
@stix2.observables.CustomExtension(Foo, 'x-new-ext', {
|
||||
'property1': stix2.properties.StringProperty(required=True),
|
||||
})
|
||||
class FooExtension():
|
||||
pass # pragma: no cover
|
||||
assert str(excinfo.value) == "'observable' must be a valid Observable class!"
|
||||
|
||||
class Bar(stix2.observables._Observable):
|
||||
pass
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
@stix2.observables.CustomExtension(Bar, 'x-new-ext', {
|
||||
'property1': stix2.properties.StringProperty(required=True),
|
||||
})
|
||||
class BarExtension():
|
||||
pass
|
||||
assert "Unknown observable type" in str(excinfo.value)
|
||||
assert "Custom observables must be created with the @CustomObservable decorator." in str(excinfo.value)
|
||||
|
||||
class Baz(stix2.observables._Observable):
|
||||
_type = 'Baz'
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
@stix2.observables.CustomExtension(Baz, 'x-new-ext', {
|
||||
'property1': stix2.properties.StringProperty(required=True),
|
||||
})
|
||||
class BazExtension():
|
||||
pass
|
||||
assert "Unknown observable type" in str(excinfo.value)
|
||||
assert "Custom observables must be created with the @CustomObservable decorator." in str(excinfo.value)
|
||||
|
||||
|
||||
def test_parse_observable_with_custom_extension():
|
||||
input_str = """{
|
||||
"type": "domain-name",
|
||||
"value": "example.com",
|
||||
"extensions": {
|
||||
"x-new-ext": {
|
||||
"property1": "foo",
|
||||
"property2": 12
|
||||
}
|
||||
}
|
||||
}"""
|
||||
|
||||
parsed = stix2.parse_observable(input_str)
|
||||
assert parsed.extensions['x-new-ext'].property2 == 12
|
||||
|
||||
|
||||
def test_parse_observable_with_unregistered_custom_extension():
|
||||
input_str = """{
|
||||
"type": "domain-name",
|
||||
"value": "example.com",
|
||||
"extensions": {
|
||||
"x-foobar-ext": {
|
||||
"property1": "foo",
|
||||
"property2": 12
|
||||
}
|
||||
}
|
||||
}"""
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
stix2.parse_observable(input_str)
|
||||
assert "Can't parse Unknown extension type" in str(excinfo.value)
|
||||
|
|
|
@ -125,6 +125,42 @@ def test_observed_data_example_with_bad_refs():
|
|||
assert excinfo.value.reason == "Invalid object reference for 'Directory:contains_refs': '2' is not a valid object in local scope"
|
||||
|
||||
|
||||
def test_observed_data_example_with_non_dictionary():
|
||||
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
|
||||
stix2.ObservedData(
|
||||
id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
|
||||
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
|
||||
created="2016-04-06T19:58:16.000Z",
|
||||
modified="2016-04-06T19:58:16.000Z",
|
||||
first_observed="2015-12-21T19:00:00Z",
|
||||
last_observed="2015-12-21T19:00:00Z",
|
||||
number_observed=50,
|
||||
objects="file: foo.exe",
|
||||
)
|
||||
|
||||
assert excinfo.value.cls == stix2.ObservedData
|
||||
assert excinfo.value.prop_name == "objects"
|
||||
assert 'must contain a dictionary' in excinfo.value.reason
|
||||
|
||||
|
||||
def test_observed_data_example_with_empty_dictionary():
|
||||
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
|
||||
stix2.ObservedData(
|
||||
id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
|
||||
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
|
||||
created="2016-04-06T19:58:16.000Z",
|
||||
modified="2016-04-06T19:58:16.000Z",
|
||||
first_observed="2015-12-21T19:00:00Z",
|
||||
last_observed="2015-12-21T19:00:00Z",
|
||||
number_observed=50,
|
||||
objects={},
|
||||
)
|
||||
|
||||
assert excinfo.value.cls == stix2.ObservedData
|
||||
assert excinfo.value.prop_name == "objects"
|
||||
assert 'must contain a non-empty dictionary' in excinfo.value.reason
|
||||
|
||||
|
||||
@pytest.mark.parametrize("data", [
|
||||
EXPECTED,
|
||||
{
|
||||
|
@ -416,6 +452,8 @@ def test_parse_email_message_with_at_least_one_error(data):
|
|||
|
||||
assert excinfo.value.cls == stix2.EmailMIMEComponent
|
||||
assert excinfo.value.properties == ["body", "body_raw_ref"]
|
||||
assert "At least one of the" in str(excinfo.value)
|
||||
assert "must be populated" in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("data", [
|
||||
|
@ -555,6 +593,7 @@ def test_artifact_mutual_exclusion_error():
|
|||
|
||||
assert excinfo.value.cls == stix2.Artifact
|
||||
assert excinfo.value.properties == ["payload_bin", "url"]
|
||||
assert 'are mutually exclusive' in str(excinfo.value)
|
||||
|
||||
|
||||
def test_directory_example():
|
||||
|
@ -800,6 +839,8 @@ def test_file_example_encryption_error():
|
|||
|
||||
assert excinfo.value.cls == stix2.File
|
||||
assert excinfo.value.dependencies == [("is_encrypted", "encryption_algorithm")]
|
||||
assert "property dependencies" in str(excinfo.value)
|
||||
assert "are not met" in str(excinfo.value)
|
||||
|
||||
with pytest.raises(stix2.exceptions.DependentPropertiesError) as excinfo:
|
||||
stix2.File(name="qwerty.dll",
|
||||
|
|
|
@ -5,10 +5,10 @@ from stix2.exceptions import AtLeastOnePropertyError, DictionaryKeyError
|
|||
from stix2.observables import EmailMIMEComponent, ExtensionsProperty
|
||||
from stix2.properties import (BinaryProperty, BooleanProperty,
|
||||
DictionaryProperty, EmbeddedObjectProperty,
|
||||
EnumProperty, HashesProperty, HexProperty,
|
||||
IDProperty, IntegerProperty, ListProperty,
|
||||
Property, ReferenceProperty, StringProperty,
|
||||
TimestampProperty, TypeProperty)
|
||||
EnumProperty, FloatProperty, HashesProperty,
|
||||
HexProperty, IDProperty, IntegerProperty,
|
||||
ListProperty, Property, ReferenceProperty,
|
||||
StringProperty, TimestampProperty, TypeProperty)
|
||||
|
||||
from .constants import FAKE_TIME
|
||||
|
||||
|
@ -119,6 +119,27 @@ def test_integer_property_invalid(value):
|
|||
int_prop.clean(value)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("value", [
|
||||
2,
|
||||
-1,
|
||||
3.14,
|
||||
False,
|
||||
])
|
||||
def test_float_property_valid(value):
|
||||
int_prop = FloatProperty()
|
||||
assert int_prop.clean(value) is not None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("value", [
|
||||
"something",
|
||||
StringProperty(),
|
||||
])
|
||||
def test_float_property_invalid(value):
|
||||
int_prop = FloatProperty()
|
||||
with pytest.raises(ValueError):
|
||||
int_prop.clean(value)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("value", [
|
||||
True,
|
||||
False,
|
||||
|
@ -210,10 +231,22 @@ def test_dictionary_property_valid(d):
|
|||
{'a'*300: 'something'},
|
||||
{'Hey!': 'something'},
|
||||
])
|
||||
def test_dictionary_property_invalid_key(d):
|
||||
dict_prop = DictionaryProperty()
|
||||
|
||||
with pytest.raises(DictionaryKeyError) as excinfo:
|
||||
dict_prop.clean(d)
|
||||
assert "Invalid dictionary key" in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("d", [
|
||||
{},
|
||||
"{'description': 'something'}",
|
||||
])
|
||||
def test_dictionary_property_invalid(d):
|
||||
dict_prop = DictionaryProperty()
|
||||
|
||||
with pytest.raises(DictionaryKeyError):
|
||||
with pytest.raises(ValueError):
|
||||
dict_prop.clean(d)
|
||||
|
||||
|
||||
|
@ -250,10 +283,18 @@ def test_embedded_property():
|
|||
emb_prop.clean("string")
|
||||
|
||||
|
||||
def test_enum_property():
|
||||
enum_prop = EnumProperty(['a', 'b', 'c'])
|
||||
@pytest.mark.parametrize("value", [
|
||||
['a', 'b', 'c'],
|
||||
('a', 'b', 'c'),
|
||||
'b',
|
||||
])
|
||||
def test_enum_property_valid(value):
|
||||
enum_prop = EnumProperty(value)
|
||||
assert enum_prop.clean('b')
|
||||
|
||||
|
||||
def test_enum_property_invalid():
|
||||
enum_prop = EnumProperty(['a', 'b', 'c'])
|
||||
with pytest.raises(ValueError):
|
||||
enum_prop.clean('z')
|
||||
|
||||
|
|
|
@ -160,6 +160,7 @@ def test_versioning_error_new_version_of_revoked():
|
|||
|
||||
with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
|
||||
campaign_v2.new_version(name="barney")
|
||||
assert str(excinfo.value) == "Cannot create a new version of a revoked object."
|
||||
|
||||
assert excinfo.value.called_by == "new_version"
|
||||
|
||||
|
@ -178,5 +179,6 @@ def test_versioning_error_revoke_of_revoked():
|
|||
|
||||
with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
|
||||
campaign_v2.revoke()
|
||||
assert str(excinfo.value) == "Cannot revoke an already revoked object."
|
||||
|
||||
assert excinfo.value.called_by == "revoke"
|
||||
|
|
Loading…
Reference in New Issue