From ee49e78c72163e40c8cdae0b43914434e47d24ca Mon Sep 17 00:00:00 2001 From: clenk Date: Wed, 23 Aug 2017 18:36:24 -0400 Subject: [PATCH] Add custom extensions to cyber observables Fix #31. --- stix2/observables.py | 124 ++++++++++++++++++++++++++++---------- stix2/test/test_custom.py | 57 ++++++++++++++++++ 2 files changed, 150 insertions(+), 31 deletions(-) diff --git a/stix2/observables.py b/stix2/observables.py index 366e007..3c033bd 100644 --- a/stix2/observables.py +++ b/stix2/observables.py @@ -24,7 +24,7 @@ class ObservableProperty(Property): except ValueError: raise ValueError("The observable property must contain a dictionary") if dictified == {}: - raise ValueError("The dictionary property must contain a non-empty dictionary") + raise ValueError("The observable property must contain a non-empty dictionary") valid_refs = dict((k, v['type']) for (k, v) in dictified.items()) @@ -53,7 +53,7 @@ class ExtensionsProperty(DictionaryProperty): except ValueError: raise ValueError("The extensions property must contain a dictionary") if dictified == {}: - raise ValueError("The dictionary property must contain a non-empty dictionary") + raise ValueError("The extensions property must contain a non-empty dictionary") if self.enclosing_type in EXT_MAP: specific_type_map = EXT_MAP[self.enclosing_type] @@ -69,7 +69,7 @@ class ExtensionsProperty(DictionaryProperty): else: raise ValueError("The key used in the extensions dictionary is not an extension type name") else: - raise ValueError("The enclosing type has no extensions defined") + raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type) return dictified @@ -77,6 +77,7 @@ class Artifact(_Observable): _type = 'artifact' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'mime_type': StringProperty(), 'payload_bin': BinaryProperty(), 'url': StringProperty(), @@ -93,6 +94,7 @@ class AutonomousSystem(_Observable): _type = 'autonomous-system' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'number': IntegerProperty(), 'name': StringProperty(), 'rir': StringProperty(), @@ -103,6 +105,7 @@ class Directory(_Observable): _type = 'directory' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'path': StringProperty(required=True), 'path_enc': StringProperty(), # these are not the created/modified timestamps of the object itself @@ -117,6 +120,7 @@ class DomainName(_Observable): _type = 'domain-name' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name'])), } @@ -126,6 +130,7 @@ class EmailAddress(_Observable): _type = 'email-addr' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), 'display_name': StringProperty(), 'belongs_to_ref': ObjectReferenceProperty(valid_types='user-account'), @@ -149,6 +154,7 @@ class EmailMessage(_Observable): _type = 'email-message' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'is_multipart': BooleanProperty(required=True), 'date': TimestampProperty(), 'content_type': StringProperty(), @@ -174,6 +180,7 @@ class EmailMessage(_Observable): class ArchiveExt(_Extension): + _type = 'archive-ext' _properties = { 'contains_refs': ListProperty(ObjectReferenceProperty(valid_types='file'), required=True), 'version': StringProperty(), @@ -190,6 +197,7 @@ class AlternateDataStream(_STIXBase): class NTFSExt(_Extension): + _type = 'ntfs-ext' _properties = { 'sid': StringProperty(), 'alternate_data_streams': ListProperty(EmbeddedObjectProperty(type=AlternateDataStream)), @@ -197,6 +205,7 @@ class NTFSExt(_Extension): class PDFExt(_Extension): + _type = 'pdf-ext' _properties = { 'version': StringProperty(), 'is_optimized': BooleanProperty(), @@ -207,6 +216,7 @@ class PDFExt(_Extension): class RasterImageExt(_Extension): + _type = 'raster-image-ext' _properties = { 'image_height': IntegerProperty(), 'image_weight': IntegerProperty(), @@ -266,6 +276,7 @@ class WindowsPESection(_STIXBase): class WindowsPEBinaryExt(_Extension): + _type = 'windows-pebinary-ext' _properties = { 'pe_type': StringProperty(required=True), # open_vocab 'imphash': StringProperty(), @@ -315,6 +326,7 @@ class IPv4Address(_Observable): _type = 'ipv4-addr' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')), 'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')), @@ -325,6 +337,7 @@ class IPv6Address(_Observable): _type = 'ipv6-addr' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')), 'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')), @@ -335,6 +348,7 @@ class MACAddress(_Observable): _type = 'mac-addr' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), } @@ -343,11 +357,13 @@ class Mutex(_Observable): _type = 'mutex' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'name': StringProperty(), } class HTTPRequestExt(_Extension): + _type = 'http-request-ext' _properties = { 'request_method': StringProperty(required=True), 'request_value': StringProperty(required=True), @@ -359,6 +375,7 @@ class HTTPRequestExt(_Extension): class ICMPExt(_Extension): + _type = 'icmp-ext' _properties = { 'icmp_type_hex': HexProperty(required=True), 'icmp_code_hex': HexProperty(required=True), @@ -366,6 +383,7 @@ class ICMPExt(_Extension): class SocketExt(_Extension): + _type = 'socket-ext' _properties = { 'address_family': EnumProperty([ "AF_UNSPEC", @@ -399,6 +417,7 @@ class SocketExt(_Extension): class TCPExt(_Extension): + _type = 'tcp-ext' _properties = { 'src_flags_hex': HexProperty(), 'dst_flags_hex': HexProperty(), @@ -435,6 +454,7 @@ class NetworkTraffic(_Observable): class WindowsProcessExt(_Extension): + _type = 'windows-process-ext' _properties = { 'aslr_enabled': BooleanProperty(), 'dep_enabled': BooleanProperty(), @@ -446,6 +466,7 @@ class WindowsProcessExt(_Extension): class WindowsServiceExt(_Extension): + _type = 'windows-service-ext' _properties = { 'service_name': StringProperty(required=True), 'descriptions': ListProperty(StringProperty), @@ -517,6 +538,7 @@ class Software(_Observable): _type = 'software' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'name': StringProperty(required=True), 'cpe': StringProperty(), 'languages': ListProperty(StringProperty), @@ -529,11 +551,13 @@ class URL(_Observable): _type = 'url' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), } class UNIXAccountExt(_Extension): + _type = 'unix-account-ext' _properties = { 'gid': IntegerProperty(), 'groups': ListProperty(StringProperty), @@ -590,6 +614,7 @@ class WindowsRegistryKey(_Observable): _type = 'windows-registry-key' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'key': StringProperty(required=True), 'values': ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType)), # this is not the modified timestamps of the object itself @@ -630,6 +655,7 @@ class X509Certificate(_Observable): _type = 'x509-certificate' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'is_self_signed': BooleanProperty(), 'hashes': HashesProperty(), 'version': StringProperty(), @@ -667,36 +693,28 @@ OBJ_MAP_OBSERVABLE = { 'x509-certificate': X509Certificate, } -EXT_MAP_FILE = { - 'archive-ext': ArchiveExt, - 'ntfs-ext': NTFSExt, - 'pdf-ext': PDFExt, - 'raster-image-ext': RasterImageExt, - 'windows-pebinary-ext': WindowsPEBinaryExt -} - -EXT_MAP_NETWORK_TRAFFIC = { - 'http-request-ext': HTTPRequestExt, - 'icmp-ext': ICMPExt, - 'socket-ext': SocketExt, - 'tcp-ext': TCPExt, -} - -EXT_MAP_PROCESS = { - 'windows-process-ext': WindowsProcessExt, - 'windows-service-ext': WindowsServiceExt, -} - -EXT_MAP_USER_ACCOUNT = { - 'unix-account-ext': UNIXAccountExt, -} EXT_MAP = { - 'file': EXT_MAP_FILE, - 'network-traffic': EXT_MAP_NETWORK_TRAFFIC, - 'process': EXT_MAP_PROCESS, - 'user-account': EXT_MAP_USER_ACCOUNT, - + 'file': { + 'archive-ext': ArchiveExt, + 'ntfs-ext': NTFSExt, + 'pdf-ext': PDFExt, + 'raster-image-ext': RasterImageExt, + 'windows-pebinary-ext': WindowsPEBinaryExt + }, + 'network-traffic': { + 'http-request-ext': HTTPRequestExt, + 'icmp-ext': ICMPExt, + 'socket-ext': SocketExt, + 'tcp-ext': TCPExt, + }, + 'process': { + 'windows-process-ext': WindowsProcessExt, + 'windows-service-ext': WindowsServiceExt, + }, + 'user-account': { + 'unix-account-ext': UNIXAccountExt, + }, } @@ -760,3 +778,47 @@ def CustomObservable(type='x-custom-observable', properties={}): return _Custom return custom_builder + + +def _register_extension(observable, new_extension): + """Register a custom extension to a STIX Cyber Observable type. + """ + + try: + observable_type = observable._type + except AttributeError: + raise ValueError("Custom observables must be created with the @CustomObservable decorator.") + + try: + EXT_MAP[observable_type][new_extension._type] = new_extension + except KeyError: + if observable_type not in OBJ_MAP_OBSERVABLE: + raise ValueError("Unknown observable type '%s'" % observable_type) + else: + EXT_MAP[observable_type] = {new_extension._type: new_extension} + + +def CustomExtension(observable=None, type='x-custom-observable', properties={}): + """Decorator for custom extensions to STIX Cyber Observables + """ + + if not observable or not issubclass(observable, _Observable): + raise ValueError("'observable' must be a valid Observable class!") + + def custom_builder(cls): + + class _Custom(cls, _Extension): + _type = type + _properties = { + 'extensions': ExtensionsProperty(enclosing_type=_type), + } + _properties.update(properties) + + def __init__(self, **kwargs): + _Extension.__init__(self, **kwargs) + cls.__init__(self, **kwargs) + + _register_extension(observable, _Custom) + return _Custom + + return custom_builder diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index 60e982c..804776a 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -180,3 +180,60 @@ def test_observed_data_with_custom_observable_object(): allow_custom=True, ) assert ob_data.objects['0'].property1 == 'something' + + +@stix2.observables.CustomExtension(stix2.DomainName, 'x-new-ext', { + 'property1': stix2.properties.StringProperty(required=True), + 'property2': stix2.properties.IntegerProperty(), +}) +class NewExtension(): + def __init__(self, property2=None, **kwargs): + if property2 and property2 < 10: + raise ValueError("'property2' is too small.") + + +def test_custom_extension(): + ext = NewExtension(property1='something') + assert ext.property1 == 'something' + + with pytest.raises(stix2.exceptions.MissingPropertiesError): + NewExtension(property2=42) + + with pytest.raises(ValueError): + NewExtension(property1='something', property2=4) + + +def test_custom_extension_invalid(): + class Foo(object): + pass + with pytest.raises(ValueError): + @stix2.observables.CustomExtension(Foo, 'x-new-ext', { + 'property1': stix2.properties.StringProperty(required=True), + }) + class FooExtension(): + pass # pragma: no cover + + class Bar(stix2.observables._Observable): + pass + with pytest.raises(ValueError): + @stix2.observables.CustomExtension(Bar, 'x-new-ext', { + 'property1': stix2.properties.StringProperty(required=True), + }) + class BarExtension(): + pass + + +def test_parse_observable_with_custom_extension(): + input_str = """{ + "type": "domain-name", + "value": "example.com", + "extensions": { + "x-new-ext": { + "property1": "foo", + "property2": 12 + } + } + }""" + + parsed = stix2.parse_observable(input_str) + assert parsed.extensions['x-new-ext'].property2 == 12