diff --git a/stix2/core.py b/stix2/core.py index 0b222a9..0c8b7b0 100644 --- a/stix2/core.py +++ b/stix2/core.py @@ -14,9 +14,9 @@ from .utils import _get_dict, get_class_hierarchy_names class STIXObjectProperty(Property): - def __init__(self, allow_custom=False): + def __init__(self, allow_custom=False, *args, **kwargs): self.allow_custom = allow_custom - super(STIXObjectProperty, self).__init__() + super(STIXObjectProperty, self).__init__(*args, **kwargs) def clean(self, value): # Any STIX Object (SDO, SRO, or Marking Definition) can be added to diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index 0e004a8..6bf7fb2 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -88,6 +88,24 @@ def test_parse_identity_custom_property(data): assert identity.foo == "bar" +def test_custom_property_in_observed_data(): + artifact = stix2.File( + allow_custom=True, + name='test', + x_foo='bar' + ) + observed_data = stix2.ObservedData( + allow_custom=True, + first_observed="2015-12-21T19:00:00Z", + last_observed="2015-12-21T19:00:00Z", + number_observed=0, + objects={"0": artifact}, + ) + + assert observed_data.objects['0'].x_foo == "bar" + assert '"x_foo": "bar"' in str(observed_data) + + def test_custom_property_in_bundled_object(): bundle = stix2.Bundle(IDENTITY_CUSTOM_PROP, allow_custom=True) diff --git a/stix2/test/test_workbench.py b/stix2/test/test_workbench.py index 7857eb2..501d11c 100644 --- a/stix2/test/test_workbench.py +++ b/stix2/test/test_workbench.py @@ -1,7 +1,7 @@ import os import stix2 -from stix2.workbench import (AttackPattern, Campaign, CourseOfAction, +from stix2.workbench import (AttackPattern, Bundle, Campaign, CourseOfAction, ExternalReference, FileSystemSource, Filter, Identity, Indicator, IntrusionSet, Malware, MarkingDefinition, ObservedData, Relationship, @@ -149,6 +149,12 @@ def test_workbench_get_all_vulnerabilities(): assert resp[0].id == VULNERABILITY_ID +def test_workbench_add_to_bundle(): + vuln = Vulnerability(**VULNERABILITY_KWARGS) + bundle = Bundle(vuln) + assert bundle.objects[0].name == 'Heartbleed' + + def test_workbench_relationships(): rel = Relationship(INDICATOR_ID, 'indicates', MALWARE_ID) save(rel) diff --git a/stix2/v20/__init__.py b/stix2/v20/__init__.py index 888e1ca..9d7efcc 100644 --- a/stix2/v20/__init__.py +++ b/stix2/v20/__init__.py @@ -10,9 +10,10 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, Directory, DomainName, EmailAddress, EmailMessage, EmailMIMEComponent, ExtensionsProperty, File, HTTPRequestExt, ICMPExt, IPv4Address, IPv6Address, - MACAddress, Mutex, NetworkTraffic, NTFSExt, PDFExt, - Process, RasterImageExt, SocketExt, Software, TCPExt, - UNIXAccountExt, UserAccount, WindowsPEBinaryExt, + MACAddress, Mutex, NetworkTraffic, NTFSExt, + ObservableProperty, PDFExt, Process, RasterImageExt, + SocketExt, Software, TCPExt, UNIXAccountExt, + UserAccount, WindowsPEBinaryExt, WindowsPEOptionalHeaderType, WindowsPESection, WindowsProcessExt, WindowsRegistryKey, WindowsRegistryValueType, WindowsServiceExt, diff --git a/stix2/v20/observables.py b/stix2/v20/observables.py index 6c65e8e..7bce53e 100644 --- a/stix2/v20/observables.py +++ b/stix2/v20/observables.py @@ -24,6 +24,10 @@ class ObservableProperty(Property): """Property for holding Cyber Observable Objects. """ + def __init__(self, allow_custom=False, *args, **kwargs): + self.allow_custom = allow_custom + super(ObservableProperty, self).__init__(*args, **kwargs) + def clean(self, value): try: dictified = _get_dict(value) @@ -39,7 +43,10 @@ class ObservableProperty(Property): valid_refs = dict((k, v['type']) for (k, v) in dictified.items()) for key, obj in dictified.items(): - parsed_obj = parse_observable(obj, valid_refs) + if self.allow_custom: + parsed_obj = parse_observable(obj, valid_refs, allow_custom=True) + else: + parsed_obj = parse_observable(obj, valid_refs) dictified[key] = parsed_obj return dictified diff --git a/stix2/v20/sdo.py b/stix2/v20/sdo.py index d7e9954..9ca1599 100644 --- a/stix2/v20/sdo.py +++ b/stix2/v20/sdo.py @@ -225,6 +225,12 @@ class ObservedData(STIXDomainObject): ('granular_markings', ListProperty(GranularMarking)), ]) + def __init__(self, allow_custom=False, *args, **kwargs): + if allow_custom: + self._properties['objects'] = ObservableProperty(True) + + super(ObservedData, self).__init__(*args, **kwargs) + class Report(STIXDomainObject): """For more detailed information on this object's properties, see diff --git a/stix2/workbench.py b/stix2/workbench.py index 3697c63..c559f40 100644 --- a/stix2/workbench.py +++ b/stix2/workbench.py @@ -49,6 +49,7 @@ from . import (AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, # n WindowsProcessExt, WindowsRegistryKey, WindowsRegistryValueType, WindowsServiceExt, X509Certificate, X509V3ExtenstionsType) from .datastore.filters import FilterSet +from . import ObservableProperty # Use an implicit MemoryStore _environ = Environment(store=MemoryStore()) @@ -110,14 +111,26 @@ def _related_wrapper(self, *args, **kwargs): return _environ.related_to(self, *args, **kwargs) +def _observed_data_init(self, allow_custom=False, *args, **kwargs): + if allow_custom: + self._properties['objects'] = ObservableProperty(True) + super(self.__class__, self).__init__(*args, **kwargs) + + def _constructor_wrapper(obj_type): # Use an intermediate wrapper class so the implicit environment will create objects that have our wrapper functions - wrapped_type = type(obj_type.__name__, obj_type.__bases__, dict( + class_dict = dict( created_by=_created_by_wrapper, relationships=_relationships_wrapper, related=_related_wrapper, **obj_type.__dict__ - )) + ) + + # Avoid TypeError about super() in ObservedData + if 'ObservedData' in obj_type.__name__: + class_dict['__init__'] = _observed_data_init + + wrapped_type = type(obj_type.__name__, obj_type.__bases__, class_dict) @staticmethod def new_constructor(cls, *args, **kwargs):