Apply OrderedDict changes to Observables.

stix2.1
Emmanuelle Vargas-Gonzalez 2017-08-14 10:29:17 -04:00
parent 1329e2e76f
commit e2c9ecccaf
1 changed files with 402 additions and 350 deletions

View File

@ -5,6 +5,8 @@ embedded in Email Message objects, inherit from _STIXBase instead of Observable
and do not have a '_type' attribute. and do not have a '_type' attribute.
""" """
from collections import OrderedDict
from .base import _Extension, _Observable, _STIXBase from .base import _Extension, _Observable, _STIXBase
from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError, from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError,
ParseError) ParseError)
@ -75,13 +77,14 @@ class ExtensionsProperty(DictionaryProperty):
class Artifact(_Observable): class Artifact(_Observable):
_type = 'artifact' _type = 'artifact'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'mime_type': StringProperty(), ('type', TypeProperty(_type)),
'payload_bin': BinaryProperty(), ('mime_type', StringProperty()),
'url': StringProperty(), ('payload_bin', BinaryProperty()),
'hashes': HashesProperty(), ('url', StringProperty()),
} ('hashes', HashesProperty()),
])
def _check_object_constraints(self): def _check_object_constraints(self):
super(Artifact, self)._check_object_constraints() super(Artifact, self)._check_object_constraints()
@ -91,54 +94,59 @@ class Artifact(_Observable):
class AutonomousSystem(_Observable): class AutonomousSystem(_Observable):
_type = 'autonomous-system' _type = 'autonomous-system'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'number': IntegerProperty(), ('type', TypeProperty(_type)),
'name': StringProperty(), ('number', IntegerProperty()),
'rir': StringProperty(), ('name', StringProperty()),
} ('rir', StringProperty()),
])
class Directory(_Observable): class Directory(_Observable):
_type = 'directory' _type = 'directory'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'path': StringProperty(required=True), ('type', TypeProperty(_type)),
'path_enc': StringProperty(), ('path', StringProperty(required=True)),
('path_enc', StringProperty()),
# these are not the created/modified timestamps of the object itself # these are not the created/modified timestamps of the object itself
'created': TimestampProperty(), ('created', TimestampProperty()),
'modified': TimestampProperty(), ('modified', TimestampProperty()),
'accessed': TimestampProperty(), ('accessed', TimestampProperty()),
'contains_refs': ListProperty(ObjectReferenceProperty(valid_types=['file', 'directory'])), ('contains_refs', ListProperty(ObjectReferenceProperty(valid_types=['file', 'directory']))),
} ])
class DomainName(_Observable): class DomainName(_Observable):
_type = 'domain-name' _type = 'domain-name'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'value': StringProperty(required=True), ('type', TypeProperty(_type)),
'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name'])), ('value', StringProperty(required=True)),
} ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name']))),
])
class EmailAddress(_Observable): class EmailAddress(_Observable):
_type = 'email-addr' _type = 'email-addr'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'value': StringProperty(required=True), ('type', TypeProperty(_type)),
'display_name': StringProperty(), ('value', StringProperty(required=True)),
'belongs_to_ref': ObjectReferenceProperty(valid_types='user-account'), ('display_name', StringProperty()),
} ('belongs_to_ref', ObjectReferenceProperty(valid_types='user-account')),
])
class EmailMIMEComponent(_STIXBase): class EmailMIMEComponent(_STIXBase):
_properties = { _properties = OrderedDict()
'body': StringProperty(), _properties = _properties.update([
'body_raw_ref': ObjectReferenceProperty(valid_types=['artifact', 'file']), ('body', StringProperty()),
'content_type': StringProperty(), ('body_raw_ref', ObjectReferenceProperty(valid_types=['artifact', 'file'])),
'content_disposition': StringProperty(), ('content_type', StringProperty()),
} ('content_disposition', StringProperty()),
])
def _check_object_constraints(self): def _check_object_constraints(self):
super(EmailMIMEComponent, self)._check_object_constraints() super(EmailMIMEComponent, self)._check_object_constraints()
@ -147,23 +155,24 @@ class EmailMIMEComponent(_STIXBase):
class EmailMessage(_Observable): class EmailMessage(_Observable):
_type = 'email-message' _type = 'email-message'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'is_multipart': BooleanProperty(required=True), ('type', TypeProperty(_type)),
'date': TimestampProperty(), ('is_multipart', BooleanProperty(required=True)),
'content_type': StringProperty(), ('date', TimestampProperty()),
'from_ref': ObjectReferenceProperty(valid_types='email-addr'), ('content_type', StringProperty()),
'sender_ref': ObjectReferenceProperty(valid_types='email-addr'), ('from_ref', ObjectReferenceProperty(valid_types='email-addr')),
'to_refs': ListProperty(ObjectReferenceProperty(valid_types='email-addr')), ('sender_ref', ObjectReferenceProperty(valid_types='email-addr')),
'cc_refs': ListProperty(ObjectReferenceProperty(valid_types='email-addr')), ('to_refs', ListProperty(ObjectReferenceProperty(valid_types='email-addr'))),
'bcc_refs': ListProperty(ObjectReferenceProperty(valid_types='email-addr')), ('cc_refs', ListProperty(ObjectReferenceProperty(valid_types='email-addr'))),
'subject': StringProperty(), ('bcc_refs', ListProperty(ObjectReferenceProperty(valid_types='email-addr'))),
'received_lines': ListProperty(StringProperty), ('subject', StringProperty()),
'additional_header_fields': DictionaryProperty(), ('received_lines', ListProperty(StringProperty)),
'body': StringProperty(), ('additional_header_fields', DictionaryProperty()),
'body_multipart': ListProperty(EmbeddedObjectProperty(type=EmailMIMEComponent)), ('body', StringProperty()),
'raw_email_ref': ObjectReferenceProperty(valid_types='artifact'), ('body_multipart', ListProperty(EmbeddedObjectProperty(type=EmailMIMEComponent))),
} ('raw_email_ref', ObjectReferenceProperty(valid_types='artifact')),
])
def _check_object_constraints(self): def _check_object_constraints(self):
super(EmailMessage, self)._check_object_constraints() super(EmailMessage, self)._check_object_constraints()
@ -174,82 +183,88 @@ class EmailMessage(_Observable):
class ArchiveExt(_Extension): class ArchiveExt(_Extension):
_properties = { _properties = OrderedDict()
'contains_refs': ListProperty(ObjectReferenceProperty(valid_types='file'), required=True), _properties = _properties.update([
'version': StringProperty(), ('contains_refs', ListProperty(ObjectReferenceProperty(valid_types='file'), required=True)),
'comment': StringProperty(), ('version', StringProperty()),
} ('comment', StringProperty()),
])
class AlternateDataStream(_STIXBase): class AlternateDataStream(_STIXBase):
_properties = { _properties = OrderedDict()
'name': StringProperty(required=True), _properties = _properties.update([
'hashes': HashesProperty(), ('name', StringProperty(required=True)),
'size': IntegerProperty(), ('hashes', HashesProperty()),
} ('size', IntegerProperty()),
])
class NTFSExt(_Extension): class NTFSExt(_Extension):
_properties = { _properties = OrderedDict()
'sid': StringProperty(), _properties = _properties.update([
'alternate_data_streams': ListProperty(EmbeddedObjectProperty(type=AlternateDataStream)), ('sid', StringProperty()),
} ('alternate_data_streams', ListProperty(EmbeddedObjectProperty(type=AlternateDataStream))),
])
class PDFExt(_Extension): class PDFExt(_Extension):
_properties = { _properties = OrderedDict()
'version': StringProperty(), _properties = _properties.update([
'is_optimized': BooleanProperty(), ('version', StringProperty()),
'document_info_dict': DictionaryProperty(), ('is_optimized', BooleanProperty()),
'pdfid0': StringProperty(), ('document_info_dict', DictionaryProperty()),
'pdfid1': StringProperty(), ('pdfid0', StringProperty()),
} ('pdfid1', StringProperty()),
])
class RasterImageExt(_Extension): class RasterImageExt(_Extension):
_properties = { _properties = OrderedDict()
'image_height': IntegerProperty(), _properties = _properties.update([
'image_weight': IntegerProperty(), ('image_height', IntegerProperty()),
'bits_per_pixel': IntegerProperty(), ('image_weight', IntegerProperty()),
'image_compression_algorithm': StringProperty(), ('bits_per_pixel', IntegerProperty()),
'exif_tags': DictionaryProperty(), ('image_compression_algorithm', StringProperty()),
} ('exif_tags', DictionaryProperty()),
])
class WindowsPEOptionalHeaderType(_STIXBase): class WindowsPEOptionalHeaderType(_STIXBase):
_properties = { _properties = OrderedDict()
'magic_hex': HexProperty(), _properties = _properties.update([
'major_linker_version': IntegerProperty(), ('magic_hex', HexProperty()),
'minor_linker_version': IntegerProperty(), ('major_linker_version', IntegerProperty()),
'size_of_code': IntegerProperty(), ('minor_linker_version', IntegerProperty()),
'size_of_initialized_data': IntegerProperty(), ('size_of_code', IntegerProperty()),
'size_of_uninitialized_data': IntegerProperty(), ('size_of_initialized_data', IntegerProperty()),
'address_of_entry_point': IntegerProperty(), ('size_of_uninitialized_data', IntegerProperty()),
'base_of_code': IntegerProperty(), ('address_of_entry_point', IntegerProperty()),
'base_of_data': IntegerProperty(), ('base_of_code', IntegerProperty()),
'image_base': IntegerProperty(), ('base_of_data', IntegerProperty()),
'section_alignment': IntegerProperty(), ('image_base', IntegerProperty()),
'file_alignment': IntegerProperty(), ('section_alignment', IntegerProperty()),
'major_os_version': IntegerProperty(), ('file_alignment', IntegerProperty()),
'minor_os_version': IntegerProperty(), ('major_os_version', IntegerProperty()),
'major_image_version': IntegerProperty(), ('minor_os_version', IntegerProperty()),
'minor_image_version': IntegerProperty(), ('major_image_version', IntegerProperty()),
'major_subsystem_version': IntegerProperty(), ('minor_image_version', IntegerProperty()),
'minor_subsystem_version': IntegerProperty(), ('major_subsystem_version', IntegerProperty()),
'win32_version_value_hex': HexProperty(), ('minor_subsystem_version', IntegerProperty()),
'size_of_image': IntegerProperty(), ('win32_version_value_hex', HexProperty()),
'size_of_headers': IntegerProperty(), ('size_of_image', IntegerProperty()),
'checksum_hex': HexProperty(), ('size_of_headers', IntegerProperty()),
'subsystem_hex': HexProperty(), ('checksum_hex', HexProperty()),
'dll_characteristics_hex': HexProperty(), ('subsystem_hex', HexProperty()),
'size_of_stack_reserve': IntegerProperty(), ('dll_characteristics_hex', HexProperty()),
'size_of_stack_commit': IntegerProperty(), ('size_of_stack_reserve', IntegerProperty()),
'size_of_heap_reserve': IntegerProperty(), ('size_of_stack_commit', IntegerProperty()),
'size_of_heap_commit': IntegerProperty(), ('size_of_heap_reserve', IntegerProperty()),
'loader_flags_hex': HexProperty(), ('size_of_heap_commit', IntegerProperty()),
'number_of_rva_and_sizes': IntegerProperty(), ('loader_flags_hex', HexProperty()),
'hashes': HashesProperty(), ('number_of_rva_and_sizes', IntegerProperty()),
} ('hashes', HashesProperty()),
])
def _check_object_constraints(self): def _check_object_constraints(self):
super(WindowsPEOptionalHeaderType, self)._check_object_constraints() super(WindowsPEOptionalHeaderType, self)._check_object_constraints()
@ -257,53 +272,56 @@ class WindowsPEOptionalHeaderType(_STIXBase):
class WindowsPESection(_STIXBase): class WindowsPESection(_STIXBase):
_properties = { _properties = OrderedDict()
'name': StringProperty(required=True), _properties = _properties.update([
'size': IntegerProperty(), ('name', StringProperty(required=True)),
'entropy': FloatProperty(), ('size', IntegerProperty()),
'hashes': HashesProperty(), ('entropy', FloatProperty()),
} ('hashes', HashesProperty()),
])
class WindowsPEBinaryExt(_Extension): class WindowsPEBinaryExt(_Extension):
_properties = { _properties = OrderedDict()
'pe_type': StringProperty(required=True), # open_vocab _properties = _properties.update([
'imphash': StringProperty(), ('pe_type', StringProperty(required=True)), # open_vocab
'machine_hex': HexProperty(), ('imphash', StringProperty()),
'number_of_sections': IntegerProperty(), ('machine_hex', HexProperty()),
'time_date_stamp': TimestampProperty(precision='second'), ('number_of_sections', IntegerProperty()),
'pointer_to_symbol_table_hex': HexProperty(), ('time_date_stamp', TimestampProperty(precision='second')),
'number_of_symbols': IntegerProperty(), ('pointer_to_symbol_table_hex', HexProperty()),
'size_of_optional_header': IntegerProperty(), ('number_of_symbols', IntegerProperty()),
'characteristics_hex': HexProperty(), ('size_of_optional_header', IntegerProperty()),
'file_header_hashes': HashesProperty(), ('characteristics_hex', HexProperty()),
'optional_header': EmbeddedObjectProperty(type=WindowsPEOptionalHeaderType), ('file_header_hashes', HashesProperty()),
'sections': ListProperty(EmbeddedObjectProperty(type=WindowsPESection)), ('optional_header', EmbeddedObjectProperty(type=WindowsPEOptionalHeaderType)),
} ('sections', ListProperty(EmbeddedObjectProperty(type=WindowsPESection))),
])
class File(_Observable): class File(_Observable):
_type = 'file' _type = 'file'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'extensions': ExtensionsProperty(enclosing_type=_type), ('type', TypeProperty(_type)),
'hashes': HashesProperty(), ('extensions', ExtensionsProperty(enclosing_type=_type)),
'size': IntegerProperty(), ('hashes', HashesProperty()),
'name': StringProperty(), ('size', IntegerProperty()),
'name_enc': StringProperty(), ('name', StringProperty()),
'magic_number_hex': HexProperty(), ('name_enc', StringProperty()),
'mime_type': StringProperty(), ('magic_number_hex', HexProperty()),
('mime_type', StringProperty()),
# these are not the created/modified timestamps of the object itself # these are not the created/modified timestamps of the object itself
'created': TimestampProperty(), ('created', TimestampProperty()),
'modified': TimestampProperty(), ('modified', TimestampProperty()),
'accessed': TimestampProperty(), ('accessed', TimestampProperty()),
'parent_directory_ref': ObjectReferenceProperty(valid_types='directory'), ('parent_directory_ref', ObjectReferenceProperty(valid_types='directory')),
'is_encrypted': BooleanProperty(), ('is_encrypted', BooleanProperty()),
'encryption_algorithm': StringProperty(), ('encryption_algorithm', StringProperty()),
'decryption_key': StringProperty(), ('decryption_key', StringProperty()),
'contains_refs': ListProperty(ObjectReferenceProperty), ('contains_refs', ListProperty(ObjectReferenceProperty)),
'content_ref': ObjectReferenceProperty(valid_types='artifact'), ('content_ref', ObjectReferenceProperty(valid_types='artifact')),
} ])
def _check_object_constraints(self): def _check_object_constraints(self):
super(File, self)._check_object_constraints() super(File, self)._check_object_constraints()
@ -313,61 +331,68 @@ class File(_Observable):
class IPv4Address(_Observable): class IPv4Address(_Observable):
_type = 'ipv4-addr' _type = 'ipv4-addr'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'value': StringProperty(required=True), ('type', TypeProperty(_type)),
'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')), ('value', StringProperty(required=True)),
'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
} ('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))),
])
class IPv6Address(_Observable): class IPv6Address(_Observable):
_type = 'ipv6-addr' _type = 'ipv6-addr'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'value': StringProperty(required=True), ('type', TypeProperty(_type)),
'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')), ('value', StringProperty(required=True)),
'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
} ('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))),
])
class MACAddress(_Observable): class MACAddress(_Observable):
_type = 'mac-addr' _type = 'mac-addr'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'value': StringProperty(required=True), ('type', TypeProperty(_type)),
} ('value', StringProperty(required=True)),
])
class Mutex(_Observable): class Mutex(_Observable):
_type = 'mutex' _type = 'mutex'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'name': StringProperty(), ('type', TypeProperty(_type)),
} ('name', StringProperty()),
])
class HTTPRequestExt(_Extension): class HTTPRequestExt(_Extension):
_properties = { _properties = OrderedDict()
'request_method': StringProperty(required=True), _properties = _properties.update([
'request_value': StringProperty(required=True), ('request_method', StringProperty(required=True)),
'request_version': StringProperty(), ('request_value', StringProperty(required=True)),
'request_header': DictionaryProperty(), ('request_version', StringProperty()),
'message_body_length': IntegerProperty(), ('request_header', DictionaryProperty()),
'message_body_data_ref': ObjectReferenceProperty(valid_types='artifact'), ('message_body_length', IntegerProperty()),
} ('message_body_data_ref', ObjectReferenceProperty(valid_types='artifact')),
])
class ICMPExt(_Extension): class ICMPExt(_Extension):
_properties = { _properties = OrderedDict()
'icmp_type_hex': HexProperty(required=True), _properties = _properties.update([
'icmp_code_hex': HexProperty(required=True), ('icmp_type_hex', HexProperty(required=True)),
} ('icmp_code_hex', HexProperty(required=True)),
])
class SocketExt(_Extension): class SocketExt(_Extension):
_properties = { _properties = OrderedDict()
'address_family': EnumProperty([ _properties = _properties.update([
('address_family', EnumProperty([
"AF_UNSPEC", "AF_UNSPEC",
"AF_INET", "AF_INET",
"AF_IPX", "AF_IPX",
@ -376,58 +401,60 @@ class SocketExt(_Extension):
"AF_INET6", "AF_INET6",
"AF_IRDA", "AF_IRDA",
"AF_BTH", "AF_BTH",
], required=True), ], required=True)),
'is_blocking': BooleanProperty(), ('is_blocking', BooleanProperty()),
'is_listening': BooleanProperty(), ('is_listening', BooleanProperty()),
'protocol_family': EnumProperty([ ('protocol_family', EnumProperty([
"PF_INET", "PF_INET",
"PF_IPX", "PF_IPX",
"PF_APPLETALK", "PF_APPLETALK",
"PF_INET6", "PF_INET6",
"PF_AX25", "PF_AX25",
"PF_NETROM" "PF_NETROM"
]), ])),
'options': DictionaryProperty(), ('options', DictionaryProperty()),
'socket_type': EnumProperty([ ('socket_type', EnumProperty([
"SOCK_STREAM", "SOCK_STREAM",
"SOCK_DGRAM", "SOCK_DGRAM",
"SOCK_RAW", "SOCK_RAW",
"SOCK_RDM", "SOCK_RDM",
"SOCK_SEQPACKET", "SOCK_SEQPACKET",
]), ])),
} ])
class TCPExt(_Extension): class TCPExt(_Extension):
_properties = { _properties = OrderedDict()
'src_flags_hex': HexProperty(), _properties = _properties.update([
'dst_flags_hex': HexProperty(), ('src_flags_hex', HexProperty()),
} ('dst_flags_hex', HexProperty()),
])
class NetworkTraffic(_Observable): class NetworkTraffic(_Observable):
_type = 'network-traffic' _type = 'network-traffic'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'extensions': ExtensionsProperty(enclosing_type=_type), ('type', TypeProperty(_type)),
'start': TimestampProperty(), ('extensions', ExtensionsProperty(enclosing_type=_type)),
'end': TimestampProperty(), ('start', TimestampProperty()),
'is_active': BooleanProperty(), ('end', TimestampProperty()),
'src_ref': ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name']), ('is_active', BooleanProperty()),
'dst_ref': ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name']), ('src_ref', ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name'])),
'src_port': IntegerProperty(), ('dst_ref', ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name'])),
'dst_port': IntegerProperty(), ('src_port', IntegerProperty()),
'protocols': ListProperty(StringProperty, required=True), ('dst_port', IntegerProperty()),
'src_byte_count': IntegerProperty(), ('protocols', ListProperty(StringProperty, required=True)),
'dst_byte_count': IntegerProperty(), ('src_byte_count', IntegerProperty()),
'src_packets': IntegerProperty(), ('dst_byte_count', IntegerProperty()),
'dst_packets': IntegerProperty(), ('src_packets', IntegerProperty()),
'ipfix': DictionaryProperty(), ('dst_packets', IntegerProperty()),
'src_payload_ref': ObjectReferenceProperty(valid_types='artifact'), ('ipfix', DictionaryProperty()),
'dst_payload_ref': ObjectReferenceProperty(valid_types='artifact'), ('src_payload_ref', ObjectReferenceProperty(valid_types='artifact')),
'encapsulates_refs': ListProperty(ObjectReferenceProperty(valid_types='network-traffic')), ('dst_payload_ref', ObjectReferenceProperty(valid_types='artifact')),
'encapsulates_by_ref': ObjectReferenceProperty(valid_types='network-traffic'), ('encapsulates_refs', ListProperty(ObjectReferenceProperty(valid_types='network-traffic'))),
} ('encapsulates_by_ref', ObjectReferenceProperty(valid_types='network-traffic')),
])
def _check_object_constraints(self): def _check_object_constraints(self):
super(NetworkTraffic, self)._check_object_constraints() super(NetworkTraffic, self)._check_object_constraints()
@ -435,37 +462,39 @@ class NetworkTraffic(_Observable):
class WindowsProcessExt(_Extension): class WindowsProcessExt(_Extension):
_properties = { _properties = OrderedDict()
'aslr_enabled': BooleanProperty(), _properties = _properties.update([
'dep_enabled': BooleanProperty(), ('aslr_enabled', BooleanProperty()),
'priority': StringProperty(), ('dep_enabled', BooleanProperty()),
'owner_sid': StringProperty(), ('priority', StringProperty()),
'window_title': StringProperty(), ('owner_sid', StringProperty()),
'startup_info': DictionaryProperty(), ('window_title', StringProperty()),
} ('startup_info', DictionaryProperty()),
])
class WindowsServiceExt(_Extension): class WindowsServiceExt(_Extension):
_properties = { _properties = OrderedDict()
'service_name': StringProperty(required=True), _properties = _properties.update([
'descriptions': ListProperty(StringProperty), ('service_name', StringProperty(required=True)),
'display_name': StringProperty(), ('descriptions', ListProperty(StringProperty)),
'group_name': StringProperty(), ('display_name', StringProperty()),
'start_type': EnumProperty([ ('group_name', StringProperty()),
('start_type', EnumProperty([
"SERVICE_AUTO_START", "SERVICE_AUTO_START",
"SERVICE_BOOT_START", "SERVICE_BOOT_START",
"SERVICE_DEMAND_START", "SERVICE_DEMAND_START",
"SERVICE_DISABLED", "SERVICE_DISABLED",
"SERVICE_SYSTEM_ALERT", "SERVICE_SYSTEM_ALERT",
]), ])),
'service_dll_refs': ListProperty(ObjectReferenceProperty(valid_types='file')), ('service_dll_refs', ListProperty(ObjectReferenceProperty(valid_types='file'))),
'service_type': EnumProperty([ ('service_type', EnumProperty([
"SERVICE_KERNEL_DRIVER", "SERVICE_KERNEL_DRIVER",
"SERVICE_FILE_SYSTEM_DRIVER", "SERVICE_FILE_SYSTEM_DRIVER",
"SERVICE_WIN32_OWN_PROCESS", "SERVICE_WIN32_OWN_PROCESS",
"SERVICE_WIN32_SHARE_PROCESS", "SERVICE_WIN32_SHARE_PROCESS",
]), ])),
'service_status': EnumProperty([ ('service_status', EnumProperty([
"SERVICE_CONTINUE_PENDING", "SERVICE_CONTINUE_PENDING",
"SERVICE_PAUSE_PENDING", "SERVICE_PAUSE_PENDING",
"SERVICE_PAUSED", "SERVICE_PAUSED",
@ -473,30 +502,31 @@ class WindowsServiceExt(_Extension):
"SERVICE_START_PENDING", "SERVICE_START_PENDING",
"SERVICE_STOP_PENDING", "SERVICE_STOP_PENDING",
"SERVICE_STOPPED", "SERVICE_STOPPED",
]), ])),
} ])
class Process(_Observable): class Process(_Observable):
_type = 'process' _type = 'process'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'extensions': ExtensionsProperty(enclosing_type=_type), ('type', TypeProperty(_type)),
'is_hidden': BooleanProperty(), ('extensions', ExtensionsProperty(enclosing_type=_type)),
'pid': IntegerProperty(), ('is_hidden', BooleanProperty()),
'name': StringProperty(), ('pid', IntegerProperty()),
('name', StringProperty()),
# this is not the created timestamps of the object itself # this is not the created timestamps of the object itself
'created': TimestampProperty(), ('created', TimestampProperty()),
'cwd': StringProperty(), ('cwd', StringProperty()),
'arguments': ListProperty(StringProperty), ('arguments', ListProperty(StringProperty)),
'command_line': StringProperty(), ('command_line', StringProperty()),
'environment_variables': DictionaryProperty(), ('environment_variables', DictionaryProperty()),
'opened_connection_refs': ListProperty(ObjectReferenceProperty(valid_types='network-traffic')), ('opened_connection_refs', ListProperty(ObjectReferenceProperty(valid_types='network-traffic'))),
'creator_user_ref': ObjectReferenceProperty(valid_types='user-account'), ('creator_user_ref', ObjectReferenceProperty(valid_types='user-account')),
'binary_ref': ObjectReferenceProperty(valid_types='file'), ('binary_ref', ObjectReferenceProperty(valid_types='file')),
'parent_ref': ObjectReferenceProperty(valid_types='process'), ('parent_ref', ObjectReferenceProperty(valid_types='process')),
'child_refs': ListProperty(ObjectReferenceProperty('process')), ('child_refs', ListProperty(ObjectReferenceProperty('process'))),
} ])
def _check_object_constraints(self): def _check_object_constraints(self):
# no need to check windows-service-ext, since it has a required property # no need to check windows-service-ext, since it has a required property
@ -515,60 +545,65 @@ class Process(_Observable):
class Software(_Observable): class Software(_Observable):
_type = 'software' _type = 'software'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'name': StringProperty(required=True), ('type', TypeProperty(_type)),
'cpe': StringProperty(), ('name', StringProperty(required=True)),
'languages': ListProperty(StringProperty), ('cpe', StringProperty()),
'vendor': StringProperty(), ('languages', ListProperty(StringProperty)),
'version': StringProperty(), ('vendor', StringProperty()),
} ('version', StringProperty()),
])
class URL(_Observable): class URL(_Observable):
_type = 'url' _type = 'url'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'value': StringProperty(required=True), ('type', TypeProperty(_type)),
} ('value', StringProperty(required=True)),
])
class UNIXAccountExt(_Extension): class UNIXAccountExt(_Extension):
_properties = { _properties = OrderedDict()
'gid': IntegerProperty(), _properties = _properties.update([
'groups': ListProperty(StringProperty), ('gid', IntegerProperty()),
'home_dir': StringProperty(), ('groups', ListProperty(StringProperty)),
'shell': StringProperty(), ('home_dir', StringProperty()),
} ('shell', StringProperty()),
])
class UserAccount(_Observable): class UserAccount(_Observable):
_type = 'user-account' _type = 'user-account'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'extensions': ExtensionsProperty(enclosing_type=_type), ('type', TypeProperty(_type)),
'user_id': StringProperty(required=True), ('extensions', ExtensionsProperty(enclosing_type=_type)),
'account_login': StringProperty(), ('user_id', StringProperty(required=True)),
'account_type': StringProperty(), # open vocab ('account_login', StringProperty()),
'display_name': StringProperty(), ('account_type', StringProperty()), # open vocab
'is_service_account': BooleanProperty(), ('display_name', StringProperty()),
'is_privileged': BooleanProperty(), ('is_service_account', BooleanProperty()),
'can_escalate_privs': BooleanProperty(), ('is_privileged', BooleanProperty()),
'is_disabled': BooleanProperty(), ('can_escalate_privs', BooleanProperty()),
'account_created': TimestampProperty(), ('is_disabled', BooleanProperty()),
'account_expires': TimestampProperty(), ('account_created', TimestampProperty()),
'password_last_changed': TimestampProperty(), ('account_expires', TimestampProperty()),
'account_first_login': TimestampProperty(), ('password_last_changed', TimestampProperty()),
'account_last_login': TimestampProperty(), ('account_first_login', TimestampProperty()),
} ('account_last_login', TimestampProperty()),
])
class WindowsRegistryValueType(_STIXBase): class WindowsRegistryValueType(_STIXBase):
_type = 'windows-registry-value-type' _type = 'windows-registry-value-type'
_properties = { _properties = OrderedDict()
'name': StringProperty(required=True), _properties = _properties.update([
'data': StringProperty(), ('name', StringProperty(required=True)),
'data_type': EnumProperty([ ('data', StringProperty()),
('data_type', EnumProperty([
'REG_NONE', 'REG_NONE',
'REG_SZ', 'REG_SZ',
'REG_EXPAND_SZ', 'REG_EXPAND_SZ',
@ -582,21 +617,22 @@ class WindowsRegistryValueType(_STIXBase):
'REG_RESOURCE_REQUIREMENTS_LIST', 'REG_RESOURCE_REQUIREMENTS_LIST',
'REG_QWORD', 'REG_QWORD',
'REG_INVALID_TYPE', 'REG_INVALID_TYPE',
]), ])),
} ])
class WindowsRegistryKey(_Observable): class WindowsRegistryKey(_Observable):
_type = 'windows-registry-key' _type = 'windows-registry-key'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'key': StringProperty(required=True), ('type', TypeProperty(_type)),
'values': ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType)), ('key', StringProperty(required=True)),
('values', ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType))),
# this is not the modified timestamps of the object itself # this is not the modified timestamps of the object itself
'modified': TimestampProperty(), ('modified', TimestampProperty()),
'creator_user_ref': ObjectReferenceProperty(valid_types='user-account'), ('creator_user_ref', ObjectReferenceProperty(valid_types='user-account')),
'number_of_subkeys': IntegerProperty(), ('number_of_subkeys', IntegerProperty()),
} ])
@property @property
def values(self): def values(self):
@ -606,44 +642,46 @@ class WindowsRegistryKey(_Observable):
class X509V3ExtenstionsType(_STIXBase): class X509V3ExtenstionsType(_STIXBase):
_type = 'x509-v3-extensions-type' _type = 'x509-v3-extensions-type'
_properties = { _properties = OrderedDict()
'basic_constraints': StringProperty(), _properties = _properties.update([
'name_constraints': StringProperty(), ('basic_constraints', StringProperty()),
'policy_constraints': StringProperty(), ('name_constraints', StringProperty()),
'key_usage': StringProperty(), ('policy_constraints', StringProperty()),
'extended_key_usage': StringProperty(), ('key_usage', StringProperty()),
'subject_key_identifier': StringProperty(), ('extended_key_usage', StringProperty()),
'authority_key_identifier': StringProperty(), ('subject_key_identifier', StringProperty()),
'subject_alternative_name': StringProperty(), ('authority_key_identifier', StringProperty()),
'issuer_alternative_name': StringProperty(), ('subject_alternative_name', StringProperty()),
'subject_directory_attributes': StringProperty(), ('issuer_alternative_name', StringProperty()),
'crl_distribution_points': StringProperty(), ('subject_directory_attributes', StringProperty()),
'inhibit_any_policy': StringProperty(), ('crl_distribution_points', StringProperty()),
'private_key_usage_period_not_before': TimestampProperty(), ('inhibit_any_policy', StringProperty()),
'private_key_usage_period_not_after': TimestampProperty(), ('private_key_usage_period_not_before', TimestampProperty()),
'certificate_policies': StringProperty(), ('private_key_usage_period_not_after', TimestampProperty()),
'policy_mappings': StringProperty(), ('certificate_policies', StringProperty()),
} ('policy_mappings', StringProperty()),
])
class X509Certificate(_Observable): class X509Certificate(_Observable):
_type = 'x509-certificate' _type = 'x509-certificate'
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
'is_self_signed': BooleanProperty(), ('type', TypeProperty(_type)),
'hashes': HashesProperty(), ('is_self_signed', BooleanProperty()),
'version': StringProperty(), ('hashes', HashesProperty()),
'serial_number': StringProperty(), ('version', StringProperty()),
'signature_algorithm': StringProperty(), ('serial_number', StringProperty()),
'issuer': StringProperty(), ('signature_algorithm', StringProperty()),
'validity_not_before': TimestampProperty(), ('issuer', StringProperty()),
'validity_not_after': TimestampProperty(), ('validity_not_before', TimestampProperty()),
'subject': StringProperty(), ('validity_not_after', TimestampProperty()),
'subject_public_key_algorithm': StringProperty(), ('subject', StringProperty()),
'subject_public_key_modulus': StringProperty(), ('subject_public_key_algorithm', StringProperty()),
'subject_public_key_exponent': IntegerProperty(), ('subject_public_key_modulus', StringProperty()),
'x509_v3_extensions': EmbeddedObjectProperty(type=X509V3ExtenstionsType), ('subject_public_key_exponent', IntegerProperty()),
} ('x509_v3_extensions', EmbeddedObjectProperty(type=X509V3ExtenstionsType)),
])
OBJ_MAP_OBSERVABLE = { OBJ_MAP_OBSERVABLE = {
@ -700,7 +738,7 @@ EXT_MAP = {
} }
def parse_observable(data, _valid_refs=[], allow_custom=False): def parse_observable(data, _valid_refs, allow_custom=False):
"""Deserialize a string or file-like object into a STIX Cyber Observable object. """Deserialize a string or file-like object into a STIX Cyber Observable object.
Args: Args:
@ -739,17 +777,31 @@ def _register_observable(new_observable):
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
def CustomObservable(type='x-custom-observable', properties={}): def CustomObservable(type='x-custom-observable', properties=None):
"""Custom STIX Cyber Observable type decorator """Custom STIX Cyber Observable type decorator
Example 1:
@CustomObservable('x-custom-observable', [
('property1', StringProperty(required=True)),
('property2', IntegerProperty()),
])
class MyNewObservableType():
pass
""" """
def custom_builder(cls): def custom_builder(cls):
class _Custom(cls, _Observable): class _Custom(cls, _Observable):
_type = type _type = type
_properties = { _properties = OrderedDict()
'type': TypeProperty(_type), _properties = _properties.update([
} ('type', TypeProperty(_type)),
])
if not properties:
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
_properties.update(properties) _properties.update(properties)
def __init__(self, **kwargs): def __init__(self, **kwargs):