Merge pull request #178 from oasis-open/176-allow_custom

Allow Custom when adding objects to ObservedData or extensions to a Cyber Observable
stix2.0
Greg Back 2018-05-16 11:33:08 -05:00 committed by GitHub
commit b536af8e48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 178 additions and 18 deletions

View File

@ -264,10 +264,11 @@ class _Observable(_STIXBase):
def __init__(self, **kwargs):
# the constructor might be called independently of an observed data object
if '_valid_refs' in kwargs:
self._STIXBase__valid_refs = kwargs.pop('_valid_refs')
else:
self._STIXBase__valid_refs = []
self._STIXBase__valid_refs = kwargs.pop('_valid_refs', [])
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False)
super(_Observable, self).__init__(**kwargs)
def _check_ref(self, ref, prop, prop_name):

View File

@ -14,9 +14,9 @@ from .utils import _get_dict, get_class_hierarchy_names
class STIXObjectProperty(Property):
def __init__(self, allow_custom=False):
def __init__(self, allow_custom=False, *args, **kwargs):
self.allow_custom = allow_custom
super(STIXObjectProperty, self).__init__()
super(STIXObjectProperty, self).__init__(*args, **kwargs)
def clean(self, value):
# Any STIX Object (SDO, SRO, or Marking Definition) can be added to
@ -62,9 +62,8 @@ class Bundle(_STIXBase):
else:
kwargs['objects'] = list(args) + kwargs.get('objects', [])
allow_custom = kwargs.get('allow_custom', False)
if allow_custom:
self._properties['objects'] = ListProperty(STIXObjectProperty(True))
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].allow_custom = kwargs.get('allow_custom', False)
super(Bundle, self).__init__(**kwargs)

View File

@ -95,6 +95,81 @@ def test_custom_property_in_bundled_object():
assert '"x_foo": "bar"' in str(bundle)
def test_custom_property_in_observed_data():
artifact = stix2.File(
allow_custom=True,
name='test',
x_foo='bar'
)
observed_data = stix2.ObservedData(
allow_custom=True,
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=0,
objects={"0": artifact},
)
assert observed_data.objects['0'].x_foo == "bar"
assert '"x_foo": "bar"' in str(observed_data)
def test_custom_property_object_in_observable_extension():
ntfs = stix2.NTFSExt(
allow_custom=True,
sid=1,
x_foo='bar',
)
artifact = stix2.File(
name='test',
extensions={'ntfs-ext': ntfs},
)
observed_data = stix2.ObservedData(
allow_custom=True,
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=0,
objects={"0": artifact},
)
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
assert '"x_foo": "bar"' in str(observed_data)
def test_custom_property_dict_in_observable_extension():
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
artifact = stix2.File(
name='test',
extensions={
'ntfs-ext': {
'sid': 1,
'x_foo': 'bar',
}
},
)
artifact = stix2.File(
allow_custom=True,
name='test',
extensions={
'ntfs-ext': {
'allow_custom': True,
'sid': 1,
'x_foo': 'bar',
}
},
)
observed_data = stix2.ObservedData(
allow_custom=True,
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=0,
objects={"0": artifact},
)
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
assert '"x_foo": "bar"' in str(observed_data)
def test_identity_custom_property_revoke():
identity = IDENTITY_CUSTOM_PROP.revoke()
assert identity.x_foo == "bar"
@ -542,6 +617,7 @@ def test_custom_extension():
def test_custom_extension_wrong_observable_type():
# NewExtension is an extension of DomainName, not File
ext = NewExtension(property1='something')
with pytest.raises(ValueError) as excinfo:
stix2.File(name="abc.txt",

View File

@ -1,7 +1,7 @@
import os
import stix2
from stix2.workbench import (AttackPattern, Campaign, CourseOfAction,
from stix2.workbench import (AttackPattern, Bundle, Campaign, CourseOfAction,
ExternalReference, FileSystemSource, Filter,
Identity, Indicator, IntrusionSet, Malware,
MarkingDefinition, ObservedData, Relationship,
@ -149,6 +149,12 @@ def test_workbench_get_all_vulnerabilities():
assert resp[0].id == VULNERABILITY_ID
def test_workbench_add_to_bundle():
vuln = Vulnerability(**VULNERABILITY_KWARGS)
bundle = Bundle(vuln)
assert bundle.objects[0].name == 'Heartbleed'
def test_workbench_relationships():
rel = Relationship(INDICATOR_ID, 'indicates', MALWARE_ID)
save(rel)
@ -260,3 +266,49 @@ def test_default_object_marking_refs():
campaign = Campaign(**CAMPAIGN_KWARGS)
assert campaign.object_marking_refs[0] == mark_def.id
def test_workbench_custom_property_object_in_observable_extension():
ntfs = stix2.NTFSExt(
allow_custom=True,
sid=1,
x_foo='bar',
)
artifact = stix2.File(
name='test',
extensions={'ntfs-ext': ntfs},
)
observed_data = ObservedData(
allow_custom=True,
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=0,
objects={"0": artifact},
)
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
assert '"x_foo": "bar"' in str(observed_data)
def test_workbench_custom_property_dict_in_observable_extension():
artifact = stix2.File(
allow_custom=True,
name='test',
extensions={
'ntfs-ext': {
'allow_custom': True,
'sid': 1,
'x_foo': 'bar',
}
},
)
observed_data = ObservedData(
allow_custom=True,
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=0,
objects={"0": artifact},
)
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
assert '"x_foo": "bar"' in str(observed_data)

View File

@ -10,9 +10,10 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
Directory, DomainName, EmailAddress, EmailMessage,
EmailMIMEComponent, ExtensionsProperty, File,
HTTPRequestExt, ICMPExt, IPv4Address, IPv6Address,
MACAddress, Mutex, NetworkTraffic, NTFSExt, PDFExt,
Process, RasterImageExt, SocketExt, Software, TCPExt,
UNIXAccountExt, UserAccount, WindowsPEBinaryExt,
MACAddress, Mutex, NetworkTraffic, NTFSExt,
ObservableProperty, PDFExt, Process, RasterImageExt,
SocketExt, Software, TCPExt, UNIXAccountExt,
UserAccount, WindowsPEBinaryExt,
WindowsPEOptionalHeaderType, WindowsPESection,
WindowsProcessExt, WindowsRegistryKey,
WindowsRegistryValueType, WindowsServiceExt,

View File

@ -24,6 +24,10 @@ class ObservableProperty(Property):
"""Property for holding Cyber Observable Objects.
"""
def __init__(self, allow_custom=False, *args, **kwargs):
self.allow_custom = allow_custom
super(ObservableProperty, self).__init__(*args, **kwargs)
def clean(self, value):
try:
dictified = _get_dict(value)
@ -39,7 +43,10 @@ class ObservableProperty(Property):
valid_refs = dict((k, v['type']) for (k, v) in dictified.items())
for key, obj in dictified.items():
parsed_obj = parse_observable(obj, valid_refs)
if self.allow_custom:
parsed_obj = parse_observable(obj, valid_refs, allow_custom=True)
else:
parsed_obj = parse_observable(obj, valid_refs)
dictified[key] = parsed_obj
return dictified
@ -49,7 +56,8 @@ class ExtensionsProperty(DictionaryProperty):
"""Property for representing extensions on Observable objects.
"""
def __init__(self, enclosing_type=None, required=False):
def __init__(self, allow_custom=False, enclosing_type=None, required=False):
self.allow_custom = allow_custom
self.enclosing_type = enclosing_type
super(ExtensionsProperty, self).__init__(required)
@ -71,8 +79,13 @@ class ExtensionsProperty(DictionaryProperty):
if key in specific_type_map:
cls = specific_type_map[key]
if type(subvalue) is dict:
dictified[key] = cls(**subvalue)
if self.allow_custom:
subvalue['allow_custom'] = True
dictified[key] = cls(**subvalue)
else:
dictified[key] = cls(**subvalue)
elif type(subvalue) is cls:
# If already an instance of an _Extension class, assume it's valid
dictified[key] = subvalue
else:
raise ValueError("Cannot determine extension type.")

View File

@ -225,6 +225,12 @@ class ObservedData(STIXDomainObject):
('granular_markings', ListProperty(GranularMarking)),
])
def __init__(self, *args, **kwargs):
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].allow_custom = kwargs.get('allow_custom', False)
super(ObservedData, self).__init__(*args, **kwargs)
class Report(STIXDomainObject):
"""For more detailed information on this object's properties, see

View File

@ -110,14 +110,26 @@ def _related_wrapper(self, *args, **kwargs):
return _environ.related_to(self, *args, **kwargs)
def _observed_data_init(self, *args, **kwargs):
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].allow_custom = kwargs.get('allow_custom', False)
super(self.__class__, self).__init__(*args, **kwargs)
def _constructor_wrapper(obj_type):
# Use an intermediate wrapper class so the implicit environment will create objects that have our wrapper functions
wrapped_type = type(obj_type.__name__, obj_type.__bases__, dict(
class_dict = dict(
created_by=_created_by_wrapper,
relationships=_relationships_wrapper,
related=_related_wrapper,
**obj_type.__dict__
))
)
# Avoid TypeError about super() in ObservedData
if 'ObservedData' in obj_type.__name__:
class_dict['__init__'] = _observed_data_init
wrapped_type = type(obj_type.__name__, obj_type.__bases__, class_dict)
@staticmethod
def new_constructor(cls, *args, **kwargs):