Pass allow_custom to ExtensionsProperty

Also fix bug caused by _properties being a class variable rather than an
instance variable. If you created an object with allow_custom,
allow_custom would be set for all future instances.
stix2.0
Chris Lenk 2018-05-16 12:14:33 -04:00
parent 06e5a33639
commit 69c31ca3fc
7 changed files with 129 additions and 20 deletions

View File

@ -264,10 +264,11 @@ class _Observable(_STIXBase):
def __init__(self, **kwargs):
# the constructor might be called independently of an observed data object
if '_valid_refs' in kwargs:
self._STIXBase__valid_refs = kwargs.pop('_valid_refs')
else:
self._STIXBase__valid_refs = []
self._STIXBase__valid_refs = kwargs.pop('_valid_refs', [])
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False)
super(_Observable, self).__init__(**kwargs)
def _check_ref(self, ref, prop, prop_name):

View File

@ -62,9 +62,8 @@ class Bundle(_STIXBase):
else:
kwargs['objects'] = list(args) + kwargs.get('objects', [])
allow_custom = kwargs.get('allow_custom', False)
if allow_custom:
self._properties['objects'] = ListProperty(STIXObjectProperty(True))
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].allow_custom = kwargs.get('allow_custom', False)
super(Bundle, self).__init__(**kwargs)

View File

@ -88,6 +88,13 @@ def test_parse_identity_custom_property(data):
assert identity.foo == "bar"
def test_custom_property_in_bundled_object():
bundle = stix2.Bundle(IDENTITY_CUSTOM_PROP, allow_custom=True)
assert bundle.objects[0].x_foo == "bar"
assert '"x_foo": "bar"' in str(bundle)
def test_custom_property_in_observed_data():
artifact = stix2.File(
allow_custom=True,
@ -106,11 +113,61 @@ def test_custom_property_in_observed_data():
assert '"x_foo": "bar"' in str(observed_data)
def test_custom_property_in_bundled_object():
bundle = stix2.Bundle(IDENTITY_CUSTOM_PROP, allow_custom=True)
def test_custom_property_object_in_observable_extension():
ntfs = stix2.NTFSExt(
allow_custom=True,
sid=1,
x_foo='bar',
)
artifact = stix2.File(
name='test',
extensions={'ntfs-ext': ntfs},
)
observed_data = stix2.ObservedData(
allow_custom=True,
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=0,
objects={"0": artifact},
)
assert bundle.objects[0].x_foo == "bar"
assert '"x_foo": "bar"' in str(bundle)
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
assert '"x_foo": "bar"' in str(observed_data)
def test_custom_property_dict_in_observable_extension():
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
artifact = stix2.File(
name='test',
extensions={
'ntfs-ext': {
'sid': 1,
'x_foo': 'bar',
}
},
)
artifact = stix2.File(
allow_custom=True,
name='test',
extensions={
'ntfs-ext': {
'allow_custom': True,
'sid': 1,
'x_foo': 'bar',
}
},
)
observed_data = stix2.ObservedData(
allow_custom=True,
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=0,
objects={"0": artifact},
)
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
assert '"x_foo": "bar"' in str(observed_data)
def test_identity_custom_property_revoke():
@ -560,6 +617,7 @@ def test_custom_extension():
def test_custom_extension_wrong_observable_type():
# NewExtension is an extension of DomainName, not File
ext = NewExtension(property1='something')
with pytest.raises(ValueError) as excinfo:
stix2.File(name="abc.txt",

View File

@ -266,3 +266,49 @@ def test_default_object_marking_refs():
campaign = Campaign(**CAMPAIGN_KWARGS)
assert campaign.object_marking_refs[0] == mark_def.id
def test_workbench_custom_property_object_in_observable_extension():
ntfs = stix2.NTFSExt(
allow_custom=True,
sid=1,
x_foo='bar',
)
artifact = stix2.File(
name='test',
extensions={'ntfs-ext': ntfs},
)
observed_data = ObservedData(
allow_custom=True,
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=0,
objects={"0": artifact},
)
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
assert '"x_foo": "bar"' in str(observed_data)
def test_workbench_custom_property_dict_in_observable_extension():
artifact = stix2.File(
allow_custom=True,
name='test',
extensions={
'ntfs-ext': {
'allow_custom': True,
'sid': 1,
'x_foo': 'bar',
}
},
)
observed_data = ObservedData(
allow_custom=True,
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=0,
objects={"0": artifact},
)
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
assert '"x_foo": "bar"' in str(observed_data)

View File

@ -56,7 +56,8 @@ class ExtensionsProperty(DictionaryProperty):
"""Property for representing extensions on Observable objects.
"""
def __init__(self, enclosing_type=None, required=False):
def __init__(self, allow_custom=False, enclosing_type=None, required=False):
self.allow_custom = allow_custom
self.enclosing_type = enclosing_type
super(ExtensionsProperty, self).__init__(required)
@ -78,8 +79,13 @@ class ExtensionsProperty(DictionaryProperty):
if key in specific_type_map:
cls = specific_type_map[key]
if type(subvalue) is dict:
dictified[key] = cls(**subvalue)
if self.allow_custom:
subvalue['allow_custom'] = True
dictified[key] = cls(**subvalue)
else:
dictified[key] = cls(**subvalue)
elif type(subvalue) is cls:
# If already an instance of an _Extension class, assume it's valid
dictified[key] = subvalue
else:
raise ValueError("Cannot determine extension type.")

View File

@ -225,9 +225,9 @@ class ObservedData(STIXDomainObject):
('granular_markings', ListProperty(GranularMarking)),
])
def __init__(self, allow_custom=False, *args, **kwargs):
if allow_custom:
self._properties['objects'] = ObservableProperty(True)
def __init__(self, *args, **kwargs):
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].allow_custom = kwargs.get('allow_custom', False)
super(ObservedData, self).__init__(*args, **kwargs)

View File

@ -49,7 +49,6 @@ from . import (AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, # n
WindowsProcessExt, WindowsRegistryKey, WindowsRegistryValueType,
WindowsServiceExt, X509Certificate, X509V3ExtenstionsType)
from .datastore.filters import FilterSet
from . import ObservableProperty
# Use an implicit MemoryStore
_environ = Environment(store=MemoryStore())
@ -111,9 +110,9 @@ def _related_wrapper(self, *args, **kwargs):
return _environ.related_to(self, *args, **kwargs)
def _observed_data_init(self, allow_custom=False, *args, **kwargs):
if allow_custom:
self._properties['objects'] = ObservableProperty(True)
def _observed_data_init(self, *args, **kwargs):
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].allow_custom = kwargs.get('allow_custom', False)
super(self.__class__, self).__init__(*args, **kwargs)