Add custom extensions to cyber observables

Fix #31.
stix2.1
clenk 2017-08-23 18:36:24 -04:00
parent 94ccd422c9
commit ee49e78c72
2 changed files with 150 additions and 31 deletions

View File

@ -24,7 +24,7 @@ class ObservableProperty(Property):
except ValueError: except ValueError:
raise ValueError("The observable property must contain a dictionary") raise ValueError("The observable property must contain a dictionary")
if dictified == {}: if dictified == {}:
raise ValueError("The dictionary property must contain a non-empty dictionary") raise ValueError("The observable property must contain a non-empty dictionary")
valid_refs = dict((k, v['type']) for (k, v) in dictified.items()) valid_refs = dict((k, v['type']) for (k, v) in dictified.items())
@ -53,7 +53,7 @@ class ExtensionsProperty(DictionaryProperty):
except ValueError: except ValueError:
raise ValueError("The extensions property must contain a dictionary") raise ValueError("The extensions property must contain a dictionary")
if dictified == {}: if dictified == {}:
raise ValueError("The dictionary property must contain a non-empty dictionary") raise ValueError("The extensions property must contain a non-empty dictionary")
if self.enclosing_type in EXT_MAP: if self.enclosing_type in EXT_MAP:
specific_type_map = EXT_MAP[self.enclosing_type] specific_type_map = EXT_MAP[self.enclosing_type]
@ -69,7 +69,7 @@ class ExtensionsProperty(DictionaryProperty):
else: else:
raise ValueError("The key used in the extensions dictionary is not an extension type name") raise ValueError("The key used in the extensions dictionary is not an extension type name")
else: else:
raise ValueError("The enclosing type has no extensions defined") raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type)
return dictified return dictified
@ -77,6 +77,7 @@ class Artifact(_Observable):
_type = 'artifact' _type = 'artifact'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'mime_type': StringProperty(), 'mime_type': StringProperty(),
'payload_bin': BinaryProperty(), 'payload_bin': BinaryProperty(),
'url': StringProperty(), 'url': StringProperty(),
@ -93,6 +94,7 @@ class AutonomousSystem(_Observable):
_type = 'autonomous-system' _type = 'autonomous-system'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'number': IntegerProperty(), 'number': IntegerProperty(),
'name': StringProperty(), 'name': StringProperty(),
'rir': StringProperty(), 'rir': StringProperty(),
@ -103,6 +105,7 @@ class Directory(_Observable):
_type = 'directory' _type = 'directory'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'path': StringProperty(required=True), 'path': StringProperty(required=True),
'path_enc': StringProperty(), 'path_enc': StringProperty(),
# these are not the created/modified timestamps of the object itself # these are not the created/modified timestamps of the object itself
@ -117,6 +120,7 @@ class DomainName(_Observable):
_type = 'domain-name' _type = 'domain-name'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'value': StringProperty(required=True), 'value': StringProperty(required=True),
'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name'])), 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name'])),
} }
@ -126,6 +130,7 @@ class EmailAddress(_Observable):
_type = 'email-addr' _type = 'email-addr'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'value': StringProperty(required=True), 'value': StringProperty(required=True),
'display_name': StringProperty(), 'display_name': StringProperty(),
'belongs_to_ref': ObjectReferenceProperty(valid_types='user-account'), 'belongs_to_ref': ObjectReferenceProperty(valid_types='user-account'),
@ -149,6 +154,7 @@ class EmailMessage(_Observable):
_type = 'email-message' _type = 'email-message'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'is_multipart': BooleanProperty(required=True), 'is_multipart': BooleanProperty(required=True),
'date': TimestampProperty(), 'date': TimestampProperty(),
'content_type': StringProperty(), 'content_type': StringProperty(),
@ -174,6 +180,7 @@ class EmailMessage(_Observable):
class ArchiveExt(_Extension): class ArchiveExt(_Extension):
_type = 'archive-ext'
_properties = { _properties = {
'contains_refs': ListProperty(ObjectReferenceProperty(valid_types='file'), required=True), 'contains_refs': ListProperty(ObjectReferenceProperty(valid_types='file'), required=True),
'version': StringProperty(), 'version': StringProperty(),
@ -190,6 +197,7 @@ class AlternateDataStream(_STIXBase):
class NTFSExt(_Extension): class NTFSExt(_Extension):
_type = 'ntfs-ext'
_properties = { _properties = {
'sid': StringProperty(), 'sid': StringProperty(),
'alternate_data_streams': ListProperty(EmbeddedObjectProperty(type=AlternateDataStream)), 'alternate_data_streams': ListProperty(EmbeddedObjectProperty(type=AlternateDataStream)),
@ -197,6 +205,7 @@ class NTFSExt(_Extension):
class PDFExt(_Extension): class PDFExt(_Extension):
_type = 'pdf-ext'
_properties = { _properties = {
'version': StringProperty(), 'version': StringProperty(),
'is_optimized': BooleanProperty(), 'is_optimized': BooleanProperty(),
@ -207,6 +216,7 @@ class PDFExt(_Extension):
class RasterImageExt(_Extension): class RasterImageExt(_Extension):
_type = 'raster-image-ext'
_properties = { _properties = {
'image_height': IntegerProperty(), 'image_height': IntegerProperty(),
'image_weight': IntegerProperty(), 'image_weight': IntegerProperty(),
@ -266,6 +276,7 @@ class WindowsPESection(_STIXBase):
class WindowsPEBinaryExt(_Extension): class WindowsPEBinaryExt(_Extension):
_type = 'windows-pebinary-ext'
_properties = { _properties = {
'pe_type': StringProperty(required=True), # open_vocab 'pe_type': StringProperty(required=True), # open_vocab
'imphash': StringProperty(), 'imphash': StringProperty(),
@ -315,6 +326,7 @@ class IPv4Address(_Observable):
_type = 'ipv4-addr' _type = 'ipv4-addr'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'value': StringProperty(required=True), 'value': StringProperty(required=True),
'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')), 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')),
'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')), 'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')),
@ -325,6 +337,7 @@ class IPv6Address(_Observable):
_type = 'ipv6-addr' _type = 'ipv6-addr'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'value': StringProperty(required=True), 'value': StringProperty(required=True),
'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')), 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')),
'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')), 'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')),
@ -335,6 +348,7 @@ class MACAddress(_Observable):
_type = 'mac-addr' _type = 'mac-addr'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'value': StringProperty(required=True), 'value': StringProperty(required=True),
} }
@ -343,11 +357,13 @@ class Mutex(_Observable):
_type = 'mutex' _type = 'mutex'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'name': StringProperty(), 'name': StringProperty(),
} }
class HTTPRequestExt(_Extension): class HTTPRequestExt(_Extension):
_type = 'http-request-ext'
_properties = { _properties = {
'request_method': StringProperty(required=True), 'request_method': StringProperty(required=True),
'request_value': StringProperty(required=True), 'request_value': StringProperty(required=True),
@ -359,6 +375,7 @@ class HTTPRequestExt(_Extension):
class ICMPExt(_Extension): class ICMPExt(_Extension):
_type = 'icmp-ext'
_properties = { _properties = {
'icmp_type_hex': HexProperty(required=True), 'icmp_type_hex': HexProperty(required=True),
'icmp_code_hex': HexProperty(required=True), 'icmp_code_hex': HexProperty(required=True),
@ -366,6 +383,7 @@ class ICMPExt(_Extension):
class SocketExt(_Extension): class SocketExt(_Extension):
_type = 'socket-ext'
_properties = { _properties = {
'address_family': EnumProperty([ 'address_family': EnumProperty([
"AF_UNSPEC", "AF_UNSPEC",
@ -399,6 +417,7 @@ class SocketExt(_Extension):
class TCPExt(_Extension): class TCPExt(_Extension):
_type = 'tcp-ext'
_properties = { _properties = {
'src_flags_hex': HexProperty(), 'src_flags_hex': HexProperty(),
'dst_flags_hex': HexProperty(), 'dst_flags_hex': HexProperty(),
@ -435,6 +454,7 @@ class NetworkTraffic(_Observable):
class WindowsProcessExt(_Extension): class WindowsProcessExt(_Extension):
_type = 'windows-process-ext'
_properties = { _properties = {
'aslr_enabled': BooleanProperty(), 'aslr_enabled': BooleanProperty(),
'dep_enabled': BooleanProperty(), 'dep_enabled': BooleanProperty(),
@ -446,6 +466,7 @@ class WindowsProcessExt(_Extension):
class WindowsServiceExt(_Extension): class WindowsServiceExt(_Extension):
_type = 'windows-service-ext'
_properties = { _properties = {
'service_name': StringProperty(required=True), 'service_name': StringProperty(required=True),
'descriptions': ListProperty(StringProperty), 'descriptions': ListProperty(StringProperty),
@ -517,6 +538,7 @@ class Software(_Observable):
_type = 'software' _type = 'software'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'name': StringProperty(required=True), 'name': StringProperty(required=True),
'cpe': StringProperty(), 'cpe': StringProperty(),
'languages': ListProperty(StringProperty), 'languages': ListProperty(StringProperty),
@ -529,11 +551,13 @@ class URL(_Observable):
_type = 'url' _type = 'url'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'value': StringProperty(required=True), 'value': StringProperty(required=True),
} }
class UNIXAccountExt(_Extension): class UNIXAccountExt(_Extension):
_type = 'unix-account-ext'
_properties = { _properties = {
'gid': IntegerProperty(), 'gid': IntegerProperty(),
'groups': ListProperty(StringProperty), 'groups': ListProperty(StringProperty),
@ -590,6 +614,7 @@ class WindowsRegistryKey(_Observable):
_type = 'windows-registry-key' _type = 'windows-registry-key'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'key': StringProperty(required=True), 'key': StringProperty(required=True),
'values': ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType)), 'values': ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType)),
# this is not the modified timestamps of the object itself # this is not the modified timestamps of the object itself
@ -630,6 +655,7 @@ class X509Certificate(_Observable):
_type = 'x509-certificate' _type = 'x509-certificate'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'is_self_signed': BooleanProperty(), 'is_self_signed': BooleanProperty(),
'hashes': HashesProperty(), 'hashes': HashesProperty(),
'version': StringProperty(), 'version': StringProperty(),
@ -667,36 +693,28 @@ OBJ_MAP_OBSERVABLE = {
'x509-certificate': X509Certificate, 'x509-certificate': X509Certificate,
} }
EXT_MAP_FILE = {
'archive-ext': ArchiveExt,
'ntfs-ext': NTFSExt,
'pdf-ext': PDFExt,
'raster-image-ext': RasterImageExt,
'windows-pebinary-ext': WindowsPEBinaryExt
}
EXT_MAP_NETWORK_TRAFFIC = {
'http-request-ext': HTTPRequestExt,
'icmp-ext': ICMPExt,
'socket-ext': SocketExt,
'tcp-ext': TCPExt,
}
EXT_MAP_PROCESS = {
'windows-process-ext': WindowsProcessExt,
'windows-service-ext': WindowsServiceExt,
}
EXT_MAP_USER_ACCOUNT = {
'unix-account-ext': UNIXAccountExt,
}
EXT_MAP = { EXT_MAP = {
'file': EXT_MAP_FILE, 'file': {
'network-traffic': EXT_MAP_NETWORK_TRAFFIC, 'archive-ext': ArchiveExt,
'process': EXT_MAP_PROCESS, 'ntfs-ext': NTFSExt,
'user-account': EXT_MAP_USER_ACCOUNT, 'pdf-ext': PDFExt,
'raster-image-ext': RasterImageExt,
'windows-pebinary-ext': WindowsPEBinaryExt
},
'network-traffic': {
'http-request-ext': HTTPRequestExt,
'icmp-ext': ICMPExt,
'socket-ext': SocketExt,
'tcp-ext': TCPExt,
},
'process': {
'windows-process-ext': WindowsProcessExt,
'windows-service-ext': WindowsServiceExt,
},
'user-account': {
'unix-account-ext': UNIXAccountExt,
},
} }
@ -760,3 +778,47 @@ def CustomObservable(type='x-custom-observable', properties={}):
return _Custom return _Custom
return custom_builder return custom_builder
def _register_extension(observable, new_extension):
"""Register a custom extension to a STIX Cyber Observable type.
"""
try:
observable_type = observable._type
except AttributeError:
raise ValueError("Custom observables must be created with the @CustomObservable decorator.")
try:
EXT_MAP[observable_type][new_extension._type] = new_extension
except KeyError:
if observable_type not in OBJ_MAP_OBSERVABLE:
raise ValueError("Unknown observable type '%s'" % observable_type)
else:
EXT_MAP[observable_type] = {new_extension._type: new_extension}
def CustomExtension(observable=None, type='x-custom-observable', properties={}):
"""Decorator for custom extensions to STIX Cyber Observables
"""
if not observable or not issubclass(observable, _Observable):
raise ValueError("'observable' must be a valid Observable class!")
def custom_builder(cls):
class _Custom(cls, _Extension):
_type = type
_properties = {
'extensions': ExtensionsProperty(enclosing_type=_type),
}
_properties.update(properties)
def __init__(self, **kwargs):
_Extension.__init__(self, **kwargs)
cls.__init__(self, **kwargs)
_register_extension(observable, _Custom)
return _Custom
return custom_builder

View File

@ -180,3 +180,60 @@ def test_observed_data_with_custom_observable_object():
allow_custom=True, allow_custom=True,
) )
assert ob_data.objects['0'].property1 == 'something' assert ob_data.objects['0'].property1 == 'something'
@stix2.observables.CustomExtension(stix2.DomainName, 'x-new-ext', {
'property1': stix2.properties.StringProperty(required=True),
'property2': stix2.properties.IntegerProperty(),
})
class NewExtension():
def __init__(self, property2=None, **kwargs):
if property2 and property2 < 10:
raise ValueError("'property2' is too small.")
def test_custom_extension():
ext = NewExtension(property1='something')
assert ext.property1 == 'something'
with pytest.raises(stix2.exceptions.MissingPropertiesError):
NewExtension(property2=42)
with pytest.raises(ValueError):
NewExtension(property1='something', property2=4)
def test_custom_extension_invalid():
class Foo(object):
pass
with pytest.raises(ValueError):
@stix2.observables.CustomExtension(Foo, 'x-new-ext', {
'property1': stix2.properties.StringProperty(required=True),
})
class FooExtension():
pass # pragma: no cover
class Bar(stix2.observables._Observable):
pass
with pytest.raises(ValueError):
@stix2.observables.CustomExtension(Bar, 'x-new-ext', {
'property1': stix2.properties.StringProperty(required=True),
})
class BarExtension():
pass
def test_parse_observable_with_custom_extension():
input_str = """{
"type": "domain-name",
"value": "example.com",
"extensions": {
"x-new-ext": {
"property1": "foo",
"property2": 12
}
}
}"""
parsed = stix2.parse_observable(input_str)
assert parsed.extensions['x-new-ext'].property2 == 12