diff --git a/stix2/__init__.py b/stix2/__init__.py index 5929986..725f257 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -28,7 +28,8 @@ # flake8: noqa -from .core import Bundle, _collect_stix2_obj_maps, _register_type, parse +from .core import _collect_stix2_obj_maps, _register_type, parse +from .v21 import * # This import will always be the latest STIX 2.X version from .datastore import CompositeDataSource from .datastore.filesystem import (FileSystemSink, FileSystemSource, FileSystemStore) @@ -59,7 +60,6 @@ from .patterns import (AndBooleanExpression, AndObservationExpression, StartStopQualifier, StringConstant, TimestampConstant, WithinQualifier) from .utils import new_version, revoke -from .v21 import * # This import will always be the latest STIX 2.X version from .version import __version__ _collect_stix2_obj_maps() diff --git a/stix2/core.py b/stix2/core.py index 7e928b0..9f2bd7c 100644 --- a/stix2/core.py +++ b/stix2/core.py @@ -1,72 +1,10 @@ -"""STIX 2.X Objects that are neither SDOs nor SROs.""" - -from collections import OrderedDict import importlib import pkgutil import stix2 from . import exceptions -from .base import _STIXBase -from .properties import IDProperty, ListProperty, Property, TypeProperty -from .utils import _get_dict, get_class_hierarchy_names - - -class STIXObjectProperty(Property): - - def __init__(self, allow_custom=False, *args, **kwargs): - self.allow_custom = allow_custom - super(STIXObjectProperty, self).__init__(*args, **kwargs) - - def clean(self, value): - # Any STIX Object (SDO, SRO, or Marking Definition) can be added to - # a bundle with no further checks. - if any(x in ('STIXDomainObject', 'STIXRelationshipObject', 'MarkingDefinition') - for x in get_class_hierarchy_names(value)): - return value - try: - dictified = _get_dict(value) - except ValueError: - raise ValueError("This property may only contain a dictionary or object") - if dictified == {}: - raise ValueError("This property may only contain a non-empty dictionary or object") - if 'type' in dictified and dictified['type'] == 'bundle': - raise ValueError('This property may not contain a Bundle object') - - if self.allow_custom: - parsed_obj = parse(dictified, allow_custom=True) - else: - parsed_obj = parse(dictified) - return parsed_obj - - -class Bundle(_STIXBase): - """For more detailed information on this object's properties, see - `the STIX 2.0 specification `__. - """ - - _type = 'bundle' - _properties = OrderedDict() - _properties.update([ - ('type', TypeProperty(_type)), - ('id', IDProperty(_type)), - ('spec_version', Property(fixed="2.1")), - ('objects', ListProperty(STIXObjectProperty)), - ]) - - def __init__(self, *args, **kwargs): - # Add any positional arguments to the 'objects' kwarg. - if args: - if isinstance(args[0], list): - kwargs['objects'] = args[0] + list(args[1:]) + kwargs.get('objects', []) - else: - kwargs['objects'] = list(args) + kwargs.get('objects', []) - - self.__allow_custom = kwargs.get('allow_custom', False) - self._properties['objects'].contained.allow_custom = kwargs.get('allow_custom', False) - - super(Bundle, self).__init__(**kwargs) - +from .utils import _get_dict STIX2_OBJ_MAPS = {} @@ -112,6 +50,9 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None): allow_custom (bool): Whether to allow custom properties as well unknown custom objects. Note that unknown custom objects cannot be parsed into STIX objects, and will be returned as is. Default: False. + version: If version can't be determined from stix_dict, use this + version of the STIX spec. If None, use the latest supported + version. Default: None Returns: An instantiated Python STIX object @@ -124,17 +65,24 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None): STIX objects that I dont know about ahead of time) """ - if not version: - # Use latest version - v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') - else: - v = 'v' + version.replace('.', '') - - OBJ_MAP = STIX2_OBJ_MAPS[v] - if 'type' not in stix_dict: raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(stix_dict)) + if "spec_version" in stix_dict: + # For STIX 2.0, applies to bundles only. + # For STIX 2.1+, applies to SDOs, SROs, and markings only. + v = 'v' + stix_dict["spec_version"].replace('.', '') + elif stix_dict["type"] == "bundle": + # bundles without spec_version are ambiguous. + if version: + v = 'v' + version.replace('.', '') + else: + v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') + else: + v = 'v20' + + OBJ_MAP = STIX2_OBJ_MAPS[v] + try: obj_class = OBJ_MAP[stix_dict['type']] except KeyError: diff --git a/stix2/datastore/filesystem.py b/stix2/datastore/filesystem.py index c13b02c..1bf2a12 100644 --- a/stix2/datastore/filesystem.py +++ b/stix2/datastore/filesystem.py @@ -6,7 +6,8 @@ Python STIX 2.0 FileSystem Source/Sink import json import os -from stix2.core import Bundle, parse +from stix2.core import parse +from stix2 import Bundle from stix2.datastore import DataSink, DataSource, DataStoreMixin from stix2.datastore.filters import Filter, FilterSet, apply_common_filters from stix2.utils import deduplicate, get_class_hierarchy_names diff --git a/stix2/datastore/memory.py b/stix2/datastore/memory.py index c1d202d..490ac03 100644 --- a/stix2/datastore/memory.py +++ b/stix2/datastore/memory.py @@ -16,7 +16,8 @@ import json import os from stix2.base import _STIXBase -from stix2.core import Bundle, parse +from stix2.core import parse +from stix2 import Bundle from stix2.datastore import DataSink, DataSource, DataStoreMixin from stix2.datastore.filters import Filter, FilterSet, apply_common_filters @@ -286,7 +287,7 @@ class MemorySource(DataSource): if stix_data["type"] == "bundle": for stix_obj in stix_data["objects"]: - _add(self, stix_data=parse(stix_obj, allow_custom=self.allow_custom, version=stix_data["spec_version"])) + _add(self, stix_data=parse(stix_obj, allow_custom=self.allow_custom)) else: _add(self, stix_data=parse(stix_data, allow_custom=self.allow_custom, version=version)) load_from_file.__doc__ = MemoryStore.load_from_file.__doc__ diff --git a/stix2/datastore/taxii.py b/stix2/datastore/taxii.py index c815e12..f5a432f 100644 --- a/stix2/datastore/taxii.py +++ b/stix2/datastore/taxii.py @@ -4,7 +4,8 @@ Python STIX 2.x TAXIICollectionStore from requests.exceptions import HTTPError from stix2.base import _STIXBase -from stix2.core import Bundle, parse +from stix2.core import parse +from stix2 import Bundle from stix2.datastore import (DataSink, DataSource, DataSourceError, DataStoreMixin) from stix2.datastore.filters import Filter, FilterSet, apply_common_filters diff --git a/stix2/test/test_bundle.py b/stix2/test/test_bundle.py index 2d14654..55a3d98 100644 --- a/stix2/test/test_bundle.py +++ b/stix2/test/test_bundle.py @@ -3,11 +3,13 @@ import json import pytest import stix2 +import stix2.v20.sdo +import stix2.v21.bundle + EXPECTED_BUNDLE = """{ "type": "bundle", "id": "bundle--00000000-0000-0000-0000-000000000007", - "spec_version": "2.0", "objects": [ { "type": "indicator", @@ -22,13 +24,15 @@ EXPECTED_BUNDLE = """{ }, { "type": "malware", + "spec_version": "2.1", "id": "malware--00000000-0000-0000-0000-000000000003", "created": "2017-01-01T12:34:56.000Z", "modified": "2017-01-01T12:34:56.000Z", "name": "Cryptolocker", "labels": [ "ransomware" - ] + ], + "is_family": false }, { "type": "relationship", @@ -45,7 +49,6 @@ EXPECTED_BUNDLE = """{ EXPECTED_BUNDLE_DICT = { "type": "bundle", "id": "bundle--00000000-0000-0000-0000-000000000007", - "spec_version": "2.0", "objects": [ { "type": "indicator", @@ -60,13 +63,15 @@ EXPECTED_BUNDLE_DICT = { }, { "type": "malware", + "spec_version": "2.1", "id": "malware--00000000-0000-0000-0000-000000000003", "created": "2017-01-01T12:34:56.000Z", "modified": "2017-01-01T12:34:56.000Z", "name": "Cryptolocker", "labels": [ "ransomware" - ] + ], + "is_family": False }, { "type": "relationship", @@ -86,7 +91,6 @@ def test_empty_bundle(): assert bundle.type == "bundle" assert bundle.id.startswith("bundle--") - assert bundle.spec_version == "2.0" with pytest.raises(AttributeError): assert bundle.objects @@ -111,16 +115,6 @@ def test_bundle_id_must_start_with_bundle(): assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'." -def test_bundle_with_wrong_spec_version(): - with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: - stix2.Bundle(spec_version="1.2") - - assert excinfo.value.cls == stix2.Bundle - assert excinfo.value.prop_name == "spec_version" - assert excinfo.value.reason == "must equal '2.0'." - assert str(excinfo.value) == "Invalid value for Bundle 'spec_version': must equal '2.0'." - - def test_create_bundle1(indicator, malware, relationship): bundle = stix2.Bundle(objects=[indicator, malware, relationship]) @@ -178,14 +172,14 @@ def test_create_bundle_invalid(indicator, malware, relationship): assert excinfo.value.reason == 'This property may not contain a Bundle object' -@pytest.mark.parametrize("version", ["2.0"]) +@pytest.mark.parametrize("version", ["2.1"]) def test_parse_bundle(version): bundle = stix2.parse(EXPECTED_BUNDLE, version=version) assert bundle.type == "bundle" assert bundle.id.startswith("bundle--") - assert bundle.spec_version == "2.0" - assert type(bundle.objects[0]) is stix2.Indicator + # TODO: update this to a STIX 2.1 indicator + assert type(bundle.objects[0]) is stix2.v20.sdo.Indicator assert bundle.objects[0].type == 'indicator' assert bundle.objects[1].type == 'malware' assert bundle.objects[2].type == 'relationship' @@ -208,7 +202,7 @@ def test_parse_unknown_type(): def test_stix_object_property(): - prop = stix2.core.STIXObjectProperty() + prop = stix2.v21.bundle.STIXObjectProperty() identity = stix2.Identity(name="test", identity_class="individual") assert prop.clean(identity) is identity diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index 8ad49d8..27a8c5b 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -1,6 +1,8 @@ import pytest import stix2 +import stix2.base +import stix2.v20.sdo from .constants import FAKE_TIME, MARKING_DEFINITION_ID @@ -93,7 +95,8 @@ def test_identity_custom_property_allowed(): def test_parse_identity_custom_property(data): with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo: identity = stix2.parse(data) - assert excinfo.value.cls == stix2.Identity + # TODO: update to create and check a STIX 2.1 Identity object + assert excinfo.value.cls == stix2.v20.sdo.Identity assert excinfo.value.properties == ['foo'] assert "Unexpected properties for" in str(excinfo.value) @@ -358,8 +361,8 @@ def test_parse_custom_object_type(): "property1": "something" }""" - nt = stix2.parse(nt_string) - assert nt.property1 == 'something' + nt = stix2.parse(nt_string, allow_custom=True) + assert nt["property1"] == 'something' def test_parse_unregistered_custom_object_type(): @@ -535,7 +538,7 @@ def test_parse_custom_observable_object(): }""" nt = stix2.parse_observable(nt_string, []) - assert isinstance(nt, stix2.core._STIXBase) + assert isinstance(nt, stix2.base._STIXBase) assert nt.property1 == 'something' @@ -553,7 +556,7 @@ def test_parse_unregistered_custom_observable_object(): assert parsed_custom['property1'] == 'something' with pytest.raises(AttributeError) as excinfo: assert parsed_custom.property1 == 'something' - assert not isinstance(parsed_custom, stix2.core._STIXBase) + assert not isinstance(parsed_custom, stix2.base._STIXBase) def test_parse_unregistered_custom_observable_object_with_no_type(): @@ -844,7 +847,7 @@ def test_parse_observable_with_unregistered_custom_extension(): parsed_ob = stix2.parse_observable(input_str, allow_custom=True) assert parsed_ob['extensions']['x-foobar-ext']['property1'] == 'foo' - assert not isinstance(parsed_ob['extensions']['x-foobar-ext'], stix2.core._STIXBase) + assert not isinstance(parsed_ob['extensions']['x-foobar-ext'], stix2.base._STIXBase) def test_register_custom_object(): diff --git a/stix2/test/test_datastore_filesystem.py b/stix2/test/test_datastore_filesystem.py index 104014c..49cbcc1 100644 --- a/stix2/test/test_datastore_filesystem.py +++ b/stix2/test/test_datastore_filesystem.py @@ -220,7 +220,6 @@ def test_filesystem_sink_add_stix_bundle_dict(fs_sink, fs_source): bund = { "type": "bundle", "id": "bundle--112211b6-1112-4fb0-111b-b111107ca70a", - "spec_version": "2.1", "objects": [ { "name": "Atilla", @@ -264,7 +263,7 @@ def test_filesystem_sink_add_json_stix_object(fs_sink, fs_source): def test_filesystem_sink_json_stix_bundle(fs_sink, fs_source): # add json-encoded stix bundle bund2 = '{"type": "bundle", "id": "bundle--332211b6-1132-4fb0-111b-b111107ca70a",' \ - ' "spec_version": "2.1", "objects": [{"type": "campaign", "id": "campaign--155155b6-1112-4fb0-111b-b111107ca70a",' \ + ' "objects": [{"type": "campaign", "id": "campaign--155155b6-1112-4fb0-111b-b111107ca70a",' \ ' "created":"2017-05-31T21:31:53.197755Z", "name": "Spartacus", "objective": "Oppressive regimes of Africa and Middle East"}]}' fs_sink.add(bund2) @@ -348,8 +347,8 @@ def test_filesystem_store_query_single_filter(fs_store): def test_filesystem_store_empty_query(fs_store): results = fs_store.query() # returns all assert len(results) == 26 - assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [obj.id for obj in results] - assert "marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168" in [obj.id for obj in results] + assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [obj["id"] for obj in results] + assert "marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168" in [obj["id"] for obj in results] def test_filesystem_store_query_multiple_filters(fs_store): @@ -450,8 +449,8 @@ def test_filesystem_custom_object(fs_store): fs_store.add(newobj) newobj_r = fs_store.get(newobj.id) - assert newobj_r.id == newobj.id - assert newobj_r.property1 == 'something' + assert newobj_r["id"] == newobj["id"] + assert newobj_r["property1"] == 'something' # remove dir shutil.rmtree(os.path.join(FS_PATH, "x-new-obj"), True) diff --git a/stix2/test/test_datastore_filters.py b/stix2/test/test_datastore_filters.py index 252b8eb..8ed82f3 100644 --- a/stix2/test/test_datastore_filters.py +++ b/stix2/test/test_datastore_filters.py @@ -9,6 +9,7 @@ stix_objs = [ "created": "2017-01-27T13:49:53.997Z", "description": "\n\nTITLE:\n\tPoison Ivy", "id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111", + "spec_version": "2.1", "labels": [ "remote-access-trojan" ], diff --git a/stix2/test/test_datastore_taxii.py b/stix2/test/test_datastore_taxii.py index e432d37..8cc5033 100644 --- a/stix2/test/test_datastore_taxii.py +++ b/stix2/test/test_datastore_taxii.py @@ -224,7 +224,6 @@ def test_add_dict_bundle_object(collection): ta = { "type": "bundle", "id": "bundle--860ccc8d-56c9-4fda-9384-84276fb52fb1", - "spec_version": "2.1", "objects": [ { "type": "threat-actor", diff --git a/stix2/test/test_environment.py b/stix2/test/test_environment.py index 6c55d9e..a5166b7 100644 --- a/stix2/test/test_environment.py +++ b/stix2/test/test_environment.py @@ -190,6 +190,7 @@ def test_parse_malware(): env = stix2.Environment() data = """{ "type": "malware", + "spec_version": "2.1", "id": "malware--fedcba98-7654-3210-fedc-ba9876543210", "created": "2017-01-01T12:34:56.000Z", "modified": "2017-01-01T12:34:56.000Z", diff --git a/stix2/test/test_location.py b/stix2/test/test_location.py index 71d7877..5a05753 100644 --- a/stix2/test/test_location.py +++ b/stix2/test/test_location.py @@ -10,6 +10,7 @@ from .constants import LOCATION_ID EXPECTED_LOCATION_1 = """{ "type": "location", + "spec_version": "2.1", "id": "location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64", "created": "2016-04-06T20:03:00.000Z", "modified": "2016-04-06T20:03:00.000Z", @@ -19,6 +20,7 @@ EXPECTED_LOCATION_1 = """{ EXPECTED_LOCATION_1_REPR = "Location(" + " ".join(""" type='location', + spec_version='2.1', id='location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64', created='2016-04-06T20:03:00.000Z', modified='2016-04-06T20:03:00.000Z', @@ -27,6 +29,7 @@ EXPECTED_LOCATION_1_REPR = "Location(" + " ".join(""" EXPECTED_LOCATION_2 = """{ "type": "location", + "spec_version": "2.1", "id": "location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64", "created": "2016-04-06T20:03:00.000Z", "modified": "2016-04-06T20:03:00.000Z", @@ -36,6 +39,7 @@ EXPECTED_LOCATION_2 = """{ EXPECTED_LOCATION_2_REPR = "Location(" + " ".join(""" type='location', + spec_version='2.1', id='location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64', created='2016-04-06T20:03:00.000Z', modified='2016-04-06T20:03:00.000Z', @@ -63,6 +67,7 @@ def test_location_with_some_required_properties(): EXPECTED_LOCATION_2, { "type": "location", + "spec_version": "2.1", "id": "location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64", "created": "2016-04-06T20:03:00.000Z", "modified": "2016-04-06T20:03:00.000Z", diff --git a/stix2/test/test_malware.py b/stix2/test/test_malware.py index ca74ffe..cf14c19 100644 --- a/stix2/test/test_malware.py +++ b/stix2/test/test_malware.py @@ -110,6 +110,7 @@ def test_invalid_kwarg_to_malware(): EXPECTED_MALWARE, { "type": "malware", + "spec_version": "2.1", "id": "malware--fedcba98-7654-3210-fedc-ba9876543210", "created": "2016-05-12T08:17:27.000Z", "modified": "2016-05-12T08:17:27.000Z", diff --git a/stix2/test/test_opinion.py b/stix2/test/test_opinion.py index 7de415a..3156ea7 100644 --- a/stix2/test/test_opinion.py +++ b/stix2/test/test_opinion.py @@ -16,6 +16,7 @@ DESCRIPTION = ('This doesn\'t seem like it is feasible. We\'ve seen how ' EXPECTED_OPINION = """{ "type": "opinion", + "spec_version": "2.1", "id": "opinion--b01efc25-77b4-4003-b18b-f6e24b5cd9f7", "created": "2016-05-12T08:17:27.000Z", "modified": "2016-05-12T08:17:27.000Z", @@ -28,6 +29,7 @@ EXPECTED_OPINION = """{ EXPECTED_OPINION_REPR = "Opinion(" + " ".join((""" type='opinion', + spec_version='2.1', id='opinion--b01efc25-77b4-4003-b18b-f6e24b5cd9f7', created='2016-05-12T08:17:27.000Z', modified='2016-05-12T08:17:27.000Z', @@ -58,6 +60,7 @@ def test_opinion_with_required_properties(): EXPECTED_OPINION, { "type": "opinion", + "spec_version": "2.1", "id": "opinion--b01efc25-77b4-4003-b18b-f6e24b5cd9f7", "created": "2016-05-12T08:17:27.000Z", "modified": "2016-05-12T08:17:27.000Z", diff --git a/stix2/test/test_workbench.py b/stix2/test/test_workbench.py index d436261..5fdc2e9 100644 --- a/stix2/test/test_workbench.py +++ b/stix2/test/test_workbench.py @@ -1,7 +1,7 @@ import os import stix2 -from stix2.workbench import (AttackPattern, Bundle, Campaign, CourseOfAction, +from stix2.workbench import (AttackPattern, Campaign, CourseOfAction, ExternalReference, FileSystemSource, Filter, Identity, Indicator, IntrusionSet, Malware, MarkingDefinition, ObservedData, Relationship, @@ -14,6 +14,7 @@ from stix2.workbench import (AttackPattern, Bundle, Campaign, CourseOfAction, set_default_creator, set_default_external_refs, set_default_object_marking_refs, threat_actors, tools, vulnerabilities) +from stix2 import Bundle from .constants import (ATTACK_PATTERN_ID, ATTACK_PATTERN_KWARGS, CAMPAIGN_ID, CAMPAIGN_KWARGS, COURSE_OF_ACTION_ID, @@ -190,7 +191,8 @@ def test_workbench_related(): def test_workbench_related_with_filters(): - malware = Malware(labels=["ransomware"], name="CryptorBit", created_by_ref=IDENTITY_ID) + malware = Malware(labels=["ransomware"], name="CryptorBit", created_by_ref=IDENTITY_ID, + is_family=False) rel = Relationship(malware.id, 'variant-of', MALWARE_ID) save([malware, rel]) diff --git a/stix2/v20/__init__.py b/stix2/v20/__init__.py index 9d7efcc..e86269f 100644 --- a/stix2/v20/__init__.py +++ b/stix2/v20/__init__.py @@ -1,7 +1,7 @@ # flake8: noqa -from ..core import Bundle +from .bundle import Bundle from .common import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomMarking, ExternalReference, GranularMarking, KillChainPhase, MarkingDefinition, StatementMarking, TLPMarking) diff --git a/stix2/v20/bundle.py b/stix2/v20/bundle.py new file mode 100644 index 0000000..420b305 --- /dev/null +++ b/stix2/v20/bundle.py @@ -0,0 +1,77 @@ +from collections import OrderedDict + +from stix2 import parse +from stix2.base import _STIXBase +from stix2.properties import TypeProperty, IDProperty, StringProperty, \ + ListProperty, Property +from stix2.utils import get_class_hierarchy_names, _get_dict + + +class STIXObjectProperty(Property): + + def __init__(self, allow_custom=False, *args, **kwargs): + self.allow_custom = allow_custom + super(STIXObjectProperty, self).__init__(*args, **kwargs) + + def clean(self, value): + # Any STIX Object (SDO, SRO, or Marking Definition) can be added to + # a bundle with no further checks. + if any(x in ('STIXDomainObject', 'STIXRelationshipObject', 'MarkingDefinition') + for x in get_class_hierarchy_names(value)): + # A simple "is this a spec version 2.1+ object" test. For now, + # limit 2.0 bundles to 2.0 objects. It's not possible yet to + # have validation co-constraints among properties, e.g. have + # validation here depend on the value of another property + # (spec_version). So this is a hack, and not technically spec- + # compliant. + if "spec_version" in value: + raise ValueError("Spec version 2.0 bundles don't yet support " + "containing objects of a different spec " + "version.") + return value + try: + dictified = _get_dict(value) + except ValueError: + raise ValueError("This property may only contain a dictionary or object") + if dictified == {}: + raise ValueError("This property may only contain a non-empty dictionary or object") + if 'type' in dictified and dictified['type'] == 'bundle': + raise ValueError('This property may not contain a Bundle object') + if "spec_version" in dictified: + # See above comment regarding spec_version. + raise ValueError("Spec version 2.0 bundles don't yet support " + "containing objects of a different spec version.") + + parsed_obj = parse(dictified, allow_custom=self.allow_custom) + + return parsed_obj + + +class Bundle(_STIXBase): + """For more detailed information on this object's properties, see + `the STIX 2.0 specification `__. + """ + + _type = 'bundle' + _properties = OrderedDict() + _properties.update([ + ('type', TypeProperty(_type)), + ('id', IDProperty(_type)), + # Not technically correct: STIX 2.0 spec doesn't say spec_version must + # have this value, but it's all we support for now. + ('spec_version', StringProperty(fixed="2.0")), + ('objects', ListProperty(STIXObjectProperty)), + ]) + + def __init__(self, *args, **kwargs): + # Add any positional arguments to the 'objects' kwarg. + if args: + if isinstance(args[0], list): + kwargs['objects'] = args[0] + list(args[1:]) + kwargs.get('objects', []) + else: + kwargs['objects'] = list(args) + kwargs.get('objects', []) + + self.__allow_custom = kwargs.get('allow_custom', False) + self._properties['objects'].contained.allow_custom = kwargs.get('allow_custom', False) + + super(Bundle, self).__init__(**kwargs) diff --git a/stix2/v21/__init__.py b/stix2/v21/__init__.py index f0d51d9..7fdd6fb 100644 --- a/stix2/v21/__init__.py +++ b/stix2/v21/__init__.py @@ -1,7 +1,7 @@ # flake8: noqa -from ..core import Bundle +from .bundle import Bundle from .common import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomMarking, ExternalReference, GranularMarking, KillChainPhase, LanguageContent, MarkingDefinition, StatementMarking, diff --git a/stix2/v21/bundle.py b/stix2/v21/bundle.py new file mode 100644 index 0000000..b86c1ac --- /dev/null +++ b/stix2/v21/bundle.py @@ -0,0 +1,60 @@ +from collections import OrderedDict + +from stix2 import parse +from stix2.base import _STIXBase +from stix2.properties import TypeProperty, IDProperty, ListProperty, Property +from stix2.utils import get_class_hierarchy_names, _get_dict + + +class STIXObjectProperty(Property): + + def __init__(self, allow_custom=False, *args, **kwargs): + self.allow_custom = allow_custom + super(STIXObjectProperty, self).__init__(*args, **kwargs) + + def clean(self, value): + # Any STIX Object (SDO, SRO, or Marking Definition) can be added to + # a bundle with no further checks. + if any(x in ('STIXDomainObject', 'STIXRelationshipObject', 'MarkingDefinition') + for x in get_class_hierarchy_names(value)): + return value + try: + dictified = _get_dict(value) + except ValueError: + raise ValueError("This property may only contain a dictionary or object") + if dictified == {}: + raise ValueError("This property may only contain a non-empty dictionary or object") + if 'type' in dictified and dictified['type'] == 'bundle': + raise ValueError('This property may not contain a Bundle object') + + parsed_obj = parse(dictified, allow_custom=self.allow_custom) + + return parsed_obj + + +class Bundle(_STIXBase): + """For more detailed information on this object's properties, see + TODO: Update this to a STIX 2.1 link. + `the STIX 2.0 specification `__. + """ + + _type = 'bundle' + _properties = OrderedDict() + _properties.update([ + ('type', TypeProperty(_type)), + ('id', IDProperty(_type)), + ('objects', ListProperty(STIXObjectProperty)), + ]) + + def __init__(self, *args, **kwargs): + # Add any positional arguments to the 'objects' kwarg. + if args: + if isinstance(args[0], list): + kwargs['objects'] = args[0] + list(args[1:]) + kwargs.get('objects', []) + else: + kwargs['objects'] = list(args) + kwargs.get('objects', []) + + self.__allow_custom = kwargs.get('allow_custom', False) + self._properties['objects'].contained.allow_custom = kwargs.get('allow_custom', False) + + super(Bundle, self).__init__(**kwargs) diff --git a/stix2/v21/sdo.py b/stix2/v21/sdo.py index b859560..1207a22 100644 --- a/stix2/v21/sdo.py +++ b/stix2/v21/sdo.py @@ -206,6 +206,7 @@ class Location(STIXDomainObject): _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), + ('spec_version', StringProperty(fixed='2.1')), ('id', IDProperty(_type)), ('created_by_ref', ReferenceProperty(type="identity")), ('created', TimestampProperty(default=lambda: NOW, precision='millisecond')), @@ -368,6 +369,7 @@ class Opinion(STIXDomainObject): _properties = OrderedDict() _properties.update([ ('type', TypeProperty(_type)), + ('spec_version', StringProperty(fixed='2.1')), ('id', IDProperty(_type)), ('created_by_ref', ReferenceProperty(type="identity")), ('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),