major changes to support the extension mechanism
moved some version specific logic into their area, changes to decorators to support Object Registration with Extensionpull/1/head
parent
22c4351688
commit
e4165f96aa
|
@ -127,7 +127,17 @@ class _STIXBase(Mapping):
|
||||||
raise ValueError("'custom_properties' must be a dictionary")
|
raise ValueError("'custom_properties' must be a dictionary")
|
||||||
|
|
||||||
extra_kwargs = list(set(kwargs) - set(self._properties))
|
extra_kwargs = list(set(kwargs) - set(self._properties))
|
||||||
|
if extra_kwargs and issubclass(cls, stix2.v21._Extension):
|
||||||
|
props_to_remove = ['extends_stix_object_definition', 'is_new_object', 'is_extension_so']
|
||||||
|
extra_kwargs = [prop for prop in extra_kwargs if prop not in props_to_remove]
|
||||||
|
|
||||||
if extra_kwargs and not self._allow_custom:
|
if extra_kwargs and not self._allow_custom:
|
||||||
|
ext_found = False
|
||||||
|
for key_id, ext_def in kwargs.get('extensions', {}).items():
|
||||||
|
if key_id.startswith('stix-extension--'):
|
||||||
|
ext_found = True
|
||||||
|
break
|
||||||
|
if ext_found is False:
|
||||||
raise ExtraPropertiesError(cls, extra_kwargs)
|
raise ExtraPropertiesError(cls, extra_kwargs)
|
||||||
|
|
||||||
# because allow_custom is true, any extra kwargs are custom
|
# because allow_custom is true, any extra kwargs are custom
|
||||||
|
@ -155,6 +165,12 @@ class _STIXBase(Mapping):
|
||||||
required_properties = set(get_required_properties(self._properties))
|
required_properties = set(get_required_properties(self._properties))
|
||||||
missing_kwargs = required_properties - set(setting_kwargs)
|
missing_kwargs = required_properties - set(setting_kwargs)
|
||||||
if missing_kwargs:
|
if missing_kwargs:
|
||||||
|
new_ext_check = (
|
||||||
|
getattr(self, 'extends_stix_object_definition', False) or
|
||||||
|
getattr(self, 'is_new_object', False) or
|
||||||
|
getattr(self, 'is_extension_so', False)
|
||||||
|
) and issubclass(cls, stix2.v21._Extension)
|
||||||
|
if new_ext_check is False:
|
||||||
raise MissingPropertiesError(cls, missing_kwargs)
|
raise MissingPropertiesError(cls, missing_kwargs)
|
||||||
|
|
||||||
for prop_name, prop_metadata in self._properties.items():
|
for prop_name, prop_metadata in self._properties.items():
|
||||||
|
@ -274,21 +290,8 @@ class _Observable(_STIXBase):
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
# the constructor might be called independently of an observed data object
|
# the constructor might be called independently of an observed data object
|
||||||
self._STIXBase__valid_refs = kwargs.pop('_valid_refs', [])
|
self._STIXBase__valid_refs = kwargs.pop('_valid_refs', [])
|
||||||
self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False)
|
|
||||||
super(_Observable, self).__init__(**kwargs)
|
super(_Observable, self).__init__(**kwargs)
|
||||||
|
|
||||||
if 'id' not in kwargs and not isinstance(self, stix2.v20._Observable):
|
|
||||||
# Specific to 2.1+ observables: generate a deterministic ID
|
|
||||||
id_ = self._generate_id()
|
|
||||||
|
|
||||||
# Spec says fall back to UUIDv4 if no contributing properties were
|
|
||||||
# given. That's what already happened (the following is actually
|
|
||||||
# overwriting the default uuidv4), so nothing to do here.
|
|
||||||
if id_ is not None:
|
|
||||||
# Can't assign to self (we're immutable), so slip the ID in
|
|
||||||
# more sneakily.
|
|
||||||
self._inner["id"] = id_
|
|
||||||
|
|
||||||
def _check_ref(self, ref, prop, prop_name):
|
def _check_ref(self, ref, prop, prop_name):
|
||||||
"""
|
"""
|
||||||
Only for checking `*_ref` or `*_refs` properties in spec_version 2.0
|
Only for checking `*_ref` or `*_refs` properties in spec_version 2.0
|
||||||
|
|
|
@ -4,8 +4,8 @@ import six
|
||||||
|
|
||||||
from .base import _cls_init
|
from .base import _cls_init
|
||||||
from .parsing import (
|
from .parsing import (
|
||||||
_register_marking, _register_object, _register_observable,
|
_get_extension_class, _register_extension, _register_marking,
|
||||||
_register_observable_extension,
|
_register_object, _register_observable,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -34,6 +34,11 @@ def _custom_object_builder(cls, type, properties, version, base_class):
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
base_class.__init__(self, **kwargs)
|
base_class.__init__(self, **kwargs)
|
||||||
_cls_init(cls, self, kwargs)
|
_cls_init(cls, self, kwargs)
|
||||||
|
ext = getattr(self, 'with_extension', None)
|
||||||
|
if ext and version != '2.0':
|
||||||
|
if 'extensions' not in self._inner:
|
||||||
|
self._inner['extensions'] = {}
|
||||||
|
self._inner['extensions'][ext] = _get_extension_class(ext, version)()
|
||||||
|
|
||||||
_CustomObject.__name__ = cls.__name__
|
_CustomObject.__name__ = cls.__name__
|
||||||
|
|
||||||
|
@ -51,6 +56,11 @@ def _custom_marking_builder(cls, type, properties, version, base_class):
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
base_class.__init__(self, **kwargs)
|
base_class.__init__(self, **kwargs)
|
||||||
|
ext = getattr(self, 'with_extension', None)
|
||||||
|
if ext and version != '2.0':
|
||||||
|
if 'extensions' not in self._inner:
|
||||||
|
self._inner['extensions'] = {}
|
||||||
|
self._inner['extensions'][ext] = _get_extension_class(ext, version)()
|
||||||
_cls_init(cls, self, kwargs)
|
_cls_init(cls, self, kwargs)
|
||||||
|
|
||||||
_CustomMarking.__name__ = cls.__name__
|
_CustomMarking.__name__ = cls.__name__
|
||||||
|
@ -75,6 +85,11 @@ def _custom_observable_builder(cls, type, properties, version, base_class, id_co
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
base_class.__init__(self, **kwargs)
|
base_class.__init__(self, **kwargs)
|
||||||
_cls_init(cls, self, kwargs)
|
_cls_init(cls, self, kwargs)
|
||||||
|
ext = getattr(self, 'with_extension', None)
|
||||||
|
if ext and version != '2.0':
|
||||||
|
if 'extensions' not in self._inner:
|
||||||
|
self._inner['extensions'] = {}
|
||||||
|
self._inner['extensions'][ext] = _get_extension_class(ext, version)()
|
||||||
|
|
||||||
_CustomObservable.__name__ = cls.__name__
|
_CustomObservable.__name__ = cls.__name__
|
||||||
|
|
||||||
|
@ -82,7 +97,7 @@ def _custom_observable_builder(cls, type, properties, version, base_class, id_co
|
||||||
return _CustomObservable
|
return _CustomObservable
|
||||||
|
|
||||||
|
|
||||||
def _custom_extension_builder(cls, observable, type, properties, version, base_class):
|
def _custom_extension_builder(cls, type, properties, version, base_class):
|
||||||
prop_dict = _get_properties_dict(properties)
|
prop_dict = _get_properties_dict(properties)
|
||||||
|
|
||||||
class _CustomExtension(cls, base_class):
|
class _CustomExtension(cls, base_class):
|
||||||
|
@ -96,5 +111,5 @@ def _custom_extension_builder(cls, observable, type, properties, version, base_c
|
||||||
|
|
||||||
_CustomExtension.__name__ = cls.__name__
|
_CustomExtension.__name__ = cls.__name__
|
||||||
|
|
||||||
_register_observable_extension(observable, _CustomExtension, version=version)
|
_register_extension(_CustomExtension, version=version)
|
||||||
return _CustomExtension
|
return _CustomExtension
|
||||||
|
|
|
@ -7,7 +7,7 @@ import re
|
||||||
|
|
||||||
import stix2
|
import stix2
|
||||||
|
|
||||||
from .base import _DomainObject, _Observable
|
from .base import _DomainObject
|
||||||
from .exceptions import DuplicateRegistrationError, ParseError
|
from .exceptions import DuplicateRegistrationError, ParseError
|
||||||
from .utils import PREFIX_21_REGEX, _get_dict, get_class_hierarchy_names
|
from .utils import PREFIX_21_REGEX, _get_dict, get_class_hierarchy_names
|
||||||
|
|
||||||
|
@ -90,6 +90,12 @@ def _detect_spec_version(stix_dict):
|
||||||
return v
|
return v
|
||||||
|
|
||||||
|
|
||||||
|
def _get_extension_class(extension_uuid, version):
|
||||||
|
"""Retrieve a registered class Extension"""
|
||||||
|
v = 'v' + version.replace('.', '')
|
||||||
|
return STIX2_OBJ_MAPS[v]['extensions'].get(extension_uuid)
|
||||||
|
|
||||||
|
|
||||||
def dict_to_stix2(stix_dict, allow_custom=False, version=None):
|
def dict_to_stix2(stix_dict, allow_custom=False, version=None):
|
||||||
"""convert dictionary to full python-stix2 object
|
"""convert dictionary to full python-stix2 object
|
||||||
|
|
||||||
|
@ -137,6 +143,13 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None):
|
||||||
# flag allows for unknown custom objects too, but will not
|
# flag allows for unknown custom objects too, but will not
|
||||||
# be parsed into STIX object, returned as is
|
# be parsed into STIX object, returned as is
|
||||||
return stix_dict
|
return stix_dict
|
||||||
|
for key_id, ext_def in stix_dict.get('extensions', {}).items():
|
||||||
|
if key_id.startswith('stix-extension--') and (
|
||||||
|
ext_def.get('is_new_object', False) or ext_def.get('is_extension_so', False)
|
||||||
|
):
|
||||||
|
# prevents ParseError for unregistered objects when
|
||||||
|
# 'is_new_object' or 'is_extension_so' are set to True and allow_custom=False
|
||||||
|
return stix_dict
|
||||||
raise ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % stix_dict['type'])
|
raise ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % stix_dict['type'])
|
||||||
|
|
||||||
return obj_class(allow_custom=allow_custom, **stix_dict)
|
return obj_class(allow_custom=allow_custom, **stix_dict)
|
||||||
|
@ -318,27 +331,20 @@ def _register_observable(new_observable, version=stix2.DEFAULT_VERSION):
|
||||||
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
|
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
|
||||||
|
|
||||||
|
|
||||||
def _register_observable_extension(
|
def _register_extension(
|
||||||
observable, new_extension, version=stix2.DEFAULT_VERSION,
|
new_extension, version=stix2.DEFAULT_VERSION,
|
||||||
):
|
):
|
||||||
"""Register a custom extension to a STIX Cyber Observable type.
|
"""Register a custom extension to any STIX Object type.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
observable: An observable class or instance
|
new_extension (class): A class to register in the Extensions map.
|
||||||
new_extension (class): A class to register in the Observables
|
|
||||||
Extensions map.
|
|
||||||
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1").
|
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1").
|
||||||
Defaults to the latest supported version.
|
Defaults to the latest supported version.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
obs_class = observable if isinstance(observable, type) else \
|
|
||||||
type(observable)
|
|
||||||
ext_type = new_extension._type
|
ext_type = new_extension._type
|
||||||
properties = new_extension._properties
|
properties = new_extension._properties
|
||||||
|
|
||||||
if not issubclass(obs_class, _Observable):
|
|
||||||
raise ValueError("'observable' must be a valid Observable class!")
|
|
||||||
|
|
||||||
stix2.properties._validate_type(ext_type, version)
|
stix2.properties._validate_type(ext_type, version)
|
||||||
|
|
||||||
if not new_extension._properties:
|
if not new_extension._properties:
|
||||||
|
@ -348,42 +354,22 @@ def _register_observable_extension(
|
||||||
)
|
)
|
||||||
|
|
||||||
if version == "2.1":
|
if version == "2.1":
|
||||||
if not ext_type.endswith('-ext'):
|
if not (ext_type.endswith('-ext') or ext_type.startswith('stix-extension--')):
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"Invalid extension type name '%s': must end with '-ext'." %
|
"Invalid extension type name '%s': must end with '-ext' or start with 'stix-extension--<UUID>'." %
|
||||||
ext_type,
|
ext_type,
|
||||||
)
|
)
|
||||||
|
|
||||||
for prop_name, prop_value in properties.items():
|
for prop_name in properties.keys():
|
||||||
if not re.match(PREFIX_21_REGEX, prop_name):
|
if not re.match(PREFIX_21_REGEX, prop_name):
|
||||||
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
|
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
|
||||||
|
|
||||||
v = 'v' + version.replace('.', '')
|
v = 'v' + version.replace('.', '')
|
||||||
|
EXT_MAP = STIX2_OBJ_MAPS[v]['extensions']
|
||||||
|
|
||||||
try:
|
if ext_type in EXT_MAP:
|
||||||
observable_type = observable._type
|
raise DuplicateRegistrationError("Extension", ext_type)
|
||||||
except AttributeError:
|
EXT_MAP[ext_type] = new_extension
|
||||||
raise ValueError(
|
|
||||||
"Unknown observable type. Custom observables must be "
|
|
||||||
"created with the @CustomObservable decorator.",
|
|
||||||
)
|
|
||||||
|
|
||||||
OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables']
|
|
||||||
EXT_MAP = STIX2_OBJ_MAPS[v]['observable-extensions']
|
|
||||||
|
|
||||||
try:
|
|
||||||
if ext_type in EXT_MAP[observable_type].keys():
|
|
||||||
raise DuplicateRegistrationError("Observable Extension", ext_type)
|
|
||||||
EXT_MAP[observable_type][ext_type] = new_extension
|
|
||||||
except KeyError:
|
|
||||||
if observable_type not in OBJ_MAP_OBSERVABLE:
|
|
||||||
raise ValueError(
|
|
||||||
"Unknown observable type '%s'. Custom observables "
|
|
||||||
"must be created with the @CustomObservable decorator."
|
|
||||||
% observable_type,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
EXT_MAP[observable_type] = {ext_type: new_extension}
|
|
||||||
|
|
||||||
|
|
||||||
def _collect_stix2_mappings():
|
def _collect_stix2_mappings():
|
||||||
|
@ -401,7 +387,7 @@ def _collect_stix2_mappings():
|
||||||
STIX2_OBJ_MAPS[ver] = {}
|
STIX2_OBJ_MAPS[ver] = {}
|
||||||
STIX2_OBJ_MAPS[ver]['objects'] = mod.OBJ_MAP
|
STIX2_OBJ_MAPS[ver]['objects'] = mod.OBJ_MAP
|
||||||
STIX2_OBJ_MAPS[ver]['observables'] = mod.OBJ_MAP_OBSERVABLE
|
STIX2_OBJ_MAPS[ver]['observables'] = mod.OBJ_MAP_OBSERVABLE
|
||||||
STIX2_OBJ_MAPS[ver]['observable-extensions'] = mod.EXT_MAP
|
STIX2_OBJ_MAPS[ver]['extensions'] = mod.EXT_MAP
|
||||||
elif re.match(r'^stix2\.v2[0-9]\.common$', name) and is_pkg is False:
|
elif re.match(r'^stix2\.v2[0-9]\.common$', name) and is_pkg is False:
|
||||||
mod = importlib.import_module(name, str(top_level_module.__name__))
|
mod = importlib.import_module(name, str(top_level_module.__name__))
|
||||||
STIX2_OBJ_MAPS[ver]['markings'] = mod.OBJ_MAP_MARKING
|
STIX2_OBJ_MAPS[ver]['markings'] = mod.OBJ_MAP_MARKING
|
||||||
|
|
|
@ -10,7 +10,10 @@ class _STIXBase20(_STIXBase):
|
||||||
|
|
||||||
|
|
||||||
class _Observable(_Observable, _STIXBase20):
|
class _Observable(_Observable, _STIXBase20):
|
||||||
pass
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False)
|
||||||
|
super(_Observable, self).__init__(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
class _Extension(_Extension, _STIXBase20):
|
class _Extension(_Extension, _STIXBase20):
|
||||||
|
|
|
@ -793,15 +793,15 @@ def CustomObservable(type='x-custom-observable', properties=None):
|
||||||
_properties = list(itertools.chain.from_iterable([
|
_properties = list(itertools.chain.from_iterable([
|
||||||
[('type', TypeProperty(type, spec_version='2.0'))],
|
[('type', TypeProperty(type, spec_version='2.0'))],
|
||||||
properties,
|
properties,
|
||||||
[('extensions', ExtensionsProperty(spec_version="2.0", enclosing_type=type))],
|
[('extensions', ExtensionsProperty(spec_version='2.0'))],
|
||||||
]))
|
]))
|
||||||
return _custom_observable_builder(cls, type, _properties, '2.0', _Observable)
|
return _custom_observable_builder(cls, type, _properties, '2.0', _Observable)
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
def CustomExtension(observable=None, type='x-custom-observable-ext', properties=None):
|
def CustomExtension(type='x-custom-observable-ext', properties=None):
|
||||||
"""Decorator for custom extensions to STIX Cyber Observables.
|
"""Decorator for custom extensions to STIX Cyber Observables.
|
||||||
"""
|
"""
|
||||||
def wrapper(cls):
|
def wrapper(cls):
|
||||||
return _custom_extension_builder(cls, observable, type, properties, '2.0', _Extension)
|
return _custom_extension_builder(cls, type, properties, '2.0', _Extension)
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
|
@ -14,11 +14,43 @@ class _STIXBase21(_STIXBase):
|
||||||
|
|
||||||
|
|
||||||
class _Observable(_Observable, _STIXBase21):
|
class _Observable(_Observable, _STIXBase21):
|
||||||
pass
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super(_Observable, self).__init__(**kwargs)
|
||||||
|
if 'id' not in kwargs:
|
||||||
|
# Specific to 2.1+ observables: generate a deterministic ID
|
||||||
|
id_ = self._generate_id()
|
||||||
|
|
||||||
|
# Spec says fall back to UUIDv4 if no contributing properties were
|
||||||
|
# given. That's what already happened (the following is actually
|
||||||
|
# overwriting the default uuidv4), so nothing to do here.
|
||||||
|
if id_ is not None:
|
||||||
|
# Can't assign to self (we're immutable), so slip the ID in
|
||||||
|
# more sneakily.
|
||||||
|
self._inner["id"] = id_
|
||||||
|
|
||||||
|
|
||||||
class _Extension(_Extension, _STIXBase21):
|
class _Extension(_Extension, _STIXBase21):
|
||||||
pass
|
extends_stix_object_definition = False
|
||||||
|
is_new_object = False
|
||||||
|
is_extension_so = False
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super(_Extension, self).__init__(**kwargs)
|
||||||
|
if getattr(self, "extends_stix_object_definition", False):
|
||||||
|
self._inner["extends_stix_object_definition"] = True
|
||||||
|
elif getattr(self, "is_new_object", False):
|
||||||
|
self._inner["is_new_object"] = True
|
||||||
|
elif getattr(self, "is_extension_so", False):
|
||||||
|
self._inner["is_extension_so"] = True
|
||||||
|
|
||||||
|
def _check_at_least_one_property(self, list_of_properties=None):
|
||||||
|
new_ext_check = (getattr(self, "extends_stix_object_definition", False) or
|
||||||
|
getattr(self, "is_new_object", False) or
|
||||||
|
getattr(self, "is_extension_so", False))
|
||||||
|
|
||||||
|
if new_ext_check is False:
|
||||||
|
super(_Extension, self)._check_at_least_one_property(list_of_properties=list_of_properties)
|
||||||
|
|
||||||
|
|
||||||
class _DomainObject(_DomainObject, _STIXBase21):
|
class _DomainObject(_DomainObject, _STIXBase21):
|
||||||
|
|
|
@ -920,7 +920,7 @@ class X509Certificate(_Observable):
|
||||||
self._check_at_least_one_property(att_list)
|
self._check_at_least_one_property(att_list)
|
||||||
|
|
||||||
|
|
||||||
def CustomObservable(type='x-custom-observable', properties=None, id_contrib_props=None):
|
def CustomObservable(type='x-custom-observable', properties=None, id_contrib_props=None, extension_name=None):
|
||||||
"""Custom STIX Cyber Observable Object type decorator.
|
"""Custom STIX Cyber Observable Object type decorator.
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
|
@ -939,15 +939,24 @@ def CustomObservable(type='x-custom-observable', properties=None, id_contrib_pro
|
||||||
[('type', TypeProperty(type, spec_version='2.1'))],
|
[('type', TypeProperty(type, spec_version='2.1'))],
|
||||||
[('id', IDProperty(type, spec_version='2.1'))],
|
[('id', IDProperty(type, spec_version='2.1'))],
|
||||||
properties,
|
properties,
|
||||||
[('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=type))],
|
[('extensions', ExtensionsProperty(spec_version='2.1'))],
|
||||||
]))
|
]))
|
||||||
|
if extension_name:
|
||||||
|
@CustomExtension(type=extension_name, properties=properties)
|
||||||
|
class NameExtension:
|
||||||
|
is_extension_so = True
|
||||||
|
|
||||||
|
extension = extension_name.split('--')[1]
|
||||||
|
extension = extension.replace('-', '')
|
||||||
|
NameExtension.__name__ = 'STIXExtension' + extension
|
||||||
|
cls.with_extension = extension_name
|
||||||
return _custom_observable_builder(cls, type, _properties, '2.1', _Observable, id_contrib_props)
|
return _custom_observable_builder(cls, type, _properties, '2.1', _Observable, id_contrib_props)
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
def CustomExtension(observable=None, type='x-custom-observable-ext', properties=None):
|
def CustomExtension(type='x-custom-observable-ext', properties=None):
|
||||||
"""Decorator for custom extensions to STIX Cyber Observables.
|
"""Custom STIX Object Extension decorator.
|
||||||
"""
|
"""
|
||||||
def wrapper(cls):
|
def wrapper(cls):
|
||||||
return _custom_extension_builder(cls, observable, type, properties, '2.1', _Extension)
|
return _custom_extension_builder(cls, type, properties, '2.1', _Extension)
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
|
@ -7,6 +7,7 @@ import warnings
|
||||||
from six.moves.urllib.parse import quote_plus
|
from six.moves.urllib.parse import quote_plus
|
||||||
from stix2patterns.validator import run_validator
|
from stix2patterns.validator import run_validator
|
||||||
|
|
||||||
|
from . import observables
|
||||||
from ..custom import _custom_object_builder
|
from ..custom import _custom_object_builder
|
||||||
from ..exceptions import (
|
from ..exceptions import (
|
||||||
InvalidValueError, PropertyPresenceError, STIXDeprecationWarning,
|
InvalidValueError, PropertyPresenceError, STIXDeprecationWarning,
|
||||||
|
@ -778,7 +779,7 @@ class Vulnerability(_DomainObject):
|
||||||
])
|
])
|
||||||
|
|
||||||
|
|
||||||
def CustomObject(type='x-custom-type', properties=None):
|
def CustomObject(type='x-custom-type', properties=None, extension_name=None):
|
||||||
"""Custom STIX Object type decorator.
|
"""Custom STIX Object type decorator.
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
|
@ -808,6 +809,7 @@ def CustomObject(type='x-custom-type', properties=None):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
def wrapper(cls):
|
def wrapper(cls):
|
||||||
|
extension_properties = [x for x in properties if not x[0].startswith('x_')]
|
||||||
_properties = list(itertools.chain.from_iterable([
|
_properties = list(itertools.chain.from_iterable([
|
||||||
[
|
[
|
||||||
('type', TypeProperty(type, spec_version='2.1')),
|
('type', TypeProperty(type, spec_version='2.1')),
|
||||||
|
@ -817,7 +819,7 @@ def CustomObject(type='x-custom-type', properties=None):
|
||||||
('created', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')),
|
('created', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')),
|
||||||
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')),
|
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')),
|
||||||
],
|
],
|
||||||
[x for x in properties if not x[0].startswith('x_')],
|
extension_properties,
|
||||||
[
|
[
|
||||||
('revoked', BooleanProperty(default=lambda: False)),
|
('revoked', BooleanProperty(default=lambda: False)),
|
||||||
('labels', ListProperty(StringProperty)),
|
('labels', ListProperty(StringProperty)),
|
||||||
|
@ -826,10 +828,19 @@ def CustomObject(type='x-custom-type', properties=None):
|
||||||
('external_references', ListProperty(ExternalReference)),
|
('external_references', ListProperty(ExternalReference)),
|
||||||
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
|
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
|
||||||
('granular_markings', ListProperty(GranularMarking)),
|
('granular_markings', ListProperty(GranularMarking)),
|
||||||
('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=type)),
|
('extensions', ExtensionsProperty(spec_version='2.1')),
|
||||||
],
|
],
|
||||||
sorted([x for x in properties if x[0].startswith('x_')], key=lambda x: x[0]),
|
sorted([x for x in properties if x[0].startswith('x_')], key=lambda x: x[0]),
|
||||||
]))
|
]))
|
||||||
|
if extension_name:
|
||||||
|
@observables.CustomExtension(type=extension_name, properties=extension_properties)
|
||||||
|
class NameExtension:
|
||||||
|
is_new_object = True
|
||||||
|
|
||||||
|
extension = extension_name.split('--')[1]
|
||||||
|
extension = extension.replace('-', '')
|
||||||
|
NameExtension.__name__ = 'STIXExtension' + extension
|
||||||
|
cls.with_extension = extension_name
|
||||||
return _custom_object_builder(cls, type, _properties, '2.1', _DomainObject)
|
return _custom_object_builder(cls, type, _properties, '2.1', _DomainObject)
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
Loading…
Reference in New Issue