"""STIX 2.0 Cyber Observable Objects Embedded observable object types, such as Email MIME Component, which is embedded in Email Message objects, inherit from _STIXBase instead of Observable and do not have a '_type' attribute. """ from .base import _Extension, _Observable, _STIXBase from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError, ParseError) from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty, EmbeddedObjectProperty, EnumProperty, FloatProperty, HashesProperty, HexProperty, IntegerProperty, ListProperty, ObjectReferenceProperty, Property, StringProperty, TimestampProperty, TypeProperty) from .utils import get_dict class ObservableProperty(Property): def clean(self, value): try: dictified = get_dict(value) except ValueError: raise ValueError("The observable property must contain a dictionary") if dictified == {}: raise ValueError("The observable property must contain a non-empty dictionary") valid_refs = dict((k, v['type']) for (k, v) in dictified.items()) # from .__init__ import parse_observable # avoid circular import for key, obj in dictified.items(): parsed_obj = parse_observable(obj, valid_refs) dictified[key] = parsed_obj return dictified class ExtensionsProperty(DictionaryProperty): """ Property for representing extensions on Observable objects """ def __init__(self, enclosing_type=None, required=False): self.enclosing_type = enclosing_type super(ExtensionsProperty, self).__init__(required) def clean(self, value): try: dictified = get_dict(value) except ValueError: raise ValueError("The extensions property must contain a dictionary") if dictified == {}: raise ValueError("The extensions property must contain a non-empty dictionary") if self.enclosing_type in EXT_MAP: specific_type_map = EXT_MAP[self.enclosing_type] for key, subvalue in dictified.items(): if key in specific_type_map: cls = specific_type_map[key] if type(subvalue) is dict: dictified[key] = cls(**subvalue) elif type(subvalue) is cls: dictified[key] = subvalue else: raise ValueError("Cannot determine extension type.") else: raise ValueError("The key used in the extensions dictionary is not an extension type name") else: raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type) return dictified class Artifact(_Observable): _type = 'artifact' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'mime_type': StringProperty(), 'payload_bin': BinaryProperty(), 'url': StringProperty(), 'hashes': HashesProperty(), } def _check_object_constraints(self): super(Artifact, self)._check_object_constraints() self._check_mutually_exclusive_properties(["payload_bin", "url"]) self._check_properties_dependency(["hashes"], ["url"]) class AutonomousSystem(_Observable): _type = 'autonomous-system' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'number': IntegerProperty(), 'name': StringProperty(), 'rir': StringProperty(), } class Directory(_Observable): _type = 'directory' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'path': StringProperty(required=True), 'path_enc': StringProperty(), # these are not the created/modified timestamps of the object itself 'created': TimestampProperty(), 'modified': TimestampProperty(), 'accessed': TimestampProperty(), 'contains_refs': ListProperty(ObjectReferenceProperty(valid_types=['file', 'directory'])), } class DomainName(_Observable): _type = 'domain-name' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name'])), } class EmailAddress(_Observable): _type = 'email-addr' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), 'display_name': StringProperty(), 'belongs_to_ref': ObjectReferenceProperty(valid_types='user-account'), } class EmailMIMEComponent(_STIXBase): _properties = { 'body': StringProperty(), 'body_raw_ref': ObjectReferenceProperty(valid_types=['artifact', 'file']), 'content_type': StringProperty(), 'content_disposition': StringProperty(), } def _check_object_constraints(self): super(EmailMIMEComponent, self)._check_object_constraints() self._check_at_least_one_property(["body", "body_raw_ref"]) class EmailMessage(_Observable): _type = 'email-message' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'is_multipart': BooleanProperty(required=True), 'date': TimestampProperty(), 'content_type': StringProperty(), 'from_ref': ObjectReferenceProperty(valid_types='email-addr'), 'sender_ref': ObjectReferenceProperty(valid_types='email-addr'), 'to_refs': ListProperty(ObjectReferenceProperty(valid_types='email-addr')), 'cc_refs': ListProperty(ObjectReferenceProperty(valid_types='email-addr')), 'bcc_refs': ListProperty(ObjectReferenceProperty(valid_types='email-addr')), 'subject': StringProperty(), 'received_lines': ListProperty(StringProperty), 'additional_header_fields': DictionaryProperty(), 'body': StringProperty(), 'body_multipart': ListProperty(EmbeddedObjectProperty(type=EmailMIMEComponent)), 'raw_email_ref': ObjectReferenceProperty(valid_types='artifact'), } def _check_object_constraints(self): super(EmailMessage, self)._check_object_constraints() self._check_properties_dependency(["is_multipart"], ["body_multipart"]) if self.get("is_multipart") is True and self.get("body"): # 'body' MAY only be used if is_multipart is false. raise DependentPropertiesError(self.__class__, [("is_multipart", "body")]) class ArchiveExt(_Extension): _type = 'archive-ext' _properties = { 'contains_refs': ListProperty(ObjectReferenceProperty(valid_types='file'), required=True), 'version': StringProperty(), 'comment': StringProperty(), } class AlternateDataStream(_STIXBase): _properties = { 'name': StringProperty(required=True), 'hashes': HashesProperty(), 'size': IntegerProperty(), } class NTFSExt(_Extension): _type = 'ntfs-ext' _properties = { 'sid': StringProperty(), 'alternate_data_streams': ListProperty(EmbeddedObjectProperty(type=AlternateDataStream)), } class PDFExt(_Extension): _type = 'pdf-ext' _properties = { 'version': StringProperty(), 'is_optimized': BooleanProperty(), 'document_info_dict': DictionaryProperty(), 'pdfid0': StringProperty(), 'pdfid1': StringProperty(), } class RasterImageExt(_Extension): _type = 'raster-image-ext' _properties = { 'image_height': IntegerProperty(), 'image_weight': IntegerProperty(), 'bits_per_pixel': IntegerProperty(), 'image_compression_algorithm': StringProperty(), 'exif_tags': DictionaryProperty(), } class WindowsPEOptionalHeaderType(_STIXBase): _properties = { 'magic_hex': HexProperty(), 'major_linker_version': IntegerProperty(), 'minor_linker_version': IntegerProperty(), 'size_of_code': IntegerProperty(), 'size_of_initialized_data': IntegerProperty(), 'size_of_uninitialized_data': IntegerProperty(), 'address_of_entry_point': IntegerProperty(), 'base_of_code': IntegerProperty(), 'base_of_data': IntegerProperty(), 'image_base': IntegerProperty(), 'section_alignment': IntegerProperty(), 'file_alignment': IntegerProperty(), 'major_os_version': IntegerProperty(), 'minor_os_version': IntegerProperty(), 'major_image_version': IntegerProperty(), 'minor_image_version': IntegerProperty(), 'major_subsystem_version': IntegerProperty(), 'minor_subsystem_version': IntegerProperty(), 'win32_version_value_hex': HexProperty(), 'size_of_image': IntegerProperty(), 'size_of_headers': IntegerProperty(), 'checksum_hex': HexProperty(), 'subsystem_hex': HexProperty(), 'dll_characteristics_hex': HexProperty(), 'size_of_stack_reserve': IntegerProperty(), 'size_of_stack_commit': IntegerProperty(), 'size_of_heap_reserve': IntegerProperty(), 'size_of_heap_commit': IntegerProperty(), 'loader_flags_hex': HexProperty(), 'number_of_rva_and_sizes': IntegerProperty(), 'hashes': HashesProperty(), } def _check_object_constraints(self): super(WindowsPEOptionalHeaderType, self)._check_object_constraints() self._check_at_least_one_property() class WindowsPESection(_STIXBase): _properties = { 'name': StringProperty(required=True), 'size': IntegerProperty(), 'entropy': FloatProperty(), 'hashes': HashesProperty(), } class WindowsPEBinaryExt(_Extension): _type = 'windows-pebinary-ext' _properties = { 'pe_type': StringProperty(required=True), # open_vocab 'imphash': StringProperty(), 'machine_hex': HexProperty(), 'number_of_sections': IntegerProperty(), 'time_date_stamp': TimestampProperty(precision='second'), 'pointer_to_symbol_table_hex': HexProperty(), 'number_of_symbols': IntegerProperty(), 'size_of_optional_header': IntegerProperty(), 'characteristics_hex': HexProperty(), 'file_header_hashes': HashesProperty(), 'optional_header': EmbeddedObjectProperty(type=WindowsPEOptionalHeaderType), 'sections': ListProperty(EmbeddedObjectProperty(type=WindowsPESection)), } class File(_Observable): _type = 'file' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'hashes': HashesProperty(), 'size': IntegerProperty(), 'name': StringProperty(), 'name_enc': StringProperty(), 'magic_number_hex': HexProperty(), 'mime_type': StringProperty(), # these are not the created/modified timestamps of the object itself 'created': TimestampProperty(), 'modified': TimestampProperty(), 'accessed': TimestampProperty(), 'parent_directory_ref': ObjectReferenceProperty(valid_types='directory'), 'is_encrypted': BooleanProperty(), 'encryption_algorithm': StringProperty(), 'decryption_key': StringProperty(), 'contains_refs': ListProperty(ObjectReferenceProperty), 'content_ref': ObjectReferenceProperty(valid_types='artifact'), } def _check_object_constraints(self): super(File, self)._check_object_constraints() self._check_properties_dependency(["is_encrypted"], ["encryption_algorithm", "decryption_key"]) self._check_at_least_one_property(["hashes", "name"]) class IPv4Address(_Observable): _type = 'ipv4-addr' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')), 'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')), } class IPv6Address(_Observable): _type = 'ipv6-addr' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')), 'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')), } class MACAddress(_Observable): _type = 'mac-addr' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), } class Mutex(_Observable): _type = 'mutex' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'name': StringProperty(), } class HTTPRequestExt(_Extension): _type = 'http-request-ext' _properties = { 'request_method': StringProperty(required=True), 'request_value': StringProperty(required=True), 'request_version': StringProperty(), 'request_header': DictionaryProperty(), 'message_body_length': IntegerProperty(), 'message_body_data_ref': ObjectReferenceProperty(valid_types='artifact'), } class ICMPExt(_Extension): _type = 'icmp-ext' _properties = { 'icmp_type_hex': HexProperty(required=True), 'icmp_code_hex': HexProperty(required=True), } class SocketExt(_Extension): _type = 'socket-ext' _properties = { 'address_family': EnumProperty([ "AF_UNSPEC", "AF_INET", "AF_IPX", "AF_APPLETALK", "AF_NETBIOS", "AF_INET6", "AF_IRDA", "AF_BTH", ], required=True), 'is_blocking': BooleanProperty(), 'is_listening': BooleanProperty(), 'protocol_family': EnumProperty([ "PF_INET", "PF_IPX", "PF_APPLETALK", "PF_INET6", "PF_AX25", "PF_NETROM" ]), 'options': DictionaryProperty(), 'socket_type': EnumProperty([ "SOCK_STREAM", "SOCK_DGRAM", "SOCK_RAW", "SOCK_RDM", "SOCK_SEQPACKET", ]), } class TCPExt(_Extension): _type = 'tcp-ext' _properties = { 'src_flags_hex': HexProperty(), 'dst_flags_hex': HexProperty(), } class NetworkTraffic(_Observable): _type = 'network-traffic' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'start': TimestampProperty(), 'end': TimestampProperty(), 'is_active': BooleanProperty(), 'src_ref': ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name']), 'dst_ref': ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name']), 'src_port': IntegerProperty(), 'dst_port': IntegerProperty(), 'protocols': ListProperty(StringProperty, required=True), 'src_byte_count': IntegerProperty(), 'dst_byte_count': IntegerProperty(), 'src_packets': IntegerProperty(), 'dst_packets': IntegerProperty(), 'ipfix': DictionaryProperty(), 'src_payload_ref': ObjectReferenceProperty(valid_types='artifact'), 'dst_payload_ref': ObjectReferenceProperty(valid_types='artifact'), 'encapsulates_refs': ListProperty(ObjectReferenceProperty(valid_types='network-traffic')), 'encapsulates_by_ref': ObjectReferenceProperty(valid_types='network-traffic'), } def _check_object_constraints(self): super(NetworkTraffic, self)._check_object_constraints() self._check_at_least_one_property(["src_ref", "dst_ref"]) class WindowsProcessExt(_Extension): _type = 'windows-process-ext' _properties = { 'aslr_enabled': BooleanProperty(), 'dep_enabled': BooleanProperty(), 'priority': StringProperty(), 'owner_sid': StringProperty(), 'window_title': StringProperty(), 'startup_info': DictionaryProperty(), } class WindowsServiceExt(_Extension): _type = 'windows-service-ext' _properties = { 'service_name': StringProperty(required=True), 'descriptions': ListProperty(StringProperty), 'display_name': StringProperty(), 'group_name': StringProperty(), 'start_type': EnumProperty([ "SERVICE_AUTO_START", "SERVICE_BOOT_START", "SERVICE_DEMAND_START", "SERVICE_DISABLED", "SERVICE_SYSTEM_ALERT", ]), 'service_dll_refs': ListProperty(ObjectReferenceProperty(valid_types='file')), 'service_type': EnumProperty([ "SERVICE_KERNEL_DRIVER", "SERVICE_FILE_SYSTEM_DRIVER", "SERVICE_WIN32_OWN_PROCESS", "SERVICE_WIN32_SHARE_PROCESS", ]), 'service_status': EnumProperty([ "SERVICE_CONTINUE_PENDING", "SERVICE_PAUSE_PENDING", "SERVICE_PAUSED", "SERVICE_RUNNING", "SERVICE_START_PENDING", "SERVICE_STOP_PENDING", "SERVICE_STOPPED", ]), } class Process(_Observable): _type = 'process' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'is_hidden': BooleanProperty(), 'pid': IntegerProperty(), 'name': StringProperty(), # this is not the created timestamps of the object itself 'created': TimestampProperty(), 'cwd': StringProperty(), 'arguments': ListProperty(StringProperty), 'command_line': StringProperty(), 'environment_variables': DictionaryProperty(), 'opened_connection_refs': ListProperty(ObjectReferenceProperty(valid_types='network-traffic')), 'creator_user_ref': ObjectReferenceProperty(valid_types='user-account'), 'binary_ref': ObjectReferenceProperty(valid_types='file'), 'parent_ref': ObjectReferenceProperty(valid_types='process'), 'child_refs': ListProperty(ObjectReferenceProperty('process')), } def _check_object_constraints(self): # no need to check windows-service-ext, since it has a required property super(Process, self)._check_object_constraints() try: self._check_at_least_one_property() if "windows-process-ext" in self.get('extensions', {}): self.extensions["windows-process-ext"]._check_at_least_one_property() except AtLeastOnePropertyError as enclosing_exc: if 'extensions' not in self: raise enclosing_exc else: if "windows-process-ext" in self.get('extensions', {}): self.extensions["windows-process-ext"]._check_at_least_one_property() class Software(_Observable): _type = 'software' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'name': StringProperty(required=True), 'cpe': StringProperty(), 'languages': ListProperty(StringProperty), 'vendor': StringProperty(), 'version': StringProperty(), } class URL(_Observable): _type = 'url' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), } class UNIXAccountExt(_Extension): _type = 'unix-account-ext' _properties = { 'gid': IntegerProperty(), 'groups': ListProperty(StringProperty), 'home_dir': StringProperty(), 'shell': StringProperty(), } class UserAccount(_Observable): _type = 'user-account' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'user_id': StringProperty(required=True), 'account_login': StringProperty(), 'account_type': StringProperty(), # open vocab 'display_name': StringProperty(), 'is_service_account': BooleanProperty(), 'is_privileged': BooleanProperty(), 'can_escalate_privs': BooleanProperty(), 'is_disabled': BooleanProperty(), 'account_created': TimestampProperty(), 'account_expires': TimestampProperty(), 'password_last_changed': TimestampProperty(), 'account_first_login': TimestampProperty(), 'account_last_login': TimestampProperty(), } class WindowsRegistryValueType(_STIXBase): _type = 'windows-registry-value-type' _properties = { 'name': StringProperty(required=True), 'data': StringProperty(), 'data_type': EnumProperty([ 'REG_NONE', 'REG_SZ', 'REG_EXPAND_SZ', 'REG_BINARY', 'REG_DWORD', 'REG_DWORD_BIG_ENDIAN', 'REG_LINK', 'REG_MULTI_SZ', 'REG_RESOURCE_LIST', 'REG_FULL_RESOURCE_DESCRIPTION', 'REG_RESOURCE_REQUIREMENTS_LIST', 'REG_QWORD', 'REG_INVALID_TYPE', ]), } class WindowsRegistryKey(_Observable): _type = 'windows-registry-key' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'key': StringProperty(required=True), 'values': ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType)), # this is not the modified timestamps of the object itself 'modified': TimestampProperty(), 'creator_user_ref': ObjectReferenceProperty(valid_types='user-account'), 'number_of_subkeys': IntegerProperty(), } @property def values(self): # Needed because 'values' is a property on collections.Mapping objects return self._inner['values'] class X509V3ExtenstionsType(_STIXBase): _type = 'x509-v3-extensions-type' _properties = { 'basic_constraints': StringProperty(), 'name_constraints': StringProperty(), 'policy_constraints': StringProperty(), 'key_usage': StringProperty(), 'extended_key_usage': StringProperty(), 'subject_key_identifier': StringProperty(), 'authority_key_identifier': StringProperty(), 'subject_alternative_name': StringProperty(), 'issuer_alternative_name': StringProperty(), 'subject_directory_attributes': StringProperty(), 'crl_distribution_points': StringProperty(), 'inhibit_any_policy': StringProperty(), 'private_key_usage_period_not_before': TimestampProperty(), 'private_key_usage_period_not_after': TimestampProperty(), 'certificate_policies': StringProperty(), 'policy_mappings': StringProperty(), } class X509Certificate(_Observable): _type = 'x509-certificate' _properties = { 'type': TypeProperty(_type), 'extensions': ExtensionsProperty(enclosing_type=_type), 'is_self_signed': BooleanProperty(), 'hashes': HashesProperty(), 'version': StringProperty(), 'serial_number': StringProperty(), 'signature_algorithm': StringProperty(), 'issuer': StringProperty(), 'validity_not_before': TimestampProperty(), 'validity_not_after': TimestampProperty(), 'subject': StringProperty(), 'subject_public_key_algorithm': StringProperty(), 'subject_public_key_modulus': StringProperty(), 'subject_public_key_exponent': IntegerProperty(), 'x509_v3_extensions': EmbeddedObjectProperty(type=X509V3ExtenstionsType), } OBJ_MAP_OBSERVABLE = { 'artifact': Artifact, 'autonomous-system': AutonomousSystem, 'directory': Directory, 'domain-name': DomainName, 'email-addr': EmailAddress, 'email-message': EmailMessage, 'file': File, 'ipv4-addr': IPv4Address, 'ipv6-addr': IPv6Address, 'mac-addr': MACAddress, 'mutex': Mutex, 'network-traffic': NetworkTraffic, 'process': Process, 'software': Software, 'url': URL, 'user-account': UserAccount, 'windows-registry-key': WindowsRegistryKey, 'x509-certificate': X509Certificate, } EXT_MAP = { 'file': { 'archive-ext': ArchiveExt, 'ntfs-ext': NTFSExt, 'pdf-ext': PDFExt, 'raster-image-ext': RasterImageExt, 'windows-pebinary-ext': WindowsPEBinaryExt }, 'network-traffic': { 'http-request-ext': HTTPRequestExt, 'icmp-ext': ICMPExt, 'socket-ext': SocketExt, 'tcp-ext': TCPExt, }, 'process': { 'windows-process-ext': WindowsProcessExt, 'windows-service-ext': WindowsServiceExt, }, 'user-account': { 'unix-account-ext': UNIXAccountExt, }, } def parse_observable(data, _valid_refs=[], allow_custom=False): """Deserialize a string or file-like object into a STIX Cyber Observable object. Args: data: The STIX 2 string to be parsed. _valid_refs: A list of object references valid for the scope of the object being parsed. allow_custom: Whether to allow custom properties or not. Default: False. Returns: An instantiated Python STIX Cyber Observable object. """ obj = get_dict(data) obj['_valid_refs'] = _valid_refs if 'type' not in obj: raise ParseError("Can't parse observable with no 'type' property: %s" % str(obj)) try: obj_class = OBJ_MAP_OBSERVABLE[obj['type']] except KeyError: raise ParseError("Can't parse unknown observable type '%s'! For custom observables, " "use the CustomObservable decorator." % obj['type']) if 'extensions' in obj and obj['type'] in EXT_MAP: for name, ext in obj['extensions'].items(): if name not in EXT_MAP[obj['type']]: raise ParseError("Can't parse Unknown extension type '%s' for observable type '%s'!" % (name, obj['type'])) ext_class = EXT_MAP[obj['type']][name] obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name]) return obj_class(allow_custom=allow_custom, **obj) def _register_observable(new_observable): """Register a custom STIX Cyber Observable type. """ OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable def CustomObservable(type='x-custom-observable', properties={}): """Custom STIX Cyber Observable type decorator """ def custom_builder(cls): class _Custom(cls, _Observable): _type = type _properties = { 'type': TypeProperty(_type), } _properties.update(properties) def __init__(self, **kwargs): _Observable.__init__(self, **kwargs) cls.__init__(self, **kwargs) _register_observable(_Custom) return _Custom return custom_builder def _register_extension(observable, new_extension): """Register a custom extension to a STIX Cyber Observable type. """ try: observable_type = observable._type except AttributeError: raise ValueError("Unknown observable type. Custom observables must be " "created with the @CustomObservable decorator.") try: EXT_MAP[observable_type][new_extension._type] = new_extension except KeyError: if observable_type not in OBJ_MAP_OBSERVABLE: raise ValueError("Unknown observable type '%s'. Custom observables " "must be created with the @CustomObservable decorator." % observable_type) else: EXT_MAP[observable_type] = {new_extension._type: new_extension} def CustomExtension(observable=None, type='x-custom-observable', properties={}): """Decorator for custom extensions to STIX Cyber Observables """ if not observable or not issubclass(observable, _Observable): raise ValueError("'observable' must be a valid Observable class!") def custom_builder(cls): class _Custom(cls, _Extension): _type = type _properties = { 'extensions': ExtensionsProperty(enclosing_type=_type), } _properties.update(properties) def __init__(self, **kwargs): _Extension.__init__(self, **kwargs) cls.__init__(self, **kwargs) _register_extension(observable, _Custom) return _Custom return custom_builder