diff --git a/stix2/core.py b/stix2/core.py index 0d1fee5..c9282d2 100644 --- a/stix2/core.py +++ b/stix2/core.py @@ -8,7 +8,7 @@ import re import stix2 from .base import _STIXBase -from .exceptions import ParseError +from .exceptions import DuplicateObjectRegistrationError, ParseError from .markings import _MarkingsMixin from .utils import _get_dict @@ -217,6 +217,8 @@ def _register_object(new_type, version=None): v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') OBJ_MAP = STIX2_OBJ_MAPS[v]['objects'] + if new_type._type in OBJ_MAP.keys(): + raise DuplicateObjectRegistrationError("An object with type '%s' already exists and cannot be created again." % new_type._type) OBJ_MAP[new_type._type] = new_type @@ -255,6 +257,8 @@ def _register_observable(new_observable, version=None): v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables'] + if new_observable._type in OBJ_MAP_OBSERVABLE.keys(): + raise DuplicateObjectRegistrationError("An observable with type '%s' already exists and cannot be created again." % new_observable._type) OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable diff --git a/stix2/test/v20/test_core.py b/stix2/test/v20/test_core.py index d2efa22..2808ba5 100644 --- a/stix2/test/v20/test_core.py +++ b/stix2/test/v20/test_core.py @@ -68,17 +68,6 @@ def test_parse_observable_with_no_version(): assert v in str(obs_obj.__class__) -def test_register_object_with_version(): - bundle = core.dict_to_stix2(BUNDLE, version='2.0') - core._register_object(bundle.objects[0].__class__, version='2.0') - v = 'v20' - - assert bundle.objects[0].type in core.STIX2_OBJ_MAPS[v]['objects'] - # spec_version is not in STIX 2.0, and is required in 2.1, so this - # suffices as a test for a STIX 2.0 object. - assert "spec_version" not in bundle.objects[0] - - def test_register_marking_with_version(): core._register_marking(stix2.v20.TLP_WHITE.__class__, version='2.0') v = 'v20' @@ -97,44 +86,6 @@ def test_register_marking_with_no_version(): assert v in str(stix2.v20.TLP_WHITE.__class__) -def test_register_observable_with_version(): - observed_data = stix2.v20.ObservedData( - id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", - created_by_ref=IDENTITY_ID, - created="2016-04-06T19:58:16.000Z", - modified="2016-04-06T19:58:16.000Z", - first_observed="2015-12-21T19:00:00Z", - last_observed="2015-12-21T19:00:00Z", - number_observed=50, - objects={ - "0": { - "name": "foo.exe", - "type": "file", - "extensions": { - "ntfs-ext": { - "alternate_data_streams": [ - { - "name": "second.stream", - "size": 25536, - }, - ], - }, - }, - }, - "1": { - "type": "directory", - "path": "/usr/home", - "contains_refs": ["0"], - }, - }, - ) - core._register_observable(observed_data.objects['0'].__class__, version='2.0') - v = 'v20' - - assert observed_data.objects['0'].type in core.STIX2_OBJ_MAPS[v]['observables'] - assert v in str(observed_data.objects['0'].__class__) - - def test_register_observable_extension_with_version(): observed_data = stix2.v20.ObservedData( id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", diff --git a/stix2/test/v20/test_custom.py b/stix2/test/v20/test_custom.py index ce1aac3..8bc6ddd 100644 --- a/stix2/test/v20/test_custom.py +++ b/stix2/test/v20/test_custom.py @@ -1,6 +1,7 @@ import pytest import stix2 +from stix2 import core import stix2.v20 from ...exceptions import InvalidValueError @@ -449,7 +450,7 @@ def test_custom_observable_raises_exception(): def test_custom_observable_object_no_init_1(): @stix2.v20.CustomObservable( - 'x-new-observable', [ + 'x-new-observable-1', [ ('property1', stix2.properties.StringProperty()), ], ) @@ -1014,3 +1015,69 @@ def test_custom_object_nested_dictionary(data): ) assert data == str(example) + + +@stix2.v20.CustomObject( + 'x-new-type-2', [ + ('property1', stix2.properties.StringProperty()), + ('property2', stix2.properties.IntegerProperty()), + ], +) +class NewType2(object): + pass + + +def test_register_custom_object_with_version(): + custom_obj_1 = { + "type": "x-new-type-2", + "id": "x-new-type-2--00000000-0000-4000-8000-000000000007", + } + + cust_obj_1 = core.dict_to_stix2(custom_obj_1, version='2.0') + v = 'v20' + + assert cust_obj_1.type in core.STIX2_OBJ_MAPS[v]['objects'] + # spec_version is not in STIX 2.0, and is required in 2.1, so this + # suffices as a test for a STIX 2.0 object. + assert "spec_version" not in cust_obj_1 + + +def test_register_duplicate_object_with_version(): + with pytest.raises(stix2.exceptions.DuplicateObjectRegistrationError) as excinfo: + @stix2.v20.CustomObject( + 'x-new-type-2', [ + ('property1', stix2.properties.StringProperty()), + ('property2', stix2.properties.IntegerProperty()), + ], + ) + class NewType2(object): + pass + assert "An object with type 'x-new-type-2' already exists and cannot be created again." in str(excinfo.value) + + +@stix2.v20.CustomObservable( + 'x-new-observable-2', [ + ('property1', stix2.properties.StringProperty()), + ], +) +class NewObservable2(object): + pass + + +def test_register_observable_with_version(): + custom_obs = NewObservable2(property1="Test Observable") + v = 'v20' + + assert custom_obs.type in core.STIX2_OBJ_MAPS[v]['observables'] + + +def test_register_duplicate_observable_with_version(): + with pytest.raises(stix2.exceptions.DuplicateObjectRegistrationError) as excinfo: + @stix2.v20.CustomObservable( + 'x-new-observable-2', [ + ('property1', stix2.properties.StringProperty()), + ], + ) + class NewObservable2(object): + pass + assert "An observable with type 'x-new-observable-2' already exists and cannot be created again." in str(excinfo.value) diff --git a/stix2/test/v20/test_datastore_filesystem.py b/stix2/test/v20/test_datastore_filesystem.py index 317f927..b7e9aca 100644 --- a/stix2/test/v20/test_datastore_filesystem.py +++ b/stix2/test/v20/test_datastore_filesystem.py @@ -501,6 +501,7 @@ def test_filesystem_store_query_single_filter(fs_store): def test_filesystem_store_empty_query(fs_store): results = fs_store.query() # returns all + print (results) assert len(results) == 30 assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [obj.id for obj in results] assert "marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168" in [obj.id for obj in results] @@ -515,6 +516,7 @@ def test_filesystem_store_query_multiple_filters(fs_store): def test_filesystem_store_query_dont_include_type_folder(fs_store): results = fs_store.query(stix2.Filter("type", "!=", "tool")) + print (results) assert len(results) == 28 @@ -635,7 +637,7 @@ def test_filesystem_object_with_custom_property_in_bundle(fs_store): def test_filesystem_custom_object(fs_store): @stix2.v20.CustomObject( - 'x-new-obj', [ + 'x-new-obj-2', [ ('property1', stix2.properties.StringProperty(required=True)), ], ) @@ -650,7 +652,7 @@ def test_filesystem_custom_object(fs_store): assert newobj_r["property1"] == 'something' # remove dir - shutil.rmtree(os.path.join(FS_PATH, "x-new-obj"), True) + shutil.rmtree(os.path.join(FS_PATH, "x-new-obj-2"), True) def test_relationships(rel_fs_store): diff --git a/stix2/test/v20/test_datastore_memory.py b/stix2/test/v20/test_datastore_memory.py index 7852746..28d8e52 100644 --- a/stix2/test/v20/test_datastore_memory.py +++ b/stix2/test/v20/test_datastore_memory.py @@ -329,7 +329,7 @@ def test_memory_store_object_with_custom_property_in_bundle(mem_store): def test_memory_store_custom_object(mem_store): @CustomObject( - 'x-new-obj', [ + 'x-new-obj-3', [ ('property1', properties.StringProperty(required=True)), ], ) diff --git a/stix2/test/v20/test_properties.py b/stix2/test/v20/test_properties.py index f71d829..1d1474a 100644 --- a/stix2/test/v20/test_properties.py +++ b/stix2/test/v20/test_properties.py @@ -392,7 +392,7 @@ def test_dictionary_property_invalid(d): def test_property_list_of_dictionary(): @stix2.v20.CustomObject( - 'x-new-obj', [ + 'x-new-obj-4', [ ('property1', ListProperty(DictionaryProperty(spec_version="2.0"), required=True)), ], ) diff --git a/stix2/test/v21/test_core.py b/stix2/test/v21/test_core.py index 2018395..f9f9de6 100644 --- a/stix2/test/v21/test_core.py +++ b/stix2/test/v21/test_core.py @@ -73,15 +73,6 @@ def test_parse_observable_with_no_version(): assert v in str(obs_obj.__class__) -def test_register_object_with_version(): - bundle = core.dict_to_stix2(BUNDLE, version='2.1') - core._register_object(bundle.objects[0].__class__) - v = 'v21' - - assert bundle.objects[0].type in core.STIX2_OBJ_MAPS[v]['objects'] - assert bundle.objects[0].spec_version == "2.1" - - def test_register_marking_with_version(): core._register_marking(stix2.v21.TLP_WHITE.__class__, version='2.1') v = 'v21' @@ -100,44 +91,6 @@ def test_register_marking_with_no_version(): assert v in str(stix2.v21.TLP_WHITE.__class__) -def test_register_observable_with_default_version(): - observed_data = stix2.v21.ObservedData( - id=OBSERVED_DATA_ID, - created_by_ref=IDENTITY_ID, - created="2016-04-06T19:58:16.000Z", - modified="2016-04-06T19:58:16.000Z", - first_observed="2015-12-21T19:00:00Z", - last_observed="2015-12-21T19:00:00Z", - number_observed=50, - objects={ - "0": { - "name": "foo.exe", - "type": "file", - "extensions": { - "ntfs-ext": { - "alternate_data_streams": [ - { - "name": "second.stream", - "size": 25536, - }, - ], - }, - }, - }, - "1": { - "type": "directory", - "path": "/usr/home", - "contains_refs": ["file--420bc087-8b53-5ae9-8210-20d27d5e96c8"], - }, - }, - ) - core._register_observable(observed_data.objects['0'].__class__) - v = 'v21' - - assert observed_data.objects['0'].type in core.STIX2_OBJ_MAPS[v]['observables'] - assert v in str(observed_data.objects['0'].__class__) - - def test_register_observable_extension_with_default_version(): observed_data = stix2.v21.ObservedData( id=OBSERVED_DATA_ID, diff --git a/stix2/test/v21/test_custom.py b/stix2/test/v21/test_custom.py index b46288d..9bd0afa 100644 --- a/stix2/test/v21/test_custom.py +++ b/stix2/test/v21/test_custom.py @@ -455,7 +455,7 @@ def test_custom_observable_raises_exception(): def test_custom_observable_object_no_init_1(): @stix2.v21.CustomObservable( - 'x-new-observable', [ + 'x-new-observable-2', [ ('property1', stix2.properties.StringProperty()), ], ) @@ -1068,3 +1068,68 @@ def test_custom_object_nested_dictionary(data): ) assert data == str(example) + + +@stix2.v21.CustomObject( + 'x-new-type-2', [ + ('property1', stix2.properties.StringProperty()), + ('property2', stix2.properties.IntegerProperty()), + ], +) +class NewType3(object): + pass + + +def test_register_custom_object_with_version(): + custom_obj_1 = { + "type": "x-new-type-2", + "id": "x-new-type-2--00000000-0000-4000-8000-000000000007", + "spec_version": "2.1", + } + + cust_obj_1 = stix2.core.dict_to_stix2(custom_obj_1, version='2.1') + v = 'v21' + + assert cust_obj_1.type in stix2.core.STIX2_OBJ_MAPS[v]['objects'] + assert cust_obj_1.spec_version == "2.1" + + +def test_register_duplicate_object_with_version(): + with pytest.raises(stix2.exceptions.DuplicateObjectRegistrationError) as excinfo: + @stix2.v21.CustomObject( + 'x-new-type-2', [ + ('property1', stix2.properties.StringProperty()), + ('property2', stix2.properties.IntegerProperty()), + ], + ) + class NewType2(object): + pass + assert "An object with type 'x-new-type-2' already exists and cannot be created again." in str(excinfo.value) + + +@stix2.v21.CustomObservable( + 'x-new-observable-3', [ + ('property1', stix2.properties.StringProperty()), + ], +) +class NewObservable3(object): + pass + + +def test_register_observable_with_version(): + custom_obs = NewObservable3(property1="Test Observable") + v = 'v21' + + assert custom_obs.type in stix2.core.STIX2_OBJ_MAPS[v]['observables'] + + +def test_register_duplicate_observable_with_version(): + with pytest.raises(stix2.exceptions.DuplicateObjectRegistrationError) as excinfo: + @stix2.v21.CustomObservable( + 'x-new-observable-2', [ + ('property1', stix2.properties.StringProperty()), + ], + ) + class NewObservable2(object): + pass + assert "An observable with type 'x-new-observable-2' already exists and cannot be created again." in str(excinfo.value) diff --git a/stix2/test/v21/test_datastore_filesystem.py b/stix2/test/v21/test_datastore_filesystem.py index 3eb8aaa..123fd7a 100644 --- a/stix2/test/v21/test_datastore_filesystem.py +++ b/stix2/test/v21/test_datastore_filesystem.py @@ -656,7 +656,7 @@ def test_filesystem_object_with_custom_property_in_bundle(fs_store): def test_filesystem_custom_object(fs_store): @stix2.v21.CustomObject( - 'x-new-obj', [ + 'x-new-obj-2', [ ('property1', stix2.properties.StringProperty(required=True)), ], ) @@ -671,7 +671,7 @@ def test_filesystem_custom_object(fs_store): assert newobj_r["property1"] == 'something' # remove dir - shutil.rmtree(os.path.join(FS_PATH, "x-new-obj"), True) + shutil.rmtree(os.path.join(FS_PATH, "x-new-obj-2"), True) def test_relationships(rel_fs_store): diff --git a/stix2/test/v21/test_datastore_memory.py b/stix2/test/v21/test_datastore_memory.py index e07943c..60f577e 100644 --- a/stix2/test/v21/test_datastore_memory.py +++ b/stix2/test/v21/test_datastore_memory.py @@ -344,7 +344,7 @@ def test_memory_store_object_with_custom_property_in_bundle(mem_store): def test_memory_store_custom_object(mem_store): @CustomObject( - 'x-new-obj', [ + 'x-new-obj-3', [ ('property1', properties.StringProperty(required=True)), ], ) diff --git a/stix2/test/v21/test_properties.py b/stix2/test/v21/test_properties.py index 50bce17..31dd941 100644 --- a/stix2/test/v21/test_properties.py +++ b/stix2/test/v21/test_properties.py @@ -404,7 +404,7 @@ def test_dictionary_property_invalid(d): def test_property_list_of_dictionary(): @stix2.v21.CustomObject( - 'x-new-obj', [ + 'x-new-obj-4', [ ('property1', ListProperty(DictionaryProperty(spec_version='2.1'), required=True)), ], )