"""STIX 2.0 Cyber Observable Objects Embedded observable object types, such as Email MIME Component, which is embedded in Email Message objects, inherit from _STIXBase instead of Observable and do not have a '_type' attribute. """ from collections import OrderedDict from .base import _Extension, _Observable, _STIXBase from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError, ParseError) from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty, EmbeddedObjectProperty, EnumProperty, FloatProperty, HashesProperty, HexProperty, IntegerProperty, ListProperty, ObjectReferenceProperty, Property, StringProperty, TimestampProperty, TypeProperty) from .utils import get_dict class ObservableProperty(Property): def clean(self, value): try: dictified = get_dict(value) except ValueError: raise ValueError("The observable property must contain a dictionary") if dictified == {}: raise ValueError("The dictionary property must contain a non-empty dictionary") valid_refs = dict((k, v['type']) for (k, v) in dictified.items()) # from .__init__ import parse_observable # avoid circular import for key, obj in dictified.items(): parsed_obj = parse_observable(obj, valid_refs) if not issubclass(type(parsed_obj), _Observable): raise ValueError("Objects in an observable property must be " "Cyber Observable Objects") dictified[key] = parsed_obj return dictified class ExtensionsProperty(DictionaryProperty): """ Property for representing extensions on Observable objects """ def __init__(self, enclosing_type=None, required=False): self.enclosing_type = enclosing_type super(ExtensionsProperty, self).__init__(required) def clean(self, value): try: dictified = get_dict(value) except ValueError: raise ValueError("The extensions property must contain a dictionary") if dictified == {}: raise ValueError("The dictionary property must contain a non-empty dictionary") if self.enclosing_type in EXT_MAP: specific_type_map = EXT_MAP[self.enclosing_type] for key, subvalue in dictified.items(): if key in specific_type_map: cls = specific_type_map[key] if type(subvalue) is dict: dictified[key] = cls(**subvalue) elif type(subvalue) is cls: dictified[key] = subvalue else: raise ValueError("Cannot determine extension type.") else: raise ValueError("The key used in the extensions dictionary is not an extension type name") else: raise ValueError("The enclosing type has no extensions defined") return dictified class Artifact(_Observable): _type = 'artifact' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('mime_type', StringProperty()), ('payload_bin', BinaryProperty()), ('url', StringProperty()), ('hashes', HashesProperty()), ]) def _check_object_constraints(self): super(Artifact, self)._check_object_constraints() self._check_mutually_exclusive_properties(["payload_bin", "url"]) self._check_properties_dependency(["hashes"], ["url"]) class AutonomousSystem(_Observable): _type = 'autonomous-system' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('number', IntegerProperty()), ('name', StringProperty()), ('rir', StringProperty()), ]) class Directory(_Observable): _type = 'directory' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('path', StringProperty(required=True)), ('path_enc', StringProperty()), # these are not the created/modified timestamps of the object itself ('created', TimestampProperty()), ('modified', TimestampProperty()), ('accessed', TimestampProperty()), ('contains_refs', ListProperty(ObjectReferenceProperty(valid_types=['file', 'directory']))), ]) class DomainName(_Observable): _type = 'domain-name' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('value', StringProperty(required=True)), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name']))), ]) class EmailAddress(_Observable): _type = 'email-addr' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('value', StringProperty(required=True)), ('display_name', StringProperty()), ('belongs_to_ref', ObjectReferenceProperty(valid_types='user-account')), ]) class EmailMIMEComponent(_STIXBase): _properties = OrderedDict() _properties.update([ ('body', StringProperty()), ('body_raw_ref', ObjectReferenceProperty(valid_types=['artifact', 'file'])), ('content_type', StringProperty()), ('content_disposition', StringProperty()), ]) def _check_object_constraints(self): super(EmailMIMEComponent, self)._check_object_constraints() self._check_at_least_one_property(["body", "body_raw_ref"]) class EmailMessage(_Observable): _type = 'email-message' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('is_multipart', BooleanProperty(required=True)), ('date', TimestampProperty()), ('content_type', StringProperty()), ('from_ref', ObjectReferenceProperty(valid_types='email-addr')), ('sender_ref', ObjectReferenceProperty(valid_types='email-addr')), ('to_refs', ListProperty(ObjectReferenceProperty(valid_types='email-addr'))), ('cc_refs', ListProperty(ObjectReferenceProperty(valid_types='email-addr'))), ('bcc_refs', ListProperty(ObjectReferenceProperty(valid_types='email-addr'))), ('subject', StringProperty()), ('received_lines', ListProperty(StringProperty)), ('additional_header_fields', DictionaryProperty()), ('body', StringProperty()), ('body_multipart', ListProperty(EmbeddedObjectProperty(type=EmailMIMEComponent))), ('raw_email_ref', ObjectReferenceProperty(valid_types='artifact')), ]) def _check_object_constraints(self): super(EmailMessage, self)._check_object_constraints() self._check_properties_dependency(["is_multipart"], ["body_multipart"]) if self.get("is_multipart") is True and self.get("body"): # 'body' MAY only be used if is_multipart is false. raise DependentPropertiesError(self.__class__, [("is_multipart", "body")]) class ArchiveExt(_Extension): _properties = OrderedDict() _properties.update([ ('contains_refs', ListProperty(ObjectReferenceProperty(valid_types='file'), required=True)), ('version', StringProperty()), ('comment', StringProperty()), ]) class AlternateDataStream(_STIXBase): _properties = OrderedDict() _properties.update([ ('name', StringProperty(required=True)), ('hashes', HashesProperty()), ('size', IntegerProperty()), ]) class NTFSExt(_Extension): _properties = OrderedDict() _properties.update([ ('sid', StringProperty()), ('alternate_data_streams', ListProperty(EmbeddedObjectProperty(type=AlternateDataStream))), ]) class PDFExt(_Extension): _properties = OrderedDict() _properties.update([ ('version', StringProperty()), ('is_optimized', BooleanProperty()), ('document_info_dict', DictionaryProperty()), ('pdfid0', StringProperty()), ('pdfid1', StringProperty()), ]) class RasterImageExt(_Extension): _properties = OrderedDict() _properties.update([ ('image_height', IntegerProperty()), ('image_weight', IntegerProperty()), ('bits_per_pixel', IntegerProperty()), ('image_compression_algorithm', StringProperty()), ('exif_tags', DictionaryProperty()), ]) class WindowsPEOptionalHeaderType(_STIXBase): _properties = OrderedDict() _properties.update([ ('magic_hex', HexProperty()), ('major_linker_version', IntegerProperty()), ('minor_linker_version', IntegerProperty()), ('size_of_code', IntegerProperty()), ('size_of_initialized_data', IntegerProperty()), ('size_of_uninitialized_data', IntegerProperty()), ('address_of_entry_point', IntegerProperty()), ('base_of_code', IntegerProperty()), ('base_of_data', IntegerProperty()), ('image_base', IntegerProperty()), ('section_alignment', IntegerProperty()), ('file_alignment', IntegerProperty()), ('major_os_version', IntegerProperty()), ('minor_os_version', IntegerProperty()), ('major_image_version', IntegerProperty()), ('minor_image_version', IntegerProperty()), ('major_subsystem_version', IntegerProperty()), ('minor_subsystem_version', IntegerProperty()), ('win32_version_value_hex', HexProperty()), ('size_of_image', IntegerProperty()), ('size_of_headers', IntegerProperty()), ('checksum_hex', HexProperty()), ('subsystem_hex', HexProperty()), ('dll_characteristics_hex', HexProperty()), ('size_of_stack_reserve', IntegerProperty()), ('size_of_stack_commit', IntegerProperty()), ('size_of_heap_reserve', IntegerProperty()), ('size_of_heap_commit', IntegerProperty()), ('loader_flags_hex', HexProperty()), ('number_of_rva_and_sizes', IntegerProperty()), ('hashes', HashesProperty()), ]) def _check_object_constraints(self): super(WindowsPEOptionalHeaderType, self)._check_object_constraints() self._check_at_least_one_property() class WindowsPESection(_STIXBase): _properties = OrderedDict() _properties.update([ ('name', StringProperty(required=True)), ('size', IntegerProperty()), ('entropy', FloatProperty()), ('hashes', HashesProperty()), ]) class WindowsPEBinaryExt(_Extension): _properties = OrderedDict() _properties.update([ ('pe_type', StringProperty(required=True)), # open_vocab ('imphash', StringProperty()), ('machine_hex', HexProperty()), ('number_of_sections', IntegerProperty()), ('time_date_stamp', TimestampProperty(precision='second')), ('pointer_to_symbol_table_hex', HexProperty()), ('number_of_symbols', IntegerProperty()), ('size_of_optional_header', IntegerProperty()), ('characteristics_hex', HexProperty()), ('file_header_hashes', HashesProperty()), ('optional_header', EmbeddedObjectProperty(type=WindowsPEOptionalHeaderType)), ('sections', ListProperty(EmbeddedObjectProperty(type=WindowsPESection))), ]) class File(_Observable): _type = 'file' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('extensions', ExtensionsProperty(enclosing_type=_type)), ('hashes', HashesProperty()), ('size', IntegerProperty()), ('name', StringProperty()), ('name_enc', StringProperty()), ('magic_number_hex', HexProperty()), ('mime_type', StringProperty()), # these are not the created/modified timestamps of the object itself ('created', TimestampProperty()), ('modified', TimestampProperty()), ('accessed', TimestampProperty()), ('parent_directory_ref', ObjectReferenceProperty(valid_types='directory')), ('is_encrypted', BooleanProperty()), ('encryption_algorithm', StringProperty()), ('decryption_key', StringProperty()), ('contains_refs', ListProperty(ObjectReferenceProperty)), ('content_ref', ObjectReferenceProperty(valid_types='artifact')), ]) def _check_object_constraints(self): super(File, self)._check_object_constraints() self._check_properties_dependency(["is_encrypted"], ["encryption_algorithm", "decryption_key"]) self._check_at_least_one_property(["hashes", "name"]) class IPv4Address(_Observable): _type = 'ipv4-addr' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('value', StringProperty(required=True)), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))), ('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))), ]) class IPv6Address(_Observable): _type = 'ipv6-addr' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('value', StringProperty(required=True)), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))), ('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))), ]) class MACAddress(_Observable): _type = 'mac-addr' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('value', StringProperty(required=True)), ]) class Mutex(_Observable): _type = 'mutex' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('name', StringProperty()), ]) class HTTPRequestExt(_Extension): _properties = OrderedDict() _properties.update([ ('request_method', StringProperty(required=True)), ('request_value', StringProperty(required=True)), ('request_version', StringProperty()), ('request_header', DictionaryProperty()), ('message_body_length', IntegerProperty()), ('message_body_data_ref', ObjectReferenceProperty(valid_types='artifact')), ]) class ICMPExt(_Extension): _properties = OrderedDict() _properties.update([ ('icmp_type_hex', HexProperty(required=True)), ('icmp_code_hex', HexProperty(required=True)), ]) class SocketExt(_Extension): _properties = OrderedDict() _properties.update([ ('address_family', EnumProperty([ "AF_UNSPEC", "AF_INET", "AF_IPX", "AF_APPLETALK", "AF_NETBIOS", "AF_INET6", "AF_IRDA", "AF_BTH", ], required=True)), ('is_blocking', BooleanProperty()), ('is_listening', BooleanProperty()), ('protocol_family', EnumProperty([ "PF_INET", "PF_IPX", "PF_APPLETALK", "PF_INET6", "PF_AX25", "PF_NETROM" ])), ('options', DictionaryProperty()), ('socket_type', EnumProperty([ "SOCK_STREAM", "SOCK_DGRAM", "SOCK_RAW", "SOCK_RDM", "SOCK_SEQPACKET", ])), ]) class TCPExt(_Extension): _properties = OrderedDict() _properties.update([ ('src_flags_hex', HexProperty()), ('dst_flags_hex', HexProperty()), ]) class NetworkTraffic(_Observable): _type = 'network-traffic' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('extensions', ExtensionsProperty(enclosing_type=_type)), ('start', TimestampProperty()), ('end', TimestampProperty()), ('is_active', BooleanProperty()), ('src_ref', ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name'])), ('dst_ref', ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name'])), ('src_port', IntegerProperty()), ('dst_port', IntegerProperty()), ('protocols', ListProperty(StringProperty, required=True)), ('src_byte_count', IntegerProperty()), ('dst_byte_count', IntegerProperty()), ('src_packets', IntegerProperty()), ('dst_packets', IntegerProperty()), ('ipfix', DictionaryProperty()), ('src_payload_ref', ObjectReferenceProperty(valid_types='artifact')), ('dst_payload_ref', ObjectReferenceProperty(valid_types='artifact')), ('encapsulates_refs', ListProperty(ObjectReferenceProperty(valid_types='network-traffic'))), ('encapsulates_by_ref', ObjectReferenceProperty(valid_types='network-traffic')), ]) def _check_object_constraints(self): super(NetworkTraffic, self)._check_object_constraints() self._check_at_least_one_property(["src_ref", "dst_ref"]) class WindowsProcessExt(_Extension): _properties = OrderedDict() _properties.update([ ('aslr_enabled', BooleanProperty()), ('dep_enabled', BooleanProperty()), ('priority', StringProperty()), ('owner_sid', StringProperty()), ('window_title', StringProperty()), ('startup_info', DictionaryProperty()), ]) class WindowsServiceExt(_Extension): _properties = OrderedDict() _properties.update([ ('service_name', StringProperty(required=True)), ('descriptions', ListProperty(StringProperty)), ('display_name', StringProperty()), ('group_name', StringProperty()), ('start_type', EnumProperty([ "SERVICE_AUTO_START", "SERVICE_BOOT_START", "SERVICE_DEMAND_START", "SERVICE_DISABLED", "SERVICE_SYSTEM_ALERT", ])), ('service_dll_refs', ListProperty(ObjectReferenceProperty(valid_types='file'))), ('service_type', EnumProperty([ "SERVICE_KERNEL_DRIVER", "SERVICE_FILE_SYSTEM_DRIVER", "SERVICE_WIN32_OWN_PROCESS", "SERVICE_WIN32_SHARE_PROCESS", ])), ('service_status', EnumProperty([ "SERVICE_CONTINUE_PENDING", "SERVICE_PAUSE_PENDING", "SERVICE_PAUSED", "SERVICE_RUNNING", "SERVICE_START_PENDING", "SERVICE_STOP_PENDING", "SERVICE_STOPPED", ])), ]) class Process(_Observable): _type = 'process' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('extensions', ExtensionsProperty(enclosing_type=_type)), ('is_hidden', BooleanProperty()), ('pid', IntegerProperty()), ('name', StringProperty()), # this is not the created timestamps of the object itself ('created', TimestampProperty()), ('cwd', StringProperty()), ('arguments', ListProperty(StringProperty)), ('command_line', StringProperty()), ('environment_variables', DictionaryProperty()), ('opened_connection_refs', ListProperty(ObjectReferenceProperty(valid_types='network-traffic'))), ('creator_user_ref', ObjectReferenceProperty(valid_types='user-account')), ('binary_ref', ObjectReferenceProperty(valid_types='file')), ('parent_ref', ObjectReferenceProperty(valid_types='process')), ('child_refs', ListProperty(ObjectReferenceProperty('process'))), ]) def _check_object_constraints(self): # no need to check windows-service-ext, since it has a required property super(Process, self)._check_object_constraints() try: self._check_at_least_one_property() if "windows-process-ext" in self.get('extensions', {}): self.extensions["windows-process-ext"]._check_at_least_one_property() except AtLeastOnePropertyError as enclosing_exc: if 'extensions' not in self: raise enclosing_exc else: if "windows-process-ext" in self.get('extensions', {}): self.extensions["windows-process-ext"]._check_at_least_one_property() class Software(_Observable): _type = 'software' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('name', StringProperty(required=True)), ('cpe', StringProperty()), ('languages', ListProperty(StringProperty)), ('vendor', StringProperty()), ('version', StringProperty()), ]) class URL(_Observable): _type = 'url' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('value', StringProperty(required=True)), ]) class UNIXAccountExt(_Extension): _properties = OrderedDict() _properties.update([ ('gid', IntegerProperty()), ('groups', ListProperty(StringProperty)), ('home_dir', StringProperty()), ('shell', StringProperty()), ]) class UserAccount(_Observable): _type = 'user-account' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('extensions', ExtensionsProperty(enclosing_type=_type)), ('user_id', StringProperty(required=True)), ('account_login', StringProperty()), ('account_type', StringProperty()), # open vocab ('display_name', StringProperty()), ('is_service_account', BooleanProperty()), ('is_privileged', BooleanProperty()), ('can_escalate_privs', BooleanProperty()), ('is_disabled', BooleanProperty()), ('account_created', TimestampProperty()), ('account_expires', TimestampProperty()), ('password_last_changed', TimestampProperty()), ('account_first_login', TimestampProperty()), ('account_last_login', TimestampProperty()), ]) class WindowsRegistryValueType(_STIXBase): _type = 'windows-registry-value-type' _properties = OrderedDict() _properties.update([ ('name', StringProperty(required=True)), ('data', StringProperty()), ('data_type', EnumProperty([ 'REG_NONE', 'REG_SZ', 'REG_EXPAND_SZ', 'REG_BINARY', 'REG_DWORD', 'REG_DWORD_BIG_ENDIAN', 'REG_LINK', 'REG_MULTI_SZ', 'REG_RESOURCE_LIST', 'REG_FULL_RESOURCE_DESCRIPTION', 'REG_RESOURCE_REQUIREMENTS_LIST', 'REG_QWORD', 'REG_INVALID_TYPE', ])), ]) class WindowsRegistryKey(_Observable): _type = 'windows-registry-key' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('key', StringProperty(required=True)), ('values', ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType))), # this is not the modified timestamps of the object itself ('modified', TimestampProperty()), ('creator_user_ref', ObjectReferenceProperty(valid_types='user-account')), ('number_of_subkeys', IntegerProperty()), ]) @property def values(self): # Needed because 'values' is a property on collections.Mapping objects return self._inner['values'] class X509V3ExtenstionsType(_STIXBase): _type = 'x509-v3-extensions-type' _properties = OrderedDict() _properties.update([ ('basic_constraints', StringProperty()), ('name_constraints', StringProperty()), ('policy_constraints', StringProperty()), ('key_usage', StringProperty()), ('extended_key_usage', StringProperty()), ('subject_key_identifier', StringProperty()), ('authority_key_identifier', StringProperty()), ('subject_alternative_name', StringProperty()), ('issuer_alternative_name', StringProperty()), ('subject_directory_attributes', StringProperty()), ('crl_distribution_points', StringProperty()), ('inhibit_any_policy', StringProperty()), ('private_key_usage_period_not_before', TimestampProperty()), ('private_key_usage_period_not_after', TimestampProperty()), ('certificate_policies', StringProperty()), ('policy_mappings', StringProperty()), ]) class X509Certificate(_Observable): _type = 'x509-certificate' _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ('is_self_signed', BooleanProperty()), ('hashes', HashesProperty()), ('version', StringProperty()), ('serial_number', StringProperty()), ('signature_algorithm', StringProperty()), ('issuer', StringProperty()), ('validity_not_before', TimestampProperty()), ('validity_not_after', TimestampProperty()), ('subject', StringProperty()), ('subject_public_key_algorithm', StringProperty()), ('subject_public_key_modulus', StringProperty()), ('subject_public_key_exponent', IntegerProperty()), ('x509_v3_extensions', EmbeddedObjectProperty(type=X509V3ExtenstionsType)), ]) OBJ_MAP_OBSERVABLE = { 'artifact': Artifact, 'autonomous-system': AutonomousSystem, 'directory': Directory, 'domain-name': DomainName, 'email-addr': EmailAddress, 'email-message': EmailMessage, 'file': File, 'ipv4-addr': IPv4Address, 'ipv6-addr': IPv6Address, 'mac-addr': MACAddress, 'mutex': Mutex, 'network-traffic': NetworkTraffic, 'process': Process, 'software': Software, 'url': URL, 'user-account': UserAccount, 'windows-registry-key': WindowsRegistryKey, 'x509-certificate': X509Certificate, } EXT_MAP_FILE = { 'archive-ext': ArchiveExt, 'ntfs-ext': NTFSExt, 'pdf-ext': PDFExt, 'raster-image-ext': RasterImageExt, 'windows-pebinary-ext': WindowsPEBinaryExt } EXT_MAP_NETWORK_TRAFFIC = { 'http-request-ext': HTTPRequestExt, 'icmp-ext': ICMPExt, 'socket-ext': SocketExt, 'tcp-ext': TCPExt, } EXT_MAP_PROCESS = { 'windows-process-ext': WindowsProcessExt, 'windows-service-ext': WindowsServiceExt, } EXT_MAP_USER_ACCOUNT = { 'unix-account-ext': UNIXAccountExt, } EXT_MAP = { 'file': EXT_MAP_FILE, 'network-traffic': EXT_MAP_NETWORK_TRAFFIC, 'process': EXT_MAP_PROCESS, 'user-account': EXT_MAP_USER_ACCOUNT, } def parse_observable(data, _valid_refs, allow_custom=False): """Deserialize a string or file-like object into a STIX Cyber Observable object. Args: data: The STIX 2 string to be parsed. _valid_refs: A list of object references valid for the scope of the object being parsed. Use empty list if no valid refs are present. allow_custom: Whether to allow custom properties or not. Default: False. Returns: An instantiated Python STIX Cyber Observable object. """ obj = get_dict(data) obj['_valid_refs'] = _valid_refs if 'type' not in obj: raise ParseError("Can't parse object with no 'type' property: %s" % str(obj)) try: obj_class = OBJ_MAP_OBSERVABLE[obj['type']] except KeyError: raise ParseError("Can't parse unknown object type '%s'! For custom observables, use the CustomObservable decorator." % obj['type']) if 'extensions' in obj and obj['type'] in EXT_MAP: for name, ext in obj['extensions'].items(): if name not in EXT_MAP[obj['type']]: raise ParseError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type'])) ext_class = EXT_MAP[obj['type']][name] obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name]) return obj_class(allow_custom=allow_custom, **obj) def _register_observable(new_observable): """Register a custom STIX Cyber Observable type. """ OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable def CustomObservable(type='x-custom-observable', properties=None): """Custom STIX Cyber Observable type decorator Example 1: @CustomObservable('x-custom-observable', [ ('property1', StringProperty(required=True)), ('property2', IntegerProperty()), ]) class MyNewObservableType(): pass """ def custom_builder(cls): class _Custom(cls, _Observable): _type = type _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), ]) if not properties: raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]") _properties.update(properties) def __init__(self, **kwargs): _Observable.__init__(self, **kwargs) cls.__init__(self, **kwargs) _register_observable(_Custom) return _Custom return custom_builder