Pass allow_custom when adding to ObservedData
parent
3f80c07342
commit
06e5a33639
|
@ -14,9 +14,9 @@ from .utils import _get_dict, get_class_hierarchy_names
|
|||
|
||||
class STIXObjectProperty(Property):
|
||||
|
||||
def __init__(self, allow_custom=False):
|
||||
def __init__(self, allow_custom=False, *args, **kwargs):
|
||||
self.allow_custom = allow_custom
|
||||
super(STIXObjectProperty, self).__init__()
|
||||
super(STIXObjectProperty, self).__init__(*args, **kwargs)
|
||||
|
||||
def clean(self, value):
|
||||
# Any STIX Object (SDO, SRO, or Marking Definition) can be added to
|
||||
|
|
|
@ -88,6 +88,24 @@ def test_parse_identity_custom_property(data):
|
|||
assert identity.foo == "bar"
|
||||
|
||||
|
||||
def test_custom_property_in_observed_data():
|
||||
artifact = stix2.File(
|
||||
allow_custom=True,
|
||||
name='test',
|
||||
x_foo='bar'
|
||||
)
|
||||
observed_data = stix2.ObservedData(
|
||||
allow_custom=True,
|
||||
first_observed="2015-12-21T19:00:00Z",
|
||||
last_observed="2015-12-21T19:00:00Z",
|
||||
number_observed=0,
|
||||
objects={"0": artifact},
|
||||
)
|
||||
|
||||
assert observed_data.objects['0'].x_foo == "bar"
|
||||
assert '"x_foo": "bar"' in str(observed_data)
|
||||
|
||||
|
||||
def test_custom_property_in_bundled_object():
|
||||
bundle = stix2.Bundle(IDENTITY_CUSTOM_PROP, allow_custom=True)
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import os
|
||||
|
||||
import stix2
|
||||
from stix2.workbench import (AttackPattern, Campaign, CourseOfAction,
|
||||
from stix2.workbench import (AttackPattern, Bundle, Campaign, CourseOfAction,
|
||||
ExternalReference, FileSystemSource, Filter,
|
||||
Identity, Indicator, IntrusionSet, Malware,
|
||||
MarkingDefinition, ObservedData, Relationship,
|
||||
|
@ -149,6 +149,12 @@ def test_workbench_get_all_vulnerabilities():
|
|||
assert resp[0].id == VULNERABILITY_ID
|
||||
|
||||
|
||||
def test_workbench_add_to_bundle():
|
||||
vuln = Vulnerability(**VULNERABILITY_KWARGS)
|
||||
bundle = Bundle(vuln)
|
||||
assert bundle.objects[0].name == 'Heartbleed'
|
||||
|
||||
|
||||
def test_workbench_relationships():
|
||||
rel = Relationship(INDICATOR_ID, 'indicates', MALWARE_ID)
|
||||
save(rel)
|
||||
|
|
|
@ -10,9 +10,10 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
|
|||
Directory, DomainName, EmailAddress, EmailMessage,
|
||||
EmailMIMEComponent, ExtensionsProperty, File,
|
||||
HTTPRequestExt, ICMPExt, IPv4Address, IPv6Address,
|
||||
MACAddress, Mutex, NetworkTraffic, NTFSExt, PDFExt,
|
||||
Process, RasterImageExt, SocketExt, Software, TCPExt,
|
||||
UNIXAccountExt, UserAccount, WindowsPEBinaryExt,
|
||||
MACAddress, Mutex, NetworkTraffic, NTFSExt,
|
||||
ObservableProperty, PDFExt, Process, RasterImageExt,
|
||||
SocketExt, Software, TCPExt, UNIXAccountExt,
|
||||
UserAccount, WindowsPEBinaryExt,
|
||||
WindowsPEOptionalHeaderType, WindowsPESection,
|
||||
WindowsProcessExt, WindowsRegistryKey,
|
||||
WindowsRegistryValueType, WindowsServiceExt,
|
||||
|
|
|
@ -24,6 +24,10 @@ class ObservableProperty(Property):
|
|||
"""Property for holding Cyber Observable Objects.
|
||||
"""
|
||||
|
||||
def __init__(self, allow_custom=False, *args, **kwargs):
|
||||
self.allow_custom = allow_custom
|
||||
super(ObservableProperty, self).__init__(*args, **kwargs)
|
||||
|
||||
def clean(self, value):
|
||||
try:
|
||||
dictified = _get_dict(value)
|
||||
|
@ -39,6 +43,9 @@ class ObservableProperty(Property):
|
|||
valid_refs = dict((k, v['type']) for (k, v) in dictified.items())
|
||||
|
||||
for key, obj in dictified.items():
|
||||
if self.allow_custom:
|
||||
parsed_obj = parse_observable(obj, valid_refs, allow_custom=True)
|
||||
else:
|
||||
parsed_obj = parse_observable(obj, valid_refs)
|
||||
dictified[key] = parsed_obj
|
||||
|
||||
|
|
|
@ -225,6 +225,12 @@ class ObservedData(STIXDomainObject):
|
|||
('granular_markings', ListProperty(GranularMarking)),
|
||||
])
|
||||
|
||||
def __init__(self, allow_custom=False, *args, **kwargs):
|
||||
if allow_custom:
|
||||
self._properties['objects'] = ObservableProperty(True)
|
||||
|
||||
super(ObservedData, self).__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class Report(STIXDomainObject):
|
||||
"""For more detailed information on this object's properties, see
|
||||
|
|
|
@ -49,6 +49,7 @@ from . import (AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, # n
|
|||
WindowsProcessExt, WindowsRegistryKey, WindowsRegistryValueType,
|
||||
WindowsServiceExt, X509Certificate, X509V3ExtenstionsType)
|
||||
from .datastore.filters import FilterSet
|
||||
from . import ObservableProperty
|
||||
|
||||
# Use an implicit MemoryStore
|
||||
_environ = Environment(store=MemoryStore())
|
||||
|
@ -110,14 +111,26 @@ def _related_wrapper(self, *args, **kwargs):
|
|||
return _environ.related_to(self, *args, **kwargs)
|
||||
|
||||
|
||||
def _observed_data_init(self, allow_custom=False, *args, **kwargs):
|
||||
if allow_custom:
|
||||
self._properties['objects'] = ObservableProperty(True)
|
||||
super(self.__class__, self).__init__(*args, **kwargs)
|
||||
|
||||
|
||||
def _constructor_wrapper(obj_type):
|
||||
# Use an intermediate wrapper class so the implicit environment will create objects that have our wrapper functions
|
||||
wrapped_type = type(obj_type.__name__, obj_type.__bases__, dict(
|
||||
class_dict = dict(
|
||||
created_by=_created_by_wrapper,
|
||||
relationships=_relationships_wrapper,
|
||||
related=_related_wrapper,
|
||||
**obj_type.__dict__
|
||||
))
|
||||
)
|
||||
|
||||
# Avoid TypeError about super() in ObservedData
|
||||
if 'ObservedData' in obj_type.__name__:
|
||||
class_dict['__init__'] = _observed_data_init
|
||||
|
||||
wrapped_type = type(obj_type.__name__, obj_type.__bases__, class_dict)
|
||||
|
||||
@staticmethod
|
||||
def new_constructor(cls, *args, **kwargs):
|
||||
|
|
Loading…
Reference in New Issue