Remove observables and extension mappings, custom code and apply property constrain in v21

stix2.1
Emmanuelle Vargas-Gonzalez 2018-07-10 15:20:16 -04:00
parent 1177694739
commit 54268ae7dd
2 changed files with 74 additions and 618 deletions

View File

@ -6,94 +6,17 @@ Observable and do not have a ``_type`` attribute.
""" """
from collections import OrderedDict from collections import OrderedDict
import copy import itertools
import re
from ..base import _Extension, _Observable, _STIXBase from ..base import _Extension, _Observable, _STIXBase
from ..exceptions import (AtLeastOnePropertyError, CustomContentError, from ..custom import custom_extension_builder, custom_observable_builder
DependentPropertiesError, ParseError) from ..exceptions import AtLeastOnePropertyError, DependentPropertiesError
from ..utils import TYPE_REGEX, _get_dict from ..properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty, EmbeddedObjectProperty, EnumProperty,
EmbeddedObjectProperty, EnumProperty, FloatProperty, ExtensionsProperty, FloatProperty, HashesProperty,
HashesProperty, HexProperty, IntegerProperty, HexProperty, IntegerProperty, ListProperty,
ListProperty, ObjectReferenceProperty, Property, ObjectReferenceProperty, StringProperty,
StringProperty, TimestampProperty, TypeProperty) TimestampProperty, TypeProperty)
class ObservableProperty(Property):
"""Property for holding Cyber Observable Objects.
"""
def __init__(self, allow_custom=False, *args, **kwargs):
self.allow_custom = allow_custom
super(ObservableProperty, self).__init__(*args, **kwargs)
def clean(self, value):
try:
dictified = _get_dict(value)
# get deep copy since we are going modify the dict and might
# modify the original dict as _get_dict() does not return new
# dict when passed a dict
dictified = copy.deepcopy(dictified)
except ValueError:
raise ValueError("The observable property must contain a dictionary")
if dictified == {}:
raise ValueError("The observable property must contain a non-empty dictionary")
valid_refs = dict((k, v['type']) for (k, v) in dictified.items())
for key, obj in dictified.items():
if self.allow_custom:
parsed_obj = parse_observable(obj, valid_refs, allow_custom=True)
else:
parsed_obj = parse_observable(obj, valid_refs)
dictified[key] = parsed_obj
return dictified
class ExtensionsProperty(DictionaryProperty):
"""Property for representing extensions on Observable objects.
"""
def __init__(self, allow_custom=False, enclosing_type=None, required=False):
self.allow_custom = allow_custom
self.enclosing_type = enclosing_type
super(ExtensionsProperty, self).__init__(required)
def clean(self, value):
try:
dictified = _get_dict(value)
# get deep copy since we are going modify the dict and might
# modify the original dict as _get_dict() does not return new
# dict when passed a dict
dictified = copy.deepcopy(dictified)
except ValueError:
raise ValueError("The extensions property must contain a dictionary")
if dictified == {}:
raise ValueError("The extensions property must contain a non-empty dictionary")
if self.enclosing_type in EXT_MAP:
specific_type_map = EXT_MAP[self.enclosing_type]
for key, subvalue in dictified.items():
if key in specific_type_map:
cls = specific_type_map[key]
if type(subvalue) is dict:
if self.allow_custom:
subvalue['allow_custom'] = True
dictified[key] = cls(**subvalue)
else:
dictified[key] = cls(**subvalue)
elif type(subvalue) is cls:
# If already an instance of an _Extension class, assume it's valid
dictified[key] = subvalue
else:
raise ValueError("Cannot determine extension type.")
else:
raise CustomContentError("Can't parse unknown extension type: {}".format(key))
else:
raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type)
return dictified
class Artifact(_Observable): class Artifact(_Observable):
@ -841,229 +764,33 @@ class X509Certificate(_Observable):
]) ])
OBJ_MAP_OBSERVABLE = {
'artifact': Artifact,
'autonomous-system': AutonomousSystem,
'directory': Directory,
'domain-name': DomainName,
'email-addr': EmailAddress,
'email-message': EmailMessage,
'file': File,
'ipv4-addr': IPv4Address,
'ipv6-addr': IPv6Address,
'mac-addr': MACAddress,
'mutex': Mutex,
'network-traffic': NetworkTraffic,
'process': Process,
'software': Software,
'url': URL,
'user-account': UserAccount,
'windows-registry-key': WindowsRegistryKey,
'x509-certificate': X509Certificate,
}
EXT_MAP = {
'file': {
'archive-ext': ArchiveExt,
'ntfs-ext': NTFSExt,
'pdf-ext': PDFExt,
'raster-image-ext': RasterImageExt,
'windows-pebinary-ext': WindowsPEBinaryExt
},
'network-traffic': {
'http-request-ext': HTTPRequestExt,
'icmp-ext': ICMPExt,
'socket-ext': SocketExt,
'tcp-ext': TCPExt,
},
'process': {
'windows-process-ext': WindowsProcessExt,
'windows-service-ext': WindowsServiceExt,
},
'user-account': {
'unix-account-ext': UNIXAccountExt,
},
}
def parse_observable(data, _valid_refs=None, allow_custom=False):
"""Deserialize a string or file-like object into a STIX Cyber Observable
object.
Args:
data: The STIX 2 string to be parsed.
_valid_refs: A list of object references valid for the scope of the
object being parsed. Use empty list if no valid refs are present.
allow_custom: Whether to allow custom properties or not.
Default: False.
Returns:
An instantiated Python STIX Cyber Observable object.
"""
obj = _get_dict(data)
# get deep copy since we are going modify the dict and might
# modify the original dict as _get_dict() does not return new
# dict when passed a dict
obj = copy.deepcopy(obj)
obj['_valid_refs'] = _valid_refs or []
if 'type' not in obj:
raise ParseError("Can't parse observable with no 'type' property: %s" % str(obj))
try:
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError:
if allow_custom:
# flag allows for unknown custom objects too, but will not
# be parsed into STIX observable object, just returned as is
return obj
raise CustomContentError("Can't parse unknown observable type '%s'! For custom observables, "
"use the CustomObservable decorator." % obj['type'])
if 'extensions' in obj and obj['type'] in EXT_MAP:
for name, ext in obj['extensions'].items():
try:
ext_class = EXT_MAP[obj['type']][name]
except KeyError:
if not allow_custom:
raise CustomContentError("Can't parse unknown extension type '%s'"
"for observable type '%s'!" % (name, obj['type']))
else: # extension was found
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
return obj_class(allow_custom=allow_custom, **obj)
def _register_observable(new_observable):
"""Register a custom STIX Cyber Observable type.
"""
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
def CustomObservable(type='x-custom-observable', properties=None): def CustomObservable(type='x-custom-observable', properties=None):
"""Custom STIX Cyber Observable Object type decorator. """Custom STIX Cyber Observable Object type decorator.
Example: Example:
>>> from stix2.v20 import CustomObservable
>>> from stix2.properties import IntegerProperty, StringProperty
>>> @CustomObservable('x-custom-observable', [ >>> @CustomObservable('x-custom-observable', [
... ('property1', StringProperty(required=True)), ... ('property1', StringProperty(required=True)),
... ('property2', IntegerProperty()), ... ('property2', IntegerProperty()),
... ]) ... ])
... class MyNewObservableType(): ... class MyNewObservableType():
... pass ... pass
""" """
def wrapper(cls):
def custom_builder(cls): _properties = list(itertools.chain.from_iterable([
[('type', TypeProperty(type))],
class _Custom(cls, _Observable): properties,
[('extensions', ExtensionsProperty(enclosing_type=type))]
if not re.match(TYPE_REGEX, type): ]))
raise ValueError("Invalid observable type name '%s': must only contain the " return custom_observable_builder(cls, type, _properties, '2.0')
"characters a-z (lowercase ASCII), 0-9, and hyphen (-)." % type) return wrapper
elif len(type) < 3 or len(type) > 250:
raise ValueError("Invalid observable type name '%s': must be between 3 and 250 characters." % type)
_type = type
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
])
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
# Check properties ending in "_ref/s" are ObjectReferenceProperties
for prop_name, prop in properties:
if prop_name.endswith('_ref') and not isinstance(prop, ObjectReferenceProperty):
raise ValueError("'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty." % prop_name)
elif (prop_name.endswith('_refs') and (not isinstance(prop, ListProperty)
or not isinstance(prop.contained, ObjectReferenceProperty))):
raise ValueError("'%s' is named like an object reference list property but "
"is not a ListProperty containing ObjectReferenceProperty." % prop_name)
_properties.update(properties)
_properties.update([
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
def __init__(self, **kwargs):
_Observable.__init__(self, **kwargs)
try:
cls.__init__(self, **kwargs)
except (AttributeError, TypeError) as e:
# Don't accidentally catch errors raised in a custom __init__()
if ("has no attribute '__init__'" in str(e) or
str(e) == "object.__init__() takes no parameters"):
return
raise e
_register_observable(_Custom)
return _Custom
return custom_builder
def _register_extension(observable, new_extension): def CustomExtension(observable=None, type='x-custom-observable-ext', properties=None):
"""Register a custom extension to a STIX Cyber Observable type.
"""
try:
observable_type = observable._type
except AttributeError:
raise ValueError("Unknown observable type. Custom observables must be "
"created with the @CustomObservable decorator.")
try:
EXT_MAP[observable_type][new_extension._type] = new_extension
except KeyError:
if observable_type not in OBJ_MAP_OBSERVABLE:
raise ValueError("Unknown observable type '%s'. Custom observables "
"must be created with the @CustomObservable decorator."
% observable_type)
else:
EXT_MAP[observable_type] = {new_extension._type: new_extension}
def CustomExtension(observable=None, type='x-custom-observable', properties=None):
"""Decorator for custom extensions to STIX Cyber Observables. """Decorator for custom extensions to STIX Cyber Observables.
""" """
def wrapper(cls):
if not observable or not issubclass(observable, _Observable): return custom_extension_builder(cls, observable, type, properties, '2.0')
raise ValueError("'observable' must be a valid Observable class!") return wrapper
def custom_builder(cls):
class _Custom(cls, _Extension):
if not re.match(TYPE_REGEX, type):
raise ValueError("Invalid extension type name '%s': must only contain the "
"characters a-z (lowercase ASCII), 0-9, and hyphen (-)." % type)
elif len(type) < 3 or len(type) > 250:
raise ValueError("Invalid extension type name '%s': must be between 3 and 250 characters." % type)
_type = type
_properties = OrderedDict()
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
_properties.update(properties)
def __init__(self, **kwargs):
_Extension.__init__(self, **kwargs)
try:
cls.__init__(self, **kwargs)
except (AttributeError, TypeError) as e:
# Don't accidentally catch errors raised in a custom __init__()
if ("has no attribute '__init__'" in str(e) or
str(e) == "object.__init__() takes no parameters"):
return
raise e
_register_extension(observable, _Custom)
return _Custom
return custom_builder

View File

@ -6,94 +6,17 @@ Observable and do not have a ``_type`` attribute.
""" """
from collections import OrderedDict from collections import OrderedDict
import copy import itertools
import re
from ..base import _Extension, _Observable, _STIXBase from ..base import _Extension, _Observable, _STIXBase
from ..exceptions import (AtLeastOnePropertyError, CustomContentError, from ..custom import custom_extension_builder, custom_observable_builder
DependentPropertiesError, ParseError) from ..exceptions import AtLeastOnePropertyError, DependentPropertiesError
from ..utils import TYPE_REGEX, _get_dict from ..properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty, EmbeddedObjectProperty, EnumProperty,
EmbeddedObjectProperty, EnumProperty, FloatProperty, ExtensionsProperty, FloatProperty, HashesProperty,
HashesProperty, HexProperty, IntegerProperty, HexProperty, IntegerProperty, ListProperty,
ListProperty, ObjectReferenceProperty, Property, ObjectReferenceProperty, StringProperty,
StringProperty, TimestampProperty, TypeProperty) TimestampProperty, TypeProperty)
class ObservableProperty(Property):
"""Property for holding Cyber Observable Objects.
"""
def __init__(self, allow_custom=False, *args, **kwargs):
self.allow_custom = allow_custom
super(ObservableProperty, self).__init__(*args, **kwargs)
def clean(self, value):
try:
dictified = _get_dict(value)
# get deep copy since we are going modify the dict and might
# modify the original dict as _get_dict() does not return new
# dict when passed a dict
dictified = copy.deepcopy(dictified)
except ValueError:
raise ValueError("The observable property must contain a dictionary")
if dictified == {}:
raise ValueError("The observable property must contain a non-empty dictionary")
valid_refs = dict((k, v['type']) for (k, v) in dictified.items())
for key, obj in dictified.items():
if self.allow_custom:
parsed_obj = parse_observable(obj, valid_refs, allow_custom=True)
else:
parsed_obj = parse_observable(obj, valid_refs)
dictified[key] = parsed_obj
return dictified
class ExtensionsProperty(DictionaryProperty):
"""Property for representing extensions on Observable objects.
"""
def __init__(self, allow_custom=False, enclosing_type=None, required=False):
self.allow_custom = allow_custom
self.enclosing_type = enclosing_type
super(ExtensionsProperty, self).__init__(required)
def clean(self, value):
try:
dictified = _get_dict(value)
# get deep copy since we are going modify the dict and might
# modify the original dict as _get_dict() does not return new
# dict when passed a dict
dictified = copy.deepcopy(dictified)
except ValueError:
raise ValueError("The extensions property must contain a dictionary")
if dictified == {}:
raise ValueError("The extensions property must contain a non-empty dictionary")
if self.enclosing_type in EXT_MAP:
specific_type_map = EXT_MAP[self.enclosing_type]
for key, subvalue in dictified.items():
if key in specific_type_map:
cls = specific_type_map[key]
if type(subvalue) is dict:
if self.allow_custom:
subvalue['allow_custom'] = True
dictified[key] = cls(**subvalue)
else:
dictified[key] = cls(**subvalue)
elif type(subvalue) is cls:
# If already an instance of an _Extension class, assume it's valid
dictified[key] = subvalue
else:
raise ValueError("Cannot determine extension type.")
else:
raise CustomContentError("Can't parse unknown extension type: {}".format(key))
else:
raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type)
return dictified
class Artifact(_Observable): class Artifact(_Observable):
@ -109,7 +32,7 @@ class Artifact(_Observable):
('payload_bin', BinaryProperty()), ('payload_bin', BinaryProperty()),
('url', StringProperty()), ('url', StringProperty()),
('hashes', HashesProperty()), ('hashes', HashesProperty()),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
def _check_object_constraints(self): def _check_object_constraints(self):
@ -130,7 +53,7 @@ class AutonomousSystem(_Observable):
('number', IntegerProperty(required=True)), ('number', IntegerProperty(required=True)),
('name', StringProperty()), ('name', StringProperty()),
('rir', StringProperty()), ('rir', StringProperty()),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
@ -150,7 +73,7 @@ class Directory(_Observable):
('modified', TimestampProperty()), ('modified', TimestampProperty()),
('accessed', TimestampProperty()), ('accessed', TimestampProperty()),
('contains_refs', ListProperty(ObjectReferenceProperty(valid_types=['file', 'directory']))), ('contains_refs', ListProperty(ObjectReferenceProperty(valid_types=['file', 'directory']))),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
@ -165,7 +88,7 @@ class DomainName(_Observable):
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('value', StringProperty(required=True)), ('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name']))), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name']))),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
@ -181,7 +104,7 @@ class EmailAddress(_Observable):
('value', StringProperty(required=True)), ('value', StringProperty(required=True)),
('display_name', StringProperty()), ('display_name', StringProperty()),
('belongs_to_ref', ObjectReferenceProperty(valid_types='user-account')), ('belongs_to_ref', ObjectReferenceProperty(valid_types='user-account')),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
@ -222,11 +145,11 @@ class EmailMessage(_Observable):
('bcc_refs', ListProperty(ObjectReferenceProperty(valid_types='email-addr'))), ('bcc_refs', ListProperty(ObjectReferenceProperty(valid_types='email-addr'))),
('subject', StringProperty()), ('subject', StringProperty()),
('received_lines', ListProperty(StringProperty)), ('received_lines', ListProperty(StringProperty)),
('additional_header_fields', DictionaryProperty()), ('additional_header_fields', DictionaryProperty(spec_version='2.1')),
('body', StringProperty()), ('body', StringProperty()),
('body_multipart', ListProperty(EmbeddedObjectProperty(type=EmailMIMEComponent))), ('body_multipart', ListProperty(EmbeddedObjectProperty(type=EmailMIMEComponent))),
('raw_email_ref', ObjectReferenceProperty(valid_types='artifact')), ('raw_email_ref', ObjectReferenceProperty(valid_types='artifact')),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
def _check_object_constraints(self): def _check_object_constraints(self):
@ -287,7 +210,7 @@ class PDFExt(_Extension):
_properties = OrderedDict([ _properties = OrderedDict([
('version', StringProperty()), ('version', StringProperty()),
('is_optimized', BooleanProperty()), ('is_optimized', BooleanProperty()),
('document_info_dict', DictionaryProperty()), ('document_info_dict', DictionaryProperty(spec_version='2.1')),
('pdfid0', StringProperty()), ('pdfid0', StringProperty()),
('pdfid1', StringProperty()), ('pdfid1', StringProperty()),
]) ])
@ -305,7 +228,7 @@ class RasterImageExt(_Extension):
('image_width', IntegerProperty()), ('image_width', IntegerProperty()),
('bits_per_pixel', IntegerProperty()), ('bits_per_pixel', IntegerProperty()),
('image_compression_algorithm', StringProperty()), ('image_compression_algorithm', StringProperty()),
('exif_tags', DictionaryProperty()), ('exif_tags', DictionaryProperty(spec_version='2.1')),
]) ])
@ -416,7 +339,7 @@ class File(_Observable):
('decryption_key', StringProperty()), ('decryption_key', StringProperty()),
('contains_refs', ListProperty(ObjectReferenceProperty)), ('contains_refs', ListProperty(ObjectReferenceProperty)),
('content_ref', ObjectReferenceProperty(valid_types='artifact')), ('content_ref', ObjectReferenceProperty(valid_types='artifact')),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
def _check_object_constraints(self): def _check_object_constraints(self):
@ -437,7 +360,7 @@ class IPv4Address(_Observable):
('value', StringProperty(required=True)), ('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))), ('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
@ -453,7 +376,7 @@ class IPv6Address(_Observable):
('value', StringProperty(required=True)), ('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))), ('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
@ -467,7 +390,7 @@ class MACAddress(_Observable):
_properties = OrderedDict([ _properties = OrderedDict([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('value', StringProperty(required=True)), ('value', StringProperty(required=True)),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
@ -481,7 +404,7 @@ class Mutex(_Observable):
_properties = OrderedDict([ _properties = OrderedDict([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('name', StringProperty(required=True)), ('name', StringProperty(required=True)),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
@ -496,7 +419,7 @@ class HTTPRequestExt(_Extension):
('request_method', StringProperty(required=True)), ('request_method', StringProperty(required=True)),
('request_value', StringProperty(required=True)), ('request_value', StringProperty(required=True)),
('request_version', StringProperty()), ('request_version', StringProperty()),
('request_header', DictionaryProperty()), ('request_header', DictionaryProperty(spec_version='2.1')),
('message_body_length', IntegerProperty()), ('message_body_length', IntegerProperty()),
('message_body_data_ref', ObjectReferenceProperty(valid_types='artifact')), ('message_body_data_ref', ObjectReferenceProperty(valid_types='artifact')),
]) ])
@ -543,7 +466,7 @@ class SocketExt(_Extension):
"PF_AX25", "PF_AX25",
"PF_NETROM" "PF_NETROM"
])), ])),
('options', DictionaryProperty()), ('options', DictionaryProperty(spec_version='2.1')),
('socket_type', EnumProperty(allowed=[ ('socket_type', EnumProperty(allowed=[
"SOCK_STREAM", "SOCK_STREAM",
"SOCK_DGRAM", "SOCK_DGRAM",
@ -590,12 +513,12 @@ class NetworkTraffic(_Observable):
('dst_byte_count', IntegerProperty()), ('dst_byte_count', IntegerProperty()),
('src_packets', IntegerProperty()), ('src_packets', IntegerProperty()),
('dst_packets', IntegerProperty()), ('dst_packets', IntegerProperty()),
('ipfix', DictionaryProperty()), ('ipfix', DictionaryProperty(spec_version='2.1')),
('src_payload_ref', ObjectReferenceProperty(valid_types='artifact')), ('src_payload_ref', ObjectReferenceProperty(valid_types='artifact')),
('dst_payload_ref', ObjectReferenceProperty(valid_types='artifact')), ('dst_payload_ref', ObjectReferenceProperty(valid_types='artifact')),
('encapsulates_refs', ListProperty(ObjectReferenceProperty(valid_types='network-traffic'))), ('encapsulates_refs', ListProperty(ObjectReferenceProperty(valid_types='network-traffic'))),
('encapsulates_by_ref', ObjectReferenceProperty(valid_types='network-traffic')), ('encapsulates_by_ref', ObjectReferenceProperty(valid_types='network-traffic')),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
def _check_object_constraints(self): def _check_object_constraints(self):
@ -616,7 +539,7 @@ class WindowsProcessExt(_Extension):
('priority', StringProperty()), ('priority', StringProperty()),
('owner_sid', StringProperty()), ('owner_sid', StringProperty()),
('window_title', StringProperty()), ('window_title', StringProperty()),
('startup_info', DictionaryProperty()), ('startup_info', DictionaryProperty(spec_version='2.1')),
]) ])
@ -675,13 +598,13 @@ class Process(_Observable):
('cwd', StringProperty()), ('cwd', StringProperty()),
('arguments', ListProperty(StringProperty)), ('arguments', ListProperty(StringProperty)),
('command_line', StringProperty()), ('command_line', StringProperty()),
('environment_variables', DictionaryProperty()), ('environment_variables', DictionaryProperty(spec_version='2.1')),
('opened_connection_refs', ListProperty(ObjectReferenceProperty(valid_types='network-traffic'))), ('opened_connection_refs', ListProperty(ObjectReferenceProperty(valid_types='network-traffic'))),
('creator_user_ref', ObjectReferenceProperty(valid_types='user-account')), ('creator_user_ref', ObjectReferenceProperty(valid_types='user-account')),
('binary_ref', ObjectReferenceProperty(valid_types='file')), ('binary_ref', ObjectReferenceProperty(valid_types='file')),
('parent_ref', ObjectReferenceProperty(valid_types='process')), ('parent_ref', ObjectReferenceProperty(valid_types='process')),
('child_refs', ListProperty(ObjectReferenceProperty('process'))), ('child_refs', ListProperty(ObjectReferenceProperty('process'))),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
def _check_object_constraints(self): def _check_object_constraints(self):
@ -713,7 +636,7 @@ class Software(_Observable):
('languages', ListProperty(StringProperty)), ('languages', ListProperty(StringProperty)),
('vendor', StringProperty()), ('vendor', StringProperty()),
('version', StringProperty()), ('version', StringProperty()),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
@ -727,7 +650,7 @@ class URL(_Observable):
_properties = OrderedDict([ _properties = OrderedDict([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('value', StringProperty(required=True)), ('value', StringProperty(required=True)),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
@ -768,7 +691,7 @@ class UserAccount(_Observable):
('password_last_changed', TimestampProperty()), ('password_last_changed', TimestampProperty()),
('account_first_login', TimestampProperty()), ('account_first_login', TimestampProperty()),
('account_last_login', TimestampProperty()), ('account_last_login', TimestampProperty()),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
@ -815,7 +738,7 @@ class WindowsRegistryKey(_Observable):
('modified', TimestampProperty()), ('modified', TimestampProperty()),
('creator_user_ref', ObjectReferenceProperty(valid_types='user-account')), ('creator_user_ref', ObjectReferenceProperty(valid_types='user-account')),
('number_of_subkeys', IntegerProperty()), ('number_of_subkeys', IntegerProperty()),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
@property @property
@ -873,231 +796,37 @@ class X509Certificate(_Observable):
('subject_public_key_modulus', StringProperty()), ('subject_public_key_modulus', StringProperty()),
('subject_public_key_exponent', IntegerProperty()), ('subject_public_key_exponent', IntegerProperty()),
('x509_v3_extensions', EmbeddedObjectProperty(type=X509V3ExtenstionsType)), ('x509_v3_extensions', EmbeddedObjectProperty(type=X509V3ExtenstionsType)),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
]) ])
OBJ_MAP_OBSERVABLE = {
'artifact': Artifact,
'autonomous-system': AutonomousSystem,
'directory': Directory,
'domain-name': DomainName,
'email-addr': EmailAddress,
'email-message': EmailMessage,
'file': File,
'ipv4-addr': IPv4Address,
'ipv6-addr': IPv6Address,
'mac-addr': MACAddress,
'mutex': Mutex,
'network-traffic': NetworkTraffic,
'process': Process,
'software': Software,
'url': URL,
'user-account': UserAccount,
'windows-registry-key': WindowsRegistryKey,
'x509-certificate': X509Certificate,
}
EXT_MAP = {
'file': {
'archive-ext': ArchiveExt,
'ntfs-ext': NTFSExt,
'pdf-ext': PDFExt,
'raster-image-ext': RasterImageExt,
'windows-pebinary-ext': WindowsPEBinaryExt
},
'network-traffic': {
'http-request-ext': HTTPRequestExt,
'icmp-ext': ICMPExt,
'socket-ext': SocketExt,
'tcp-ext': TCPExt,
},
'process': {
'windows-process-ext': WindowsProcessExt,
'windows-service-ext': WindowsServiceExt,
},
'user-account': {
'unix-account-ext': UNIXAccountExt,
},
}
def parse_observable(data, _valid_refs=None, allow_custom=False):
"""Deserialize a string or file-like object into a STIX Cyber Observable
object.
Args:
data: The STIX 2 string to be parsed.
_valid_refs: A list of object references valid for the scope of the
object being parsed. Use empty list if no valid refs are present.
allow_custom: Whether to allow custom properties or not.
Default: False.
Returns:
An instantiated Python STIX Cyber Observable object.
"""
obj = _get_dict(data)
# get deep copy since we are going modify the dict and might
# modify the original dict as _get_dict() does not return new
# dict when passed a dict
obj = copy.deepcopy(obj)
obj['_valid_refs'] = _valid_refs or []
if 'type' not in obj:
raise ParseError("Can't parse observable with no 'type' property: %s" % str(obj))
try:
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError:
if allow_custom:
# flag allows for unknown custom objects too, but will not
# be parsed into STIX observable object, just returned as is
return obj
raise CustomContentError("Can't parse unknown observable type '%s'! For custom observables, "
"use the CustomObservable decorator." % obj['type'])
if 'extensions' in obj and obj['type'] in EXT_MAP:
for name, ext in obj['extensions'].items():
try:
ext_class = EXT_MAP[obj['type']][name]
except KeyError:
if not allow_custom:
raise CustomContentError("Can't parse unknown extension type '%s'"
"for observable type '%s'!" % (name, obj['type']))
else: # extension was found
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
return obj_class(allow_custom=allow_custom, **obj)
def _register_observable(new_observable):
"""Register a custom STIX Cyber Observable type.
"""
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
def CustomObservable(type='x-custom-observable', properties=None): def CustomObservable(type='x-custom-observable', properties=None):
"""Custom STIX Cyber Observable Object type decorator. """Custom STIX Cyber Observable Object type decorator.
Example: Example:
>>> from stix2.v21 import CustomObservable
>>> from stix2.properties import IntegerProperty, StringProperty
>>> @CustomObservable('x-custom-observable', [ >>> @CustomObservable('x-custom-observable', [
... ('property1', StringProperty(required=True)), ... ('property1', StringProperty(required=True)),
... ('property2', IntegerProperty()), ... ('property2', IntegerProperty()),
... ]) ... ])
... class MyNewObservableType(): ... class MyNewObservableType():
... pass ... pass
""" """
def wrapper(cls):
def custom_builder(cls): _properties = list(itertools.chain.from_iterable([
[('type', TypeProperty(type))],
class _Custom(cls, _Observable): properties,
[('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=type))]
if not re.match(TYPE_REGEX, type): ]))
raise ValueError("Invalid observable type name '%s': must only contain the " return custom_observable_builder(cls, type, _properties, '2.1')
"characters a-z (lowercase ASCII), 0-9, and hyphen (-)." % type) return wrapper
elif len(type) < 3 or len(type) > 250:
raise ValueError("Invalid observable type name '%s': must be between 3 and 250 characters." % type)
_type = type
_properties = OrderedDict([
('type', TypeProperty(_type)),
])
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
# Check properties ending in "_ref/s" are ObjectReferenceProperties
for prop_name, prop in properties:
if prop_name.endswith('_ref') and not isinstance(prop, ObjectReferenceProperty):
raise ValueError("'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty." % prop_name)
elif (prop_name.endswith('_refs') and (not isinstance(prop, ListProperty)
or not isinstance(prop.contained, ObjectReferenceProperty))):
raise ValueError("'%s' is named like an object reference list property but "
"is not a ListProperty containing ObjectReferenceProperty." % prop_name)
_properties.update(properties)
_properties.update([
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
def __init__(self, **kwargs):
_Observable.__init__(self, **kwargs)
try:
cls.__init__(self, **kwargs)
except (AttributeError, TypeError) as e:
# Don't accidentally catch errors raised in a custom __init__()
if ("has no attribute '__init__'" in str(e) or
str(e) == "object.__init__() takes no parameters"):
return
raise e
_register_observable(_Custom)
return _Custom
return custom_builder
def _register_extension(observable, new_extension): def CustomExtension(observable=None, type='x-custom-observable-ext', properties=None):
"""Register a custom extension to a STIX Cyber Observable type.
"""
try:
observable_type = observable._type
except AttributeError:
raise ValueError("Unknown observable type. Custom observables must be "
"created with the @CustomObservable decorator.")
try:
EXT_MAP[observable_type][new_extension._type] = new_extension
except KeyError:
if observable_type not in OBJ_MAP_OBSERVABLE:
raise ValueError("Unknown observable type '%s'. Custom observables "
"must be created with the @CustomObservable decorator."
% observable_type)
else:
EXT_MAP[observable_type] = {new_extension._type: new_extension}
def CustomExtension(observable=None, type='x-custom-observable', properties=None):
"""Decorator for custom extensions to STIX Cyber Observables. """Decorator for custom extensions to STIX Cyber Observables.
""" """
def wrapper(cls):
if not observable or not issubclass(observable, _Observable): return custom_extension_builder(cls, observable, type, properties, '2.1')
raise ValueError("'observable' must be a valid Observable class!") return wrapper
def custom_builder(cls):
class _Custom(cls, _Extension):
if not re.match(TYPE_REGEX, type):
raise ValueError("Invalid extension type name '%s': must only contain the "
"characters a-z (lowercase ASCII), 0-9, and hyphen (-)." % type)
elif len(type) < 3 or len(type) > 250:
raise ValueError("Invalid extension type name '%s': must be between 3 and 250 characters." % type)
_type = type
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
_properties = OrderedDict(properties)
def __init__(self, **kwargs):
_Extension.__init__(self, **kwargs)
try:
cls.__init__(self, **kwargs)
except (AttributeError, TypeError) as e:
# Don't accidentally catch errors raised in a custom __init__()
if ("has no attribute '__init__'" in str(e) or
str(e) == "object.__init__() takes no parameters"):
return
raise e
_register_extension(observable, _Custom)
return _Custom
return custom_builder