cti-python-stix2/stix2/observables.py

906 lines
33 KiB
Python

"""STIX 2.0 Cyber Observable Objects.
Embedded observable object types, such as Email MIME Component, which is
embedded in Email Message objects, inherit from ``_STIXBase`` instead of
Observable and do not have a ``_type`` attribute.
"""
from collections import OrderedDict
from .base import _Extension, _Observable, _STIXBase
from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError,
ParseError)
from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
EmbeddedObjectProperty, EnumProperty, FloatProperty,
HashesProperty, HexProperty, IntegerProperty,
ListProperty, ObjectReferenceProperty, Property,
StringProperty, TimestampProperty, TypeProperty)
from .utils import get_dict
class ObservableProperty(Property):
    """Property whose value is a dictionary of Cyber Observable Objects."""

    def clean(self, value):
        """Convert ``value`` to a dictionary and parse every entry in it.

        Each entry is replaced by the result of ``parse_observable``.

        Raises:
            ValueError: if ``value`` cannot be converted to a dictionary, or
                the resulting dictionary is empty.
        """
        try:
            observables = get_dict(value)
        except ValueError:
            raise ValueError("The observable property must contain a dictionary")
        if observables == {}:
            raise ValueError("The observable property must contain a non-empty dictionary")

        # Key -> declared object type; handed to every parsed object so that
        # references between objects in this dictionary can be checked.
        ref_types = {ref: entry['type'] for (ref, entry) in observables.items()}

        # Only existing keys are reassigned, so the dictionary's size does not
        # change while it is being iterated.
        for ref, entry in observables.items():
            observables[ref] = parse_observable(entry, ref_types)

        return observables
class ExtensionsProperty(DictionaryProperty):
    """Property for representing extensions on Observable objects.

    ``enclosing_type`` is the type name of the observable that owns the
    property; it selects which extension classes (from ``EXT_MAP``) are
    accepted.
    """

    def __init__(self, enclosing_type=None, required=False):
        self.enclosing_type = enclosing_type
        super(ExtensionsProperty, self).__init__(required)

    def clean(self, value):
        """Validate ``value`` and instantiate any extension given as a dict.

        Returns:
            The dictionary of extensions, with plain-dictionary entries
            replaced by instances of the matching extension class.

        Raises:
            ValueError: if ``value`` is not a non-empty dictionary, the
                enclosing type has no extensions registered, a key is not a
                registered extension name, or an entry is neither a
                dictionary nor an instance of the expected extension class.
        """
        try:
            dictified = get_dict(value)
        except ValueError:
            raise ValueError("The extensions property must contain a dictionary")
        if dictified == {}:
            raise ValueError("The extensions property must contain a non-empty dictionary")

        if self.enclosing_type not in EXT_MAP:
            raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type)
        specific_type_map = EXT_MAP[self.enclosing_type]

        for key, subvalue in dictified.items():
            if key not in specific_type_map:
                raise ValueError("The key used in the extensions dictionary is not an extension type name")
            cls = specific_type_map[key]
            # isinstance (rather than an exact type comparison) so that dict
            # subclasses such as OrderedDict, and subclasses of the extension
            # class, are accepted as well.
            if isinstance(subvalue, dict):
                dictified[key] = cls(**subvalue)
            elif not isinstance(subvalue, cls):
                raise ValueError("Cannot determine extension type.")
            # Otherwise the entry is already an extension instance; keep it.
        return dictified
class Artifact(_Observable):
    """Cyber Observable object with type ``artifact``."""

    _type = 'artifact'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('mime_type', StringProperty()),
        ('payload_bin', BinaryProperty()),
        ('url', StringProperty()),
        ('hashes', HashesProperty()),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])

    def _check_object_constraints(self):
        """Run the inherited checks, then the artifact-specific property rules."""
        super(Artifact, self)._check_object_constraints()
        # payload_bin and url are checked for mutual exclusion
        self._check_mutually_exclusive_properties(["payload_bin", "url"])
        # hashes / url are checked as a dependent pair
        self._check_properties_dependency(["hashes"], ["url"])
class AutonomousSystem(_Observable):
    """Cyber Observable object with type ``autonomous-system``."""

    _type = 'autonomous-system'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('number', IntegerProperty()),
        ('name', StringProperty()),
        ('rir', StringProperty()),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])
class Directory(_Observable):
    """Cyber Observable object with type ``directory``."""

    _type = 'directory'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('path', StringProperty(required=True)),
        ('path_enc', StringProperty()),
        # these are not the created/modified timestamps of the object itself
        ('created', TimestampProperty()),
        ('modified', TimestampProperty()),
        ('accessed', TimestampProperty()),
        ('contains_refs', ListProperty(ObjectReferenceProperty(valid_types=['file', 'directory']))),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])
class DomainName(_Observable):
    """Cyber Observable object with type ``domain-name``."""

    _type = 'domain-name'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('value', StringProperty(required=True)),
        ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name']))),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])
class EmailAddress(_Observable):
    """Cyber Observable object with type ``email-addr``."""

    _type = 'email-addr'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('value', StringProperty(required=True)),
        ('display_name', StringProperty()),
        ('belongs_to_ref', ObjectReferenceProperty(valid_types='user-account')),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])
class EmailMIMEComponent(_STIXBase):
    """Embedded type used for the entries of ``EmailMessage.body_multipart``."""

    _properties = OrderedDict([
        ('body', StringProperty()),
        ('body_raw_ref', ObjectReferenceProperty(valid_types=['artifact', 'file'])),
        ('content_type', StringProperty()),
        ('content_disposition', StringProperty()),
    ])

    def _check_object_constraints(self):
        """Run the inherited checks, then require one of body / body_raw_ref."""
        super(EmailMIMEComponent, self)._check_object_constraints()
        self._check_at_least_one_property(["body", "body_raw_ref"])
class EmailMessage(_Observable):
    """Cyber Observable object with type ``email-message``."""

    _type = 'email-message'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('is_multipart', BooleanProperty(required=True)),
        ('date', TimestampProperty()),
        ('content_type', StringProperty()),
        ('from_ref', ObjectReferenceProperty(valid_types='email-addr')),
        ('sender_ref', ObjectReferenceProperty(valid_types='email-addr')),
        ('to_refs', ListProperty(ObjectReferenceProperty(valid_types='email-addr'))),
        ('cc_refs', ListProperty(ObjectReferenceProperty(valid_types='email-addr'))),
        ('bcc_refs', ListProperty(ObjectReferenceProperty(valid_types='email-addr'))),
        ('subject', StringProperty()),
        ('received_lines', ListProperty(StringProperty)),
        ('additional_header_fields', DictionaryProperty()),
        ('body', StringProperty()),
        ('body_multipart', ListProperty(EmbeddedObjectProperty(type=EmailMIMEComponent))),
        ('raw_email_ref', ObjectReferenceProperty(valid_types='artifact')),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])

    def _check_object_constraints(self):
        """Run the inherited checks, then the multipart/body consistency rules.

        Raises:
            DependentPropertiesError: if ``is_multipart`` is true while a
                non-empty ``body`` is also present.
        """
        super(EmailMessage, self)._check_object_constraints()
        self._check_properties_dependency(["is_multipart"], ["body_multipart"])

        multipart = self.get("is_multipart")
        # 'body' MAY only be used if is_multipart is false.
        if multipart is True and self.get("body"):
            raise DependentPropertiesError(self.__class__, [("is_multipart", "body")])
class ArchiveExt(_Extension):
    """Extension with type ``archive-ext``."""

    _type = 'archive-ext'
    _properties = OrderedDict([
        ('contains_refs', ListProperty(ObjectReferenceProperty(valid_types='file'), required=True)),
        ('version', StringProperty()),
        ('comment', StringProperty()),
    ])
class AlternateDataStream(_STIXBase):
    """Embedded type used for the entries of ``NTFSExt.alternate_data_streams``."""

    _properties = OrderedDict([
        ('name', StringProperty(required=True)),
        ('hashes', HashesProperty()),
        ('size', IntegerProperty()),
    ])
class NTFSExt(_Extension):
    """Extension with type ``ntfs-ext``."""

    _type = 'ntfs-ext'
    _properties = OrderedDict([
        ('sid', StringProperty()),
        ('alternate_data_streams', ListProperty(EmbeddedObjectProperty(type=AlternateDataStream))),
    ])
class PDFExt(_Extension):
    """Extension with type ``pdf-ext``."""

    _type = 'pdf-ext'
    _properties = OrderedDict([
        ('version', StringProperty()),
        ('is_optimized', BooleanProperty()),
        ('document_info_dict', DictionaryProperty()),
        ('pdfid0', StringProperty()),
        ('pdfid1', StringProperty()),
    ])
class RasterImageExt(_Extension):
    """Extension with type ``raster-image-ext``."""

    _type = 'raster-image-ext'
    _properties = OrderedDict([
        ('image_height', IntegerProperty()),
        # NOTE(review): 'image_weight' reads like a typo for 'image_width', but
        # it is a serialized property name -- confirm against the STIX 2.0
        # specification before renaming.
        ('image_weight', IntegerProperty()),
        ('bits_per_pixel', IntegerProperty()),
        ('image_compression_algorithm', StringProperty()),
        ('exif_tags', DictionaryProperty()),
    ])
class WindowsPEOptionalHeaderType(_STIXBase):
    """Embedded type used by ``WindowsPEBinaryExt.optional_header``."""

    _properties = OrderedDict([
        ('magic_hex', HexProperty()),
        ('major_linker_version', IntegerProperty()),
        ('minor_linker_version', IntegerProperty()),
        ('size_of_code', IntegerProperty()),
        ('size_of_initialized_data', IntegerProperty()),
        ('size_of_uninitialized_data', IntegerProperty()),
        ('address_of_entry_point', IntegerProperty()),
        ('base_of_code', IntegerProperty()),
        ('base_of_data', IntegerProperty()),
        ('image_base', IntegerProperty()),
        ('section_alignment', IntegerProperty()),
        ('file_alignment', IntegerProperty()),
        ('major_os_version', IntegerProperty()),
        ('minor_os_version', IntegerProperty()),
        ('major_image_version', IntegerProperty()),
        ('minor_image_version', IntegerProperty()),
        ('major_subsystem_version', IntegerProperty()),
        ('minor_subsystem_version', IntegerProperty()),
        ('win32_version_value_hex', HexProperty()),
        ('size_of_image', IntegerProperty()),
        ('size_of_headers', IntegerProperty()),
        ('checksum_hex', HexProperty()),
        ('subsystem_hex', HexProperty()),
        ('dll_characteristics_hex', HexProperty()),
        ('size_of_stack_reserve', IntegerProperty()),
        ('size_of_stack_commit', IntegerProperty()),
        ('size_of_heap_reserve', IntegerProperty()),
        ('size_of_heap_commit', IntegerProperty()),
        ('loader_flags_hex', HexProperty()),
        ('number_of_rva_and_sizes', IntegerProperty()),
        ('hashes', HashesProperty()),
    ])

    def _check_object_constraints(self):
        """Run the inherited checks, then the at-least-one-property check."""
        super(WindowsPEOptionalHeaderType, self)._check_object_constraints()
        # Called without a property list, unlike the other classes in this file.
        self._check_at_least_one_property()
class WindowsPESection(_STIXBase):
    """Embedded type used for the entries of ``WindowsPEBinaryExt.sections``."""

    _properties = OrderedDict([
        ('name', StringProperty(required=True)),
        ('size', IntegerProperty()),
        ('entropy', FloatProperty()),
        ('hashes', HashesProperty()),
    ])
class WindowsPEBinaryExt(_Extension):
    """Extension with type ``windows-pebinary-ext``."""

    _type = 'windows-pebinary-ext'
    _properties = OrderedDict([
        ('pe_type', StringProperty(required=True)),  # open_vocab
        ('imphash', StringProperty()),
        ('machine_hex', HexProperty()),
        ('number_of_sections', IntegerProperty()),
        ('time_date_stamp', TimestampProperty(precision='second')),
        ('pointer_to_symbol_table_hex', HexProperty()),
        ('number_of_symbols', IntegerProperty()),
        ('size_of_optional_header', IntegerProperty()),
        ('characteristics_hex', HexProperty()),
        ('file_header_hashes', HashesProperty()),
        ('optional_header', EmbeddedObjectProperty(type=WindowsPEOptionalHeaderType)),
        ('sections', ListProperty(EmbeddedObjectProperty(type=WindowsPESection))),
    ])
class File(_Observable):
    """Cyber Observable object with type ``file``."""

    _type = 'file'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
        ('hashes', HashesProperty()),
        ('size', IntegerProperty()),
        ('name', StringProperty()),
        ('name_enc', StringProperty()),
        ('magic_number_hex', HexProperty()),
        ('mime_type', StringProperty()),
        # these are not the created/modified timestamps of the object itself
        ('created', TimestampProperty()),
        ('modified', TimestampProperty()),
        ('accessed', TimestampProperty()),
        ('parent_directory_ref', ObjectReferenceProperty(valid_types='directory')),
        ('is_encrypted', BooleanProperty()),
        ('encryption_algorithm', StringProperty()),
        ('decryption_key', StringProperty()),
        # no valid_types restriction is given for contained objects
        ('contains_refs', ListProperty(ObjectReferenceProperty)),
        ('content_ref', ObjectReferenceProperty(valid_types='artifact')),
    ])

    def _check_object_constraints(self):
        """Run the inherited checks, then the file-specific property rules."""
        super(File, self)._check_object_constraints()
        # the encryption details are checked as depending on is_encrypted
        self._check_properties_dependency(["is_encrypted"], ["encryption_algorithm", "decryption_key"])
        # a file needs at least one of hashes / name
        self._check_at_least_one_property(["hashes", "name"])
class IPv4Address(_Observable):
    """Cyber Observable object with type ``ipv4-addr``."""

    _type = 'ipv4-addr'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('value', StringProperty(required=True)),
        ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
        ('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])
class IPv6Address(_Observable):
    """Cyber Observable object with type ``ipv6-addr``."""

    _type = 'ipv6-addr'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('value', StringProperty(required=True)),
        ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
        ('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])
class MACAddress(_Observable):
    """Cyber Observable object with type ``mac-addr``."""

    _type = 'mac-addr'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('value', StringProperty(required=True)),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])
class Mutex(_Observable):
    """Cyber Observable object with type ``mutex``."""

    _type = 'mutex'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('name', StringProperty()),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])
class HTTPRequestExt(_Extension):
    """Extension with type ``http-request-ext``."""

    _type = 'http-request-ext'
    _properties = OrderedDict([
        ('request_method', StringProperty(required=True)),
        ('request_value', StringProperty(required=True)),
        ('request_version', StringProperty()),
        ('request_header', DictionaryProperty()),
        ('message_body_length', IntegerProperty()),
        ('message_body_data_ref', ObjectReferenceProperty(valid_types='artifact')),
    ])
class ICMPExt(_Extension):
    """Extension with type ``icmp-ext``."""

    _type = 'icmp-ext'
    _properties = OrderedDict([
        ('icmp_type_hex', HexProperty(required=True)),
        ('icmp_code_hex', HexProperty(required=True)),
    ])
class SocketExt(_Extension):
    """Extension with type ``socket-ext``."""

    _type = 'socket-ext'
    _properties = OrderedDict([
        ('address_family', EnumProperty([
            "AF_UNSPEC",
            "AF_INET",
            "AF_IPX",
            "AF_APPLETALK",
            "AF_NETBIOS",
            "AF_INET6",
            "AF_IRDA",
            "AF_BTH",
        ], required=True)),
        ('is_blocking', BooleanProperty()),
        ('is_listening', BooleanProperty()),
        ('protocol_family', EnumProperty([
            "PF_INET",
            "PF_IPX",
            "PF_APPLETALK",
            "PF_INET6",
            "PF_AX25",
            "PF_NETROM",
        ])),
        ('options', DictionaryProperty()),
        ('socket_type', EnumProperty([
            "SOCK_STREAM",
            "SOCK_DGRAM",
            "SOCK_RAW",
            "SOCK_RDM",
            "SOCK_SEQPACKET",
        ])),
    ])
class TCPExt(_Extension):
    """Extension with type ``tcp-ext``."""

    _type = 'tcp-ext'
    _properties = OrderedDict([
        ('src_flags_hex', HexProperty()),
        ('dst_flags_hex', HexProperty()),
    ])
class NetworkTraffic(_Observable):
    """Cyber Observable object with type ``network-traffic``."""

    _type = 'network-traffic'
    _properties = OrderedDict()
    _properties.update([
        ('type', TypeProperty(_type)),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
        ('start', TimestampProperty()),
        ('end', TimestampProperty()),
        ('is_active', BooleanProperty()),
        ('src_ref', ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name'])),
        ('dst_ref', ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name'])),
        ('src_port', IntegerProperty()),
        ('dst_port', IntegerProperty()),
        ('protocols', ListProperty(StringProperty, required=True)),
        ('src_byte_count', IntegerProperty()),
        ('dst_byte_count', IntegerProperty()),
        ('src_packets', IntegerProperty()),
        ('dst_packets', IntegerProperty()),
        ('ipfix', DictionaryProperty()),
        ('src_payload_ref', ObjectReferenceProperty(valid_types='artifact')),
        ('dst_payload_ref', ObjectReferenceProperty(valid_types='artifact')),
        ('encapsulates_refs', ListProperty(ObjectReferenceProperty(valid_types='network-traffic'))),
        # The STIX 2.0 property name is 'encapsulated_by_ref' (the inverse of
        # 'encapsulates_refs'); it was previously misspelled
        # 'encapsulates_by_ref'.
        ('encapsulated_by_ref', ObjectReferenceProperty(valid_types='network-traffic')),
    ])

    def _check_object_constraints(self):
        """Run the inherited checks, then require one of src_ref / dst_ref."""
        super(NetworkTraffic, self)._check_object_constraints()
        self._check_at_least_one_property(["src_ref", "dst_ref"])
class WindowsProcessExt(_Extension):
    """Extension with type ``windows-process-ext``."""

    _type = 'windows-process-ext'
    _properties = OrderedDict([
        ('aslr_enabled', BooleanProperty()),
        ('dep_enabled', BooleanProperty()),
        ('priority', StringProperty()),
        ('owner_sid', StringProperty()),
        ('window_title', StringProperty()),
        ('startup_info', DictionaryProperty()),
    ])
class WindowsServiceExt(_Extension):
    """Extension with type ``windows-service-ext``."""

    _type = 'windows-service-ext'
    _properties = OrderedDict([
        ('service_name', StringProperty(required=True)),
        ('descriptions', ListProperty(StringProperty)),
        ('display_name', StringProperty()),
        ('group_name', StringProperty()),
        ('start_type', EnumProperty([
            "SERVICE_AUTO_START",
            "SERVICE_BOOT_START",
            "SERVICE_DEMAND_START",
            "SERVICE_DISABLED",
            "SERVICE_SYSTEM_ALERT",
        ])),
        ('service_dll_refs', ListProperty(ObjectReferenceProperty(valid_types='file'))),
        ('service_type', EnumProperty([
            "SERVICE_KERNEL_DRIVER",
            "SERVICE_FILE_SYSTEM_DRIVER",
            "SERVICE_WIN32_OWN_PROCESS",
            "SERVICE_WIN32_SHARE_PROCESS",
        ])),
        ('service_status', EnumProperty([
            "SERVICE_CONTINUE_PENDING",
            "SERVICE_PAUSE_PENDING",
            "SERVICE_PAUSED",
            "SERVICE_RUNNING",
            "SERVICE_START_PENDING",
            "SERVICE_STOP_PENDING",
            "SERVICE_STOPPED",
        ])),
    ])
class Process(_Observable):
    """Cyber Observable object with type ``process``."""

    _type = 'process'
    _properties = OrderedDict()
    _properties.update([
        ('type', TypeProperty(_type)),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
        ('is_hidden', BooleanProperty()),
        ('pid', IntegerProperty()),
        ('name', StringProperty()),
        # this is not the created timestamps of the object itself
        ('created', TimestampProperty()),
        ('cwd', StringProperty()),
        ('arguments', ListProperty(StringProperty)),
        ('command_line', StringProperty()),
        ('environment_variables', DictionaryProperty()),
        ('opened_connection_refs', ListProperty(ObjectReferenceProperty(valid_types='network-traffic'))),
        ('creator_user_ref', ObjectReferenceProperty(valid_types='user-account')),
        ('binary_ref', ObjectReferenceProperty(valid_types='file')),
        ('parent_ref', ObjectReferenceProperty(valid_types='process')),
        # valid_types is passed by keyword, like every other reference property
        # in this module, so it cannot bind to a different positional parameter.
        ('child_refs', ListProperty(ObjectReferenceProperty(valid_types='process'))),
    ])

    def _check_object_constraints(self):
        """Run the inherited checks, then the at-least-one-property checks.

        The check is applied to the process itself and, when present, to its
        ``windows-process-ext`` extension.

        Raises:
            AtLeastOnePropertyError: propagated from the checks on the
                process or on its ``windows-process-ext`` extension.
        """
        # no need to check windows-service-ext, since it has a required property
        super(Process, self)._check_object_constraints()
        try:
            self._check_at_least_one_property()
            if "windows-process-ext" in self.get('extensions', {}):
                self.extensions["windows-process-ext"]._check_at_least_one_property()
        except AtLeastOnePropertyError as enclosing_exc:
            if 'extensions' not in self:
                raise enclosing_exc
            # Extensions are present: re-run the extension check so that any
            # error reported is the extension's own.
            if "windows-process-ext" in self.get('extensions', {}):
                self.extensions["windows-process-ext"]._check_at_least_one_property()
class Software(_Observable):
    """Cyber Observable object with type ``software``."""

    _type = 'software'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('name', StringProperty(required=True)),
        ('cpe', StringProperty()),
        ('languages', ListProperty(StringProperty)),
        ('vendor', StringProperty()),
        ('version', StringProperty()),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])
class URL(_Observable):
    """Cyber Observable object with type ``url``."""

    _type = 'url'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('value', StringProperty(required=True)),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])
class UNIXAccountExt(_Extension):
    """Extension with type ``unix-account-ext``."""

    _type = 'unix-account-ext'
    _properties = OrderedDict([
        ('gid', IntegerProperty()),
        ('groups', ListProperty(StringProperty)),
        ('home_dir', StringProperty()),
        ('shell', StringProperty()),
    ])
class UserAccount(_Observable):
    """Cyber Observable object with type ``user-account``."""

    _type = 'user-account'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
        ('user_id', StringProperty(required=True)),
        ('account_login', StringProperty()),
        ('account_type', StringProperty()),  # open vocab
        ('display_name', StringProperty()),
        ('is_service_account', BooleanProperty()),
        ('is_privileged', BooleanProperty()),
        ('can_escalate_privs', BooleanProperty()),
        ('is_disabled', BooleanProperty()),
        ('account_created', TimestampProperty()),
        ('account_expires', TimestampProperty()),
        ('password_last_changed', TimestampProperty()),
        ('account_first_login', TimestampProperty()),
        ('account_last_login', TimestampProperty()),
    ])
class WindowsRegistryValueType(_STIXBase):
    """Embedded type used for the entries of ``WindowsRegistryKey.values``."""

    _type = 'windows-registry-value-type'
    _properties = OrderedDict([
        ('name', StringProperty(required=True)),
        ('data', StringProperty()),
        ('data_type', EnumProperty([
            'REG_NONE',
            'REG_SZ',
            'REG_EXPAND_SZ',
            'REG_BINARY',
            'REG_DWORD',
            'REG_DWORD_BIG_ENDIAN',
            'REG_LINK',
            'REG_MULTI_SZ',
            'REG_RESOURCE_LIST',
            'REG_FULL_RESOURCE_DESCRIPTION',
            'REG_RESOURCE_REQUIREMENTS_LIST',
            'REG_QWORD',
            'REG_INVALID_TYPE',
        ])),
    ])
class WindowsRegistryKey(_Observable):
    """Cyber Observable object with type ``windows-registry-key``."""

    _type = 'windows-registry-key'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('key', StringProperty(required=True)),
        ('values', ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType))),
        # this is not the modified timestamps of the object itself
        ('modified', TimestampProperty()),
        ('creator_user_ref', ObjectReferenceProperty(valid_types='user-account')),
        ('number_of_subkeys', IntegerProperty()),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])

    @property
    def values(self):
        """Return the ``values`` property stored on this registry key."""
        # Needed because 'values' is a property on collections.Mapping objects
        return self._inner['values']
class X509V3ExtenstionsType(_STIXBase):
    """Embedded type used by ``X509Certificate.x509_v3_extensions``.

    NOTE: the class name is spelled "Extenstions"; it is kept as-is because
    it is a public name that X509Certificate refers to.
    """

    _type = 'x509-v3-extensions-type'
    _properties = OrderedDict([
        ('basic_constraints', StringProperty()),
        ('name_constraints', StringProperty()),
        ('policy_constraints', StringProperty()),
        ('key_usage', StringProperty()),
        ('extended_key_usage', StringProperty()),
        ('subject_key_identifier', StringProperty()),
        ('authority_key_identifier', StringProperty()),
        ('subject_alternative_name', StringProperty()),
        ('issuer_alternative_name', StringProperty()),
        ('subject_directory_attributes', StringProperty()),
        ('crl_distribution_points', StringProperty()),
        ('inhibit_any_policy', StringProperty()),
        ('private_key_usage_period_not_before', TimestampProperty()),
        ('private_key_usage_period_not_after', TimestampProperty()),
        ('certificate_policies', StringProperty()),
        ('policy_mappings', StringProperty()),
    ])
class X509Certificate(_Observable):
    """Cyber Observable object with type ``x509-certificate``."""

    _type = 'x509-certificate'
    _properties = OrderedDict([
        ('type', TypeProperty(_type)),
        ('is_self_signed', BooleanProperty()),
        ('hashes', HashesProperty()),
        ('version', StringProperty()),
        ('serial_number', StringProperty()),
        ('signature_algorithm', StringProperty()),
        ('issuer', StringProperty()),
        ('validity_not_before', TimestampProperty()),
        ('validity_not_after', TimestampProperty()),
        ('subject', StringProperty()),
        ('subject_public_key_algorithm', StringProperty()),
        ('subject_public_key_modulus', StringProperty()),
        ('subject_public_key_exponent', IntegerProperty()),
        ('x509_v3_extensions', EmbeddedObjectProperty(type=X509V3ExtenstionsType)),
        ('extensions', ExtensionsProperty(enclosing_type=_type)),
    ])
# Observable type name -> implementing class.  Every built-in class is keyed
# by its own ``_type``; custom observables are added by _register_observable().
OBJ_MAP_OBSERVABLE = dict(
    (observable_cls._type, observable_cls)
    for observable_cls in (
        Artifact,
        AutonomousSystem,
        Directory,
        DomainName,
        EmailAddress,
        EmailMessage,
        File,
        IPv4Address,
        IPv6Address,
        MACAddress,
        Mutex,
        NetworkTraffic,
        Process,
        Software,
        URL,
        UserAccount,
        WindowsRegistryKey,
        X509Certificate,
    )
)
# Observable type name -> {extension type name -> extension class}.  Every
# built-in extension is keyed by its own ``_type``; custom extensions are
# added by _register_extension().
EXT_MAP = {
    'file': dict(
        (ext_cls._type, ext_cls)
        for ext_cls in (ArchiveExt, NTFSExt, PDFExt, RasterImageExt, WindowsPEBinaryExt)
    ),
    'network-traffic': dict(
        (ext_cls._type, ext_cls)
        for ext_cls in (HTTPRequestExt, ICMPExt, SocketExt, TCPExt)
    ),
    'process': dict(
        (ext_cls._type, ext_cls)
        for ext_cls in (WindowsProcessExt, WindowsServiceExt)
    ),
    'user-account': dict(
        (ext_cls._type, ext_cls)
        for ext_cls in (UNIXAccountExt,)
    ),
}
def parse_observable(data, _valid_refs=None, allow_custom=False):
    """Deserialize a string or file-like object into a STIX Cyber Observable
    object.

    Args:
        data: The STIX 2 content to be parsed; anything ``get_dict`` accepts
            (ObservableProperty.clean passes an already-built dictionary).
        _valid_refs: The object references valid for the scope of the
            object being parsed. ObservableProperty.clean passes a dict
            mapping each key to its object's type. Defaults to an empty
            list when nothing (or an empty value) is given.
        allow_custom: Whether to allow custom properties or not.
            Default: False.

    Returns:
        An instantiated Python STIX Cyber Observable object.

    Raises:
        ParseError: if the object has no 'type' property, its type is not
            registered in OBJ_MAP_OBSERVABLE, or it carries an extension that
            is not registered for its type.
    """
    import copy  # local import: only needed for the defensive copies below

    # Work on a shallow copy: get_dict may hand back the caller's own mapping
    # (NOTE(review): confirm in utils.get_dict), and the additions below must
    # not leak into the caller's data.
    obj = copy.copy(get_dict(data))
    obj['_valid_refs'] = _valid_refs or []

    if 'type' not in obj:
        raise ParseError("Can't parse observable with no 'type' property: %s" % str(obj))
    try:
        obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
    except KeyError:
        raise ParseError("Can't parse unknown observable type '%s'! For custom observables, "
                         "use the CustomObservable decorator." % obj['type'])

    if 'extensions' in obj and obj['type'] in EXT_MAP:
        known_exts = EXT_MAP[obj['type']]
        # Same reasoning as above: replace entries on a copy of the
        # extensions mapping rather than on the caller's nested object.
        extensions = copy.copy(obj['extensions'])
        for name, ext in extensions.items():
            if name not in known_exts:
                raise ParseError("Can't parse Unknown extension type '%s' for observable type '%s'!" % (name, obj['type']))
            extensions[name] = known_exts[name](allow_custom=allow_custom, **ext)
        obj['extensions'] = extensions

    return obj_class(allow_custom=allow_custom, **obj)
def _register_observable(new_observable):
    """Add ``new_observable`` to OBJ_MAP_OBSERVABLE under its ``_type``.

    An existing entry for the same type name is overwritten.
    """
    type_name = new_observable._type
    OBJ_MAP_OBSERVABLE[type_name] = new_observable
def CustomObservable(type='x-custom-observable', properties=None):
    """Custom STIX Cyber Observable Object type decorator.

    The decorated class is combined with ``_Observable`` into a new class,
    which is registered in OBJ_MAP_OBSERVABLE and returned.

    Args:
        type: The type name of the new observable.
        properties: A non-empty list of ``(name, property)`` tuples. A name
            ending in ``_ref`` must map to an ObjectReferenceProperty and a
            name ending in ``_refs`` to a ListProperty that contains an
            ObjectReferenceProperty.

    Raises:
        ValueError: when the decorator is applied, if ``properties`` is
            missing, empty, not a list, or breaks the naming rules above.

    Example:
        >>> @CustomObservable('x-custom-observable', [
        ...     ('property1', StringProperty(required=True)),
        ...     ('property2', IntegerProperty()),
        ... ])
        ... class MyNewObservableType():
        ...     pass
    """
    def custom_builder(cls):
        # Validate before building the class, so the loop variables do not
        # end up as attributes of the generated class.
        if not properties or not isinstance(properties, list):
            raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")

        # Check properties ending in "_ref/s" are ObjectReferenceProperties
        for prop_name, prop in properties:
            if prop_name.endswith('_ref') and not isinstance(prop, ObjectReferenceProperty):
                raise ValueError("'%s' is named like an object reference property but "
                                 "is not an ObjectReferenceProperty." % prop_name)
            elif (prop_name.endswith('_refs') and (not isinstance(prop, ListProperty)
                                                   or not isinstance(prop.contained, ObjectReferenceProperty))):
                raise ValueError("'%s' is named like an object reference list property but "
                                 "is not a ListProperty containing ObjectReferenceProperty." % prop_name)

        class _Custom(cls, _Observable):
            _type = type
            _properties = OrderedDict()
            _properties.update([
                ('type', TypeProperty(_type)),
            ])
            _properties.update(properties)

            def __init__(self, **kwargs):
                _Observable.__init__(self, **kwargs)
                # Only call the decorated class's initializer when it really
                # has one. Comparing against object.__init__ avoids matching
                # on exception message text, which differs between Python
                # versions, and never hides errors raised inside a custom
                # __init__().
                if getattr(cls, '__init__', object.__init__) is not object.__init__:
                    cls.__init__(self, **kwargs)

        _register_observable(_Custom)
        return _Custom

    return custom_builder
def _register_extension(observable, new_extension):
    """Record ``new_extension`` in EXT_MAP for the type of ``observable``.

    Raises:
        ValueError: if ``observable`` has no ``_type`` attribute, or its type
            is known to neither EXT_MAP nor OBJ_MAP_OBSERVABLE.
    """
    try:
        observable_type = observable._type
    except AttributeError:
        raise ValueError("Unknown observable type. Custom observables must be "
                         "created with the @CustomObservable decorator.")

    if observable_type in EXT_MAP:
        # The observable already has an extension table: add to it.
        EXT_MAP[observable_type][new_extension._type] = new_extension
    elif observable_type not in OBJ_MAP_OBSERVABLE:
        raise ValueError("Unknown observable type '%s'. Custom observables "
                         "must be created with the @CustomObservable decorator."
                         % observable_type)
    else:
        # Known observable without extensions so far: start its table.
        EXT_MAP[observable_type] = {new_extension._type: new_extension}
def CustomExtension(observable=None, type='x-custom-observable', properties={}):
    """Decorator for custom extensions to STIX Cyber Observables.

    The decorated class is combined with ``_Extension`` into a new class,
    which is registered for ``observable`` via ``_register_extension`` and
    returned.

    Args:
        observable: The Observable class the extension applies to; must be a
            subclass of ``_Observable``.
        type: The type name of the new extension.
        properties: Mapping (or iterable of pairs) of property names to
            property objects, merged into the extension's properties. The
            default dictionary is only read, never modified.

    Raises:
        ValueError: if ``observable`` is missing or is not an ``_Observable``
            subclass, or if registration of the extension fails.
    """
    if not observable or not issubclass(observable, _Observable):
        raise ValueError("'observable' must be a valid Observable class!")

    def custom_builder(cls):

        class _Custom(cls, _Extension):
            _type = type
            _properties = {
                'extensions': ExtensionsProperty(enclosing_type=_type),
            }
            _properties.update(properties)

            def __init__(self, **kwargs):
                _Extension.__init__(self, **kwargs)
                # Only call the decorated class's initializer when it really
                # has one. Comparing against object.__init__ avoids matching
                # on exception message text, which differs between Python
                # versions, and never hides errors raised inside a custom
                # __init__().
                if getattr(cls, '__init__', object.__init__) is not object.__init__:
                    cls.__init__(self, **kwargs)

        _register_extension(observable, _Custom)
        return _Custom

    return custom_builder