Merge branch 'dev-extensions-proposal' of github.com:oasis-open/cti-python-stix2 into dev-extensions-proposal

pull/1/head
Rich Piazza 2021-07-07 11:02:05 -04:00
commit d6a1aa9f74
18 changed files with 365 additions and 149 deletions

View File

@ -1,5 +1,6 @@
"""Base classes for type definitions in the STIX2 library.""" """Base classes for type definitions in the STIX2 library."""
import collections
import collections.abc import collections.abc
import copy import copy
import itertools import itertools
@ -35,16 +36,6 @@ def get_required_properties(properties):
class _STIXBase(collections.abc.Mapping): class _STIXBase(collections.abc.Mapping):
"""Base class for STIX object types""" """Base class for STIX object types"""
def object_properties(self):
props = set(self._properties.keys())
custom_props = list(set(self._inner.keys()) - props)
custom_props.sort()
all_properties = list(self._properties.keys())
all_properties.extend(custom_props) # Any custom properties to the bottom
return all_properties
def _check_property(self, prop_name, prop, kwargs, allow_custom): def _check_property(self, prop_name, prop, kwargs, allow_custom):
if prop_name not in kwargs: if prop_name not in kwargs:
if hasattr(prop, 'default'): if hasattr(prop, 'default'):
@ -131,46 +122,45 @@ class _STIXBase(collections.abc.Mapping):
if custom_props and not isinstance(custom_props, dict): if custom_props and not isinstance(custom_props, dict):
raise ValueError("'custom_properties' must be a dictionary") raise ValueError("'custom_properties' must be a dictionary")
# Detect any keyword arguments not allowed for a specific type. # Detect any keyword arguments representing customization.
# In STIX 2.1, this is complicated by "toplevel-property-extension" # In STIX 2.1, this is complicated by "toplevel-property-extension"
# type extensions, which can add extra properties which are *not* # type extensions, which can add extra properties which are *not*
# considered custom. # considered custom.
extra_kwargs = kwargs.keys() - self._properties.keys()
extensions = kwargs.get("extensions") extensions = kwargs.get("extensions")
registered_toplevel_extension_props = {}
has_unregistered_toplevel_extension = False
if extensions: if extensions:
has_unregistered_toplevel_extension = False
registered_toplevel_extension_props = set()
for ext_id, ext in extensions.items(): for ext_id, ext in extensions.items():
if ext.get("extension_type") == "toplevel-property-extension": if ext.get("extension_type") == "toplevel-property-extension":
registered_ext_class = class_for_type( registered_ext_class = class_for_type(
ext_id, "2.1", "extensions", ext_id, "2.1", "extensions",
) )
if registered_ext_class: if registered_ext_class:
registered_toplevel_extension_props |= \ registered_toplevel_extension_props.update(
registered_ext_class._properties.keys() registered_ext_class._toplevel_properties,
)
else: else:
has_unregistered_toplevel_extension = True has_unregistered_toplevel_extension = True
if has_unregistered_toplevel_extension: if has_unregistered_toplevel_extension:
# Must assume all extras are extension properties, not custom. # Must assume all extras are extension properties, not custom.
extra_kwargs.clear() custom_kwargs = set()
else: else:
# All toplevel property extensions (if any) have been # All toplevel property extensions (if any) have been
# registered. So we can tell what their properties are and # registered. So we can tell what their properties are and
# treat only those as not custom. # treat only those as not custom.
extra_kwargs -= registered_toplevel_extension_props custom_kwargs = kwargs.keys() - self._properties.keys() \
- registered_toplevel_extension_props.keys()
if extra_kwargs and not allow_custom: if custom_kwargs and not allow_custom:
raise ExtraPropertiesError(cls, extra_kwargs) raise ExtraPropertiesError(cls, custom_kwargs)
if custom_props: if custom_props:
# loophole for custom_properties... # loophole for custom_properties...
allow_custom = True allow_custom = True
all_custom_prop_names = (extra_kwargs | custom_props.keys()) - \ all_custom_prop_names = (custom_kwargs | custom_props.keys()) - \
self._properties.keys() self._properties.keys()
if all_custom_prop_names: if all_custom_prop_names:
if not isinstance(self, stix2.v20._STIXBase20): if not isinstance(self, stix2.v20._STIXBase20):
@ -181,39 +171,52 @@ class _STIXBase(collections.abc.Mapping):
reason="Property name '%s' must begin with an alpha character." % prop_name, reason="Property name '%s' must begin with an alpha character." % prop_name,
) )
# Remove any keyword arguments whose value is None or [] (i.e. empty list) # defined_properties = all properties defined on this type, plus all
setting_kwargs = { # properties defined on this instance as a result of toplevel property
k: v # extensions.
for k, v in itertools.chain(kwargs.items(), custom_props.items()) defined_properties = collections.ChainMap(
if v is not None and v != [] self._properties, registered_toplevel_extension_props,
} )
# Detect any missing required properties assigned_properties = collections.ChainMap(kwargs, custom_props)
required_properties = set(get_required_properties(self._properties))
missing_kwargs = required_properties - set(setting_kwargs) # Establish property order: spec-defined, toplevel extension, custom.
if missing_kwargs: toplevel_extension_props = registered_toplevel_extension_props.keys() \
# In this scenario, we are inside within the scope of the extension. | (kwargs.keys() - self._properties.keys() - custom_kwargs)
# It is possible to check if this is a new Extension Class by property_order = itertools.chain(
# querying "extension_type". Note: There is an API limitation currently self._properties,
# because a toplevel-property-extension cannot validate its parent properties toplevel_extension_props,
new_ext_check = ( sorted(all_custom_prop_names),
bool(getattr(self, "extension_type", None)) )
and issubclass(cls, stix2.v21._Extension)
) setting_kwargs = {}
if new_ext_check is False:
raise MissingPropertiesError(cls, missing_kwargs)
has_custom = bool(all_custom_prop_names) has_custom = bool(all_custom_prop_names)
for prop_name, prop_metadata in self._properties.items(): for prop_name in property_order:
temp_custom = self._check_property(
prop_name, prop_metadata, setting_kwargs, allow_custom,
)
has_custom = has_custom or temp_custom prop_val = assigned_properties.get(prop_name)
if prop_val not in (None, []):
setting_kwargs[prop_name] = prop_val
prop = defined_properties.get(prop_name)
if prop:
temp_custom = self._check_property(
prop_name, prop, setting_kwargs, allow_custom,
)
has_custom = has_custom or temp_custom
# Detect any missing required properties
required_properties = set(
get_required_properties(defined_properties),
)
missing_kwargs = required_properties - setting_kwargs.keys()
if missing_kwargs:
raise MissingPropertiesError(cls, missing_kwargs)
# Cache defaulted optional properties for serialization # Cache defaulted optional properties for serialization
defaulted = [] defaulted = []
for name, prop in self._properties.items(): for name, prop in defined_properties.items():
try: try:
if ( if (
not prop.required and not hasattr(prop, '_fixed_value') and not prop.required and not hasattr(prop, '_fixed_value') and
@ -278,7 +281,7 @@ class _STIXBase(collections.abc.Mapping):
return self.serialize() return self.serialize()
def __repr__(self): def __repr__(self):
props = ', '.join([f"{k}={self[k]!r}" for k in self.object_properties() if self.get(k)]) props = ', '.join([f"{k}={self[k]!r}" for k in self])
return f'{self.__class__.__name__}({props})' return f'{self.__class__.__name__}({props})'
def __deepcopy__(self, memo): def __deepcopy__(self, memo):

View File

@ -1,6 +1,7 @@
from collections import OrderedDict from collections import OrderedDict
from .base import _cls_init from .base import _cls_init
from .properties import EnumProperty
from .registration import ( from .registration import (
_get_extension_class, _register_extension, _register_marking, _get_extension_class, _register_extension, _register_marking,
_register_object, _register_observable, _register_object, _register_observable,
@ -93,12 +94,46 @@ def _custom_observable_builder(cls, type, properties, version, base_class, id_co
def _custom_extension_builder(cls, type, properties, version, base_class): def _custom_extension_builder(cls, type, properties, version, base_class):
prop_dict = _get_properties_dict(properties)
properties = _get_properties_dict(properties)
toplevel_properties = None
# Auto-create an "extension_type" property from the class attribute, if
# it exists. How to treat the other properties which were given depends on
# the extension type.
extension_type = getattr(cls, "extension_type", None)
if extension_type:
# I suppose I could also go with a plain string property, since the
# value is fixed... but an enum property seems more true to the
# property's semantics. Also, I can't import a vocab module for the
# enum values without circular import errors. :(
extension_type_prop = EnumProperty(
[
"new-sdo", "new-sco", "new-sro", "property-extension",
"toplevel-property-extension",
],
required=False,
fixed=extension_type,
)
nested_properties = {
"extension_type": extension_type_prop,
}
if extension_type == "toplevel-property-extension":
toplevel_properties = properties
else:
nested_properties.update(properties)
else:
nested_properties = properties
class _CustomExtension(cls, base_class): class _CustomExtension(cls, base_class):
_type = type _type = type
_properties = prop_dict _properties = nested_properties
if extension_type == "toplevel-property-extension":
_toplevel_properties = toplevel_properties
def __init__(self, **kwargs): def __init__(self, **kwargs):
base_class.__init__(self, **kwargs) base_class.__init__(self, **kwargs)

View File

@ -1,6 +1,5 @@
"""STIX2 core serialization methods.""" """STIX2 core serialization methods."""
import copy
import datetime as dt import datetime as dt
import io import io
@ -24,7 +23,7 @@ class STIXJSONEncoder(json.JSONEncoder):
if isinstance(obj, (dt.date, dt.datetime)): if isinstance(obj, (dt.date, dt.datetime)):
return format_datetime(obj) return format_datetime(obj)
elif isinstance(obj, stix2.base._STIXBase): elif isinstance(obj, stix2.base._STIXBase):
tmp_obj = dict(copy.deepcopy(obj)) tmp_obj = dict(obj)
for prop_name in obj._defaulted_optional_properties: for prop_name in obj._defaulted_optional_properties:
del tmp_obj[prop_name] del tmp_obj[prop_name]
return tmp_obj return tmp_obj
@ -177,7 +176,7 @@ def find_property_index(obj, search_key, search_value):
if isinstance(obj, stix2.base._STIXBase): if isinstance(obj, stix2.base._STIXBase):
if search_key in obj and obj[search_key] == search_value: if search_key in obj and obj[search_key] == search_value:
idx = _find(obj.object_properties(), search_key) idx = _find(list(obj), search_key)
else: else:
idx = _find_property_in_seq(obj.values(), search_key, search_value) idx = _find_property_in_seq(obj.values(), search_key, search_value)
elif isinstance(obj, dict): elif isinstance(obj, dict):

View File

@ -74,6 +74,6 @@ def test_identity_with_custom():
) )
assert identity.x_foo == "bar" assert identity.x_foo == "bar"
assert "x_foo" in identity.object_properties() assert "x_foo" in identity
# TODO: Add other examples # TODO: Add other examples

View File

@ -28,6 +28,7 @@ EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join(
modified='2017-01-01T00:00:01.000Z', modified='2017-01-01T00:00:01.000Z',
pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']", pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
valid_from='1970-01-01T00:00:01Z', valid_from='1970-01-01T00:00:01Z',
revoked=False,
labels=['malicious-activity'] labels=['malicious-activity']
""".split(), """.split(),
) + ")" ) + ")"

View File

@ -21,7 +21,7 @@ EXPECTED_TLP_MARKING_DEFINITION = """{
EXPECTED_STATEMENT_MARKING_DEFINITION = """{ EXPECTED_STATEMENT_MARKING_DEFINITION = """{
"type": "marking-definition", "type": "marking-definition",
"id": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9", "id": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
"created": "2017-01-20T00:00:00Z", "created": "2017-01-20T00:00:00.000Z",
"definition_type": "statement", "definition_type": "statement",
"definition": { "definition": {
"statement": "Copyright 2016, Example Corp" "statement": "Copyright 2016, Example Corp"

View File

@ -1,3 +1,4 @@
import contextlib
import uuid import uuid
import pytest import pytest
@ -8,7 +9,9 @@ import stix2.registration
import stix2.registry import stix2.registry
import stix2.v21 import stix2.v21
from ...exceptions import DuplicateRegistrationError, InvalidValueError from ...exceptions import (
DuplicateRegistrationError, InvalidValueError, MissingPropertiesError,
)
from .constants import FAKE_TIME, IDENTITY_ID, MARKING_DEFINITION_ID from .constants import FAKE_TIME, IDENTITY_ID, MARKING_DEFINITION_ID
# Custom Properties in SDOs # Custom Properties in SDOs
@ -1675,6 +1678,194 @@ def test_registered_new_extension_marking_allow_custom_false():
'{"extension_type": "property-extension", "some_marking_field": "value"}}' in marking_serialized '{"extension_type": "property-extension", "some_marking_field": "value"}}' in marking_serialized
@contextlib.contextmanager
def _register_extension(ext, props):
ext_def_id = "extension-definition--" + str(uuid.uuid4())
stix2.v21.CustomExtension(
ext_def_id,
props,
)(ext)
try:
yield ext_def_id
finally:
# "unregister" the extension
del stix2.registry.STIX2_OBJ_MAPS["2.1"]["extensions"][ext_def_id]
def test_nested_ext_prop_meta():
class TestExt:
extension_type = "property-extension"
props = {
"intprop": stix2.properties.IntegerProperty(required=True),
"strprop": stix2.properties.StringProperty(
required=False, default=lambda: "foo",
),
}
with _register_extension(TestExt, props) as ext_def_id:
obj = stix2.v21.Identity(
name="test",
extensions={
ext_def_id: {
"extension_type": "property-extension",
"intprop": "1",
"strprop": 2,
},
},
)
assert obj.extensions[ext_def_id].extension_type == "property-extension"
assert obj.extensions[ext_def_id].intprop == 1
assert obj.extensions[ext_def_id].strprop == "2"
obj = stix2.v21.Identity(
name="test",
extensions={
ext_def_id: {
"extension_type": "property-extension",
"intprop": "1",
},
},
)
# Ensure default kicked in
assert obj.extensions[ext_def_id].strprop == "foo"
with pytest.raises(InvalidValueError):
stix2.v21.Identity(
name="test",
extensions={
ext_def_id: {
"extension_type": "property-extension",
# wrong value type
"intprop": "foo",
},
},
)
with pytest.raises(InvalidValueError):
stix2.v21.Identity(
name="test",
extensions={
ext_def_id: {
"extension_type": "property-extension",
# missing required property
"strprop": "foo",
},
},
)
with pytest.raises(InvalidValueError):
stix2.v21.Identity(
name="test",
extensions={
ext_def_id: {
"extension_type": "property-extension",
"intprop": 1,
# Use of undefined property
"foo": False,
},
},
)
with pytest.raises(InvalidValueError):
stix2.v21.Identity(
name="test",
extensions={
ext_def_id: {
# extension_type doesn't match with registration
"extension_type": "new-sdo",
"intprop": 1,
"strprop": "foo",
},
},
)
def test_toplevel_ext_prop_meta():
class TestExt:
extension_type = "toplevel-property-extension"
props = {
"intprop": stix2.properties.IntegerProperty(required=True),
"strprop": stix2.properties.StringProperty(
required=False, default=lambda: "foo",
),
}
with _register_extension(TestExt, props) as ext_def_id:
obj = stix2.v21.Identity(
name="test",
intprop="1",
strprop=2,
extensions={
ext_def_id: {
"extension_type": "toplevel-property-extension",
},
},
)
assert obj.extensions[ext_def_id].extension_type == "toplevel-property-extension"
assert obj.intprop == 1
assert obj.strprop == "2"
obj = stix2.v21.Identity(
name="test",
intprop=1,
extensions={
ext_def_id: {
"extension_type": "toplevel-property-extension",
},
},
)
# Ensure default kicked in
assert obj.strprop == "foo"
with pytest.raises(InvalidValueError):
stix2.v21.Identity(
name="test",
intprop="foo", # wrong value type
extensions={
ext_def_id: {
"extension_type": "toplevel-property-extension",
},
},
)
with pytest.raises(InvalidValueError):
stix2.v21.Identity(
name="test",
intprop=1,
extensions={
ext_def_id: {
"extension_type": "toplevel-property-extension",
# Use of undefined property
"foo": False,
},
},
)
with pytest.raises(MissingPropertiesError):
stix2.v21.Identity(
name="test",
strprop="foo", # missing required property
extensions={
ext_def_id: {
"extension_type": "toplevel-property-extension",
},
},
)
def test_allow_custom_propagation(): def test_allow_custom_propagation():
obj_dict = { obj_dict = {
"type": "bundle", "type": "bundle",

View File

@ -105,4 +105,3 @@ def test_extension_definition_with_custom():
) )
assert extension_definition.x_foo == "bar" assert extension_definition.x_foo == "bar"
assert "x_foo" in extension_definition.object_properties()

View File

@ -77,6 +77,5 @@ def test_identity_with_custom():
) )
assert identity.x_foo == "bar" assert identity.x_foo == "bar"
assert "x_foo" in identity.object_properties()
# TODO: Add other examples # TODO: Add other examples

View File

@ -78,4 +78,3 @@ def test_incident_with_custom():
) )
assert incident.x_foo == "bar" assert incident.x_foo == "bar"
assert "x_foo" in incident.object_properties()

View File

@ -30,7 +30,8 @@ EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join(
pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']", pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
pattern_type='stix', pattern_type='stix',
pattern_version='2.1', pattern_version='2.1',
valid_from='1970-01-01T00:00:01Z' valid_from='1970-01-01T00:00:01Z',
revoked=False
""".split(), """.split(),
) + ")" ) + ")"

View File

@ -27,7 +27,8 @@ EXPECTED_LOCATION_1_REPR = "Location(" + " ".join(
created='2016-04-06T20:03:00.000Z', created='2016-04-06T20:03:00.000Z',
modified='2016-04-06T20:03:00.000Z', modified='2016-04-06T20:03:00.000Z',
latitude=48.8566, latitude=48.8566,
longitude=2.3522""".split(), longitude=2.3522,
revoked=False""".split(),
) + ")" ) + ")"
EXPECTED_LOCATION_2 = """{ EXPECTED_LOCATION_2 = """{
@ -47,7 +48,8 @@ EXPECTED_LOCATION_2_REPR = "Location(" + " ".join(
id='location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64', id='location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64',
created='2016-04-06T20:03:00.000Z', created='2016-04-06T20:03:00.000Z',
modified='2016-04-06T20:03:00.000Z', modified='2016-04-06T20:03:00.000Z',
region='northern-america'""".split(), region='northern-america',
revoked=False""".split(),
) + ")" ) + ")"

View File

@ -48,6 +48,7 @@ EXPECTED_OPINION_REPR = "Note(" + " ".join((
content='%s', content='%s',
authors=['John Doe'], authors=['John Doe'],
object_refs=['campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f'], object_refs=['campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f'],
revoked=False,
external_references=[ExternalReference(source_name='job-tracker', external_id='job-id-1234')] external_references=[ExternalReference(source_name='job-tracker', external_id='job-id-1234')]
""" % CONTENT """ % CONTENT
).split()) + ")" ).split()) + ")"

View File

@ -38,7 +38,8 @@ EXPECTED_OPINION_REPR = "Opinion(" + " ".join((
modified='2016-05-12T08:17:27.000Z', modified='2016-05-12T08:17:27.000Z',
explanation="%s", explanation="%s",
opinion='strongly-disagree', opinion='strongly-disagree',
object_refs=['relationship--16d2358f-3b0d-4c88-b047-0da2f7ed4471'] object_refs=['relationship--16d2358f-3b0d-4c88-b047-0da2f7ed4471'],
revoked=False
""" % EXPLANATION """ % EXPLANATION
).split()) + ")" ).split()) + ")"

View File

@ -19,20 +19,19 @@ from .base import (
) )
from .bundle import Bundle from .bundle import Bundle
from .common import ( from .common import (
TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomMarking, TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomExtension, CustomMarking,
ExtensionDefinition, ExternalReference, GranularMarking, KillChainPhase, ExtensionDefinition, ExternalReference, GranularMarking, KillChainPhase,
LanguageContent, MarkingDefinition, StatementMarking, TLPMarking, LanguageContent, MarkingDefinition, StatementMarking, TLPMarking,
) )
from .observables import ( from .observables import (
URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem,
CustomExtension, CustomObservable, Directory, DomainName, EmailAddress, CustomObservable, Directory, DomainName, EmailAddress, EmailMessage,
EmailMessage, EmailMIMEComponent, File, HTTPRequestExt, ICMPExt, EmailMIMEComponent, File, HTTPRequestExt, ICMPExt, IPv4Address,
IPv4Address, IPv6Address, MACAddress, Mutex, NetworkTraffic, NTFSExt, IPv6Address, MACAddress, Mutex, NetworkTraffic, NTFSExt, PDFExt, Process,
PDFExt, Process, RasterImageExt, SocketExt, Software, TCPExt, RasterImageExt, SocketExt, Software, TCPExt, UNIXAccountExt, UserAccount,
UNIXAccountExt, UserAccount, WindowsPEBinaryExt, WindowsPEBinaryExt, WindowsPEOptionalHeaderType, WindowsPESection,
WindowsPEOptionalHeaderType, WindowsPESection, WindowsProcessExt, WindowsProcessExt, WindowsRegistryKey, WindowsRegistryValueType,
WindowsRegistryKey, WindowsRegistryValueType, WindowsServiceExt, WindowsServiceExt, X509Certificate, X509V3ExtensionsType,
X509Certificate, X509V3ExtensionsType,
) )
from .sdo import ( from .sdo import (
AttackPattern, Campaign, CourseOfAction, CustomObject, Grouping, Identity, AttackPattern, Campaign, CourseOfAction, CustomObject, Grouping, Identity,

View File

@ -2,7 +2,8 @@
from collections import OrderedDict from collections import OrderedDict
from ..custom import _custom_marking_builder from . import _Extension
from ..custom import _custom_extension_builder, _custom_marking_builder
from ..exceptions import InvalidValueError, PropertyPresenceError from ..exceptions import InvalidValueError, PropertyPresenceError
from ..markings import _MarkingsMixin from ..markings import _MarkingsMixin
from ..markings.utils import check_tlp_marking from ..markings.utils import check_tlp_marking
@ -139,6 +140,15 @@ class ExtensionDefinition(_STIXBase21):
]) ])
def CustomExtension(type='x-custom-ext', properties=None):
"""Custom STIX Object Extension decorator.
"""
def wrapper(cls):
return _custom_extension_builder(cls, type, properties, '2.1', _Extension)
return wrapper
class TLPMarking(_STIXBase21): class TLPMarking(_STIXBase21):
"""For more detailed information on this object's properties, see """For more detailed information on this object's properties, see
`the STIX 2.1 specification <https://docs.oasis-open.org/cti/stix/v2.1/cs02/stix-v2.1-cs02.html#_yd3ar14ekwrs>`__. `the STIX 2.1 specification <https://docs.oasis-open.org/cti/stix/v2.1/cs02/stix-v2.1-cs02.html#_yd3ar14ekwrs>`__.
@ -258,9 +268,7 @@ def CustomMarking(type='x-custom-marking', properties=None, extension_name=None)
""" """
def wrapper(cls): def wrapper(cls):
if extension_name: if extension_name:
from . import observables @CustomExtension(type=extension_name, properties=properties)
@observables.CustomExtension(type=extension_name, properties=properties)
class NameExtension: class NameExtension:
extension_type = 'property-extension' extension_type = 'property-extension'

View File

@ -6,10 +6,9 @@ _Observable and do not have a ``_type`` attribute.
""" """
from collections import OrderedDict from collections import OrderedDict
from collections.abc import Mapping
import itertools import itertools
from ..custom import _custom_extension_builder, _custom_observable_builder from ..custom import _custom_observable_builder
from ..exceptions import AtLeastOnePropertyError, DependentPropertiesError from ..exceptions import AtLeastOnePropertyError, DependentPropertiesError
from ..properties import ( from ..properties import (
BinaryProperty, BooleanProperty, DictionaryProperty, BinaryProperty, BooleanProperty, DictionaryProperty,
@ -19,9 +18,9 @@ from ..properties import (
TypeProperty, TypeProperty,
) )
from .base import _Extension, _Observable, _STIXBase21 from .base import _Extension, _Observable, _STIXBase21
from .common import GranularMarking from .common import CustomExtension, GranularMarking
from .vocab import ( from .vocab import (
ACCOUNT_TYPE, ENCRYPTION_ALGORITHM, EXTENSION_TYPE, HASHING_ALGORITHM, ACCOUNT_TYPE, ENCRYPTION_ALGORITHM, HASHING_ALGORITHM,
NETWORK_SOCKET_ADDRESS_FAMILY, NETWORK_SOCKET_TYPE, NETWORK_SOCKET_ADDRESS_FAMILY, NETWORK_SOCKET_TYPE,
WINDOWS_INTEGRITY_LEVEL, WINDOWS_PEBINARY_TYPE, WINDOWS_REGISTRY_DATATYPE, WINDOWS_INTEGRITY_LEVEL, WINDOWS_PEBINARY_TYPE, WINDOWS_REGISTRY_DATATYPE,
WINDOWS_SERVICE_START_TYPE, WINDOWS_SERVICE_STATUS, WINDOWS_SERVICE_TYPE, WINDOWS_SERVICE_START_TYPE, WINDOWS_SERVICE_STATUS, WINDOWS_SERVICE_TYPE,
@ -874,19 +873,23 @@ def CustomObservable(type='x-custom-observable', properties=None, id_contrib_pro
""" """
def wrapper(cls): def wrapper(cls):
_properties = list( _properties = list(
itertools.chain.from_iterable([ itertools.chain(
[('type', TypeProperty(type, spec_version='2.1'))], [
[('spec_version', StringProperty(fixed='2.1'))], ('type', TypeProperty(type, spec_version='2.1')),
[('id', IDProperty(type, spec_version='2.1'))], ('spec_version', StringProperty(fixed='2.1')),
('id', IDProperty(type, spec_version='2.1')),
],
properties, properties,
[('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1')))], [
[('granular_markings', ListProperty(GranularMarking))], ('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
[('defanged', BooleanProperty(default=lambda: False))], ('granular_markings', ListProperty(GranularMarking)),
[('extensions', ExtensionsProperty(spec_version='2.1'))], ('defanged', BooleanProperty(default=lambda: False)),
]), ('extensions', ExtensionsProperty(spec_version='2.1')),
],
),
) )
if extension_name: if extension_name:
@CustomExtension(type=extension_name, properties=properties) @CustomExtension(type=extension_name, properties={})
class NameExtension: class NameExtension:
extension_type = 'new-sco' extension_type = 'new-sco'
@ -896,27 +899,3 @@ def CustomObservable(type='x-custom-observable', properties=None, id_contrib_pro
cls.with_extension = extension_name cls.with_extension = extension_name
return _custom_observable_builder(cls, type, _properties, '2.1', _Observable, id_contrib_props) return _custom_observable_builder(cls, type, _properties, '2.1', _Observable, id_contrib_props)
return wrapper return wrapper
def CustomExtension(type='x-custom-observable-ext', properties=None):
"""Custom STIX Object Extension decorator.
"""
def wrapper(cls):
# Auto-create an "extension_type" property from the class attribute, if
# it exists.
extension_type = getattr(cls, "extension_type", None)
if extension_type:
extension_type_prop = EnumProperty(
EXTENSION_TYPE,
required=False,
fixed=extension_type,
)
if isinstance(properties, Mapping):
properties["extension_type"] = extension_type_prop
else:
properties.append(("extension_type", extension_type_prop))
return _custom_extension_builder(cls, type, properties, '2.1', _Extension)
return wrapper

View File

@ -1,13 +1,11 @@
"""STIX 2.1 Domain Objects.""" """STIX 2.1 Domain Objects."""
from collections import OrderedDict from collections import OrderedDict
import itertools
from urllib.parse import quote_plus from urllib.parse import quote_plus
import warnings import warnings
from stix2patterns.validator import run_validator from stix2patterns.validator import run_validator
from . import observables
from ..custom import _custom_object_builder from ..custom import _custom_object_builder
from ..exceptions import ( from ..exceptions import (
InvalidValueError, PropertyPresenceError, STIXDeprecationWarning, InvalidValueError, PropertyPresenceError, STIXDeprecationWarning,
@ -20,7 +18,9 @@ from ..properties import (
) )
from ..utils import NOW from ..utils import NOW
from .base import _DomainObject from .base import _DomainObject
from .common import ExternalReference, GranularMarking, KillChainPhase from .common import (
CustomExtension, ExternalReference, GranularMarking, KillChainPhase,
)
from .vocab import ( from .vocab import (
ATTACK_MOTIVATION, ATTACK_RESOURCE_LEVEL, GROUPING_CONTEXT, IDENTITY_CLASS, ATTACK_MOTIVATION, ATTACK_RESOURCE_LEVEL, GROUPING_CONTEXT, IDENTITY_CLASS,
IMPLEMENTATION_LANGUAGE, INDICATOR_TYPE, INDUSTRY_SECTOR, IMPLEMENTATION_LANGUAGE, INDICATOR_TYPE, INDUSTRY_SECTOR,
@ -833,32 +833,31 @@ def CustomObject(type='x-custom-type', properties=None, extension_name=None, is_
""" """
def wrapper(cls): def wrapper(cls):
extension_properties = [x for x in properties if not x[0].startswith('x_')] extension_properties = [x for x in properties if not x[0].startswith('x_')]
_properties = list( _properties = (
itertools.chain.from_iterable([ [
[ ('type', TypeProperty(type, spec_version='2.1')),
('type', TypeProperty(type, spec_version='2.1')), ('spec_version', StringProperty(fixed='2.1')),
('spec_version', StringProperty(fixed='2.1')), ('id', IDProperty(type, spec_version='2.1')),
('id', IDProperty(type, spec_version='2.1')), ('created_by_ref', ReferenceProperty(valid_types='identity', spec_version='2.1')),
('created_by_ref', ReferenceProperty(valid_types='identity', spec_version='2.1')), ('created', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')), ('modified', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')), ]
], + extension_properties
extension_properties, + [
[ ('revoked', BooleanProperty(default=lambda: False)),
('revoked', BooleanProperty(default=lambda: False)), ('labels', ListProperty(StringProperty)),
('labels', ListProperty(StringProperty)), ('confidence', IntegerProperty()),
('confidence', IntegerProperty()), ('lang', StringProperty()),
('lang', StringProperty()), ('external_references', ListProperty(ExternalReference)),
('external_references', ListProperty(ExternalReference)), ('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))), ('granular_markings', ListProperty(GranularMarking)),
('granular_markings', ListProperty(GranularMarking)), ('extensions', ExtensionsProperty(spec_version='2.1')),
('extensions', ExtensionsProperty(spec_version='2.1')), ]
], + sorted((x for x in properties if x[0].startswith('x_')), key=lambda x: x[0])
sorted([x for x in properties if x[0].startswith('x_')], key=lambda x: x[0]),
]),
) )
if extension_name: if extension_name:
@observables.CustomExtension(type=extension_name, properties=extension_properties) @CustomExtension(type=extension_name, properties={})
class NameExtension: class NameExtension:
if is_sdo: if is_sdo:
extension_type = 'new-sdo' extension_type = 'new-sdo'