Add duplicate checking to markings and observable extensions, and fix some tests and add some tests. Fixes #363

master
Desai, Kartikey H 2020-03-27 14:58:18 -04:00
parent a7e9a7dde5
commit c911cff97f
6 changed files with 129 additions and 115 deletions

View File

@ -8,7 +8,7 @@ import re
import stix2 import stix2
from .base import _Observable, _STIXBase from .base import _Observable, _STIXBase
from .exceptions import DuplicateObjectRegistrationError, ParseError from .exceptions import DuplicateRegistrationError, ParseError
from .markings import _MarkingsMixin from .markings import _MarkingsMixin
from .utils import SCO21_EXT_REGEX, TYPE_REGEX, _get_dict from .utils import SCO21_EXT_REGEX, TYPE_REGEX, _get_dict
@ -218,7 +218,7 @@ def _register_object(new_type, version=None):
OBJ_MAP = STIX2_OBJ_MAPS[v]['objects'] OBJ_MAP = STIX2_OBJ_MAPS[v]['objects']
if new_type._type in OBJ_MAP.keys(): if new_type._type in OBJ_MAP.keys():
raise DuplicateObjectRegistrationError("An object with type '%s' already exists and cannot be created again." % new_type._type) raise DuplicateRegistrationError("STIX Object", new_type._type)
OBJ_MAP[new_type._type] = new_type OBJ_MAP[new_type._type] = new_type
@ -238,6 +238,8 @@ def _register_marking(new_marking, version=None):
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
OBJ_MAP_MARKING = STIX2_OBJ_MAPS[v]['markings'] OBJ_MAP_MARKING = STIX2_OBJ_MAPS[v]['markings']
if new_marking._type in OBJ_MAP_MARKING.keys():
raise DuplicateRegistrationError("STIX Marking", new_marking._type)
OBJ_MAP_MARKING[new_marking._type] = new_marking OBJ_MAP_MARKING[new_marking._type] = new_marking
@ -258,7 +260,7 @@ def _register_observable(new_observable, version=None):
OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables'] OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables']
if new_observable._type in OBJ_MAP_OBSERVABLE.keys(): if new_observable._type in OBJ_MAP_OBSERVABLE.keys():
raise DuplicateObjectRegistrationError("An observable with type '%s' already exists and cannot be created again." % new_observable._type) raise DuplicateRegistrationError("Cyber Observable", new_observable._type)
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
@ -323,6 +325,11 @@ def _register_observable_extension(
EXT_MAP = STIX2_OBJ_MAPS[v]['observable-extensions'] EXT_MAP = STIX2_OBJ_MAPS[v]['observable-extensions']
try: try:
try:
if ext_type in EXT_MAP[observable_type].keys():
raise DuplicateRegistrationError("Observable Extension", ext_type)
except AttributeError:
pass
EXT_MAP[observable_type][ext_type] = new_extension EXT_MAP[observable_type][ext_type] = new_extension
except KeyError: except KeyError:
if observable_type not in OBJ_MAP_OBSERVABLE: if observable_type not in OBJ_MAP_OBSERVABLE:

View File

@ -235,8 +235,14 @@ class STIXDeprecationWarning(DeprecationWarning):
pass pass
class DuplicateObjectRegistrationError(STIXError): class DuplicateRegistrationError(STIXError):
"""An object (or observable) with the same type as an existing object (or observable) is being registered""" """A STIX object with the same type as an existing object is being registered"""
def __init__(self, msg): def __init__(self, obj_type, reg_obj_type):
super(DuplicateObjectRegistrationError, self).__init__(msg) super(DuplicateRegistrationError, self).__init__()
self.obj_type = obj_type
self.reg_obj_type = reg_obj_type
def __str__(self):
msg = "A(n) {0} with type '{1}' already exists and cannot be registered again"
return msg.format(self.obj_type, self.reg_obj_type)

View File

@ -3,8 +3,6 @@ import pytest
import stix2 import stix2
from stix2 import core, exceptions from stix2 import core, exceptions
from .constants import IDENTITY_ID
BUNDLE = { BUNDLE = {
"type": "bundle", "type": "bundle",
"spec_version": "2.0", "spec_version": "2.0",
@ -74,54 +72,3 @@ def test_register_marking_with_version():
assert stix2.v20.TLP_WHITE.definition._type in core.STIX2_OBJ_MAPS[v]['markings'] assert stix2.v20.TLP_WHITE.definition._type in core.STIX2_OBJ_MAPS[v]['markings']
assert v in str(stix2.v20.TLP_WHITE.__class__) assert v in str(stix2.v20.TLP_WHITE.__class__)
@pytest.mark.xfail(reason="The default version is no longer 2.0", condition=stix2.DEFAULT_VERSION != "2.0")
def test_register_marking_with_no_version():
# Uses default version (2.0 in this case)
core._register_marking(stix2.v20.TLP_WHITE.__class__)
v = 'v20'
assert stix2.v20.TLP_WHITE.definition._type in core.STIX2_OBJ_MAPS[v]['markings']
assert v in str(stix2.v20.TLP_WHITE.__class__)
def test_register_observable_extension_with_version():
observed_data = stix2.v20.ObservedData(
id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
created_by_ref=IDENTITY_ID,
created="2016-04-06T19:58:16.000Z",
modified="2016-04-06T19:58:16.000Z",
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=50,
objects={
"0": {
"name": "foo.exe",
"type": "file",
"extensions": {
"ntfs-ext": {
"alternate_data_streams": [
{
"name": "second.stream",
"size": 25536,
},
],
},
},
},
"1": {
"type": "directory",
"path": "/usr/home",
"contains_refs": ["0"],
},
},
)
core._register_observable_extension(observed_data.objects['0'], observed_data.objects['0'].extensions['ntfs-ext'].__class__, version='2.0')
v = 'v20'
assert observed_data.objects['0'].type in core.STIX2_OBJ_MAPS[v]['observables']
assert v in str(observed_data.objects['0'].__class__)
assert observed_data.objects['0'].extensions['ntfs-ext']._type in core.STIX2_OBJ_MAPS[v]['observable-extensions']['file']
assert v in str(observed_data.objects['0'].extensions['ntfs-ext'].__class__)

View File

@ -4,7 +4,7 @@ import stix2
from stix2 import core from stix2 import core
import stix2.v20 import stix2.v20
from ...exceptions import DuplicateObjectRegistrationError, InvalidValueError from ...exceptions import DuplicateRegistrationError, InvalidValueError
from .constants import FAKE_TIME, IDENTITY_ID, MARKING_DEFINITION_ID from .constants import FAKE_TIME, IDENTITY_ID, MARKING_DEFINITION_ID
IDENTITY_CUSTOM_PROP = stix2.v20.Identity( IDENTITY_CUSTOM_PROP = stix2.v20.Identity(
@ -1040,7 +1040,7 @@ def test_register_custom_object_with_version():
def test_register_duplicate_object_with_version(): def test_register_duplicate_object_with_version():
with pytest.raises(DuplicateObjectRegistrationError) as excinfo: with pytest.raises(DuplicateRegistrationError) as excinfo:
@stix2.v20.CustomObject( @stix2.v20.CustomObject(
'x-new-type-2', [ 'x-new-type-2', [
('property1', stix2.properties.StringProperty()), ('property1', stix2.properties.StringProperty()),
@ -1049,7 +1049,7 @@ def test_register_duplicate_object_with_version():
) )
class NewType2(object): class NewType2(object):
pass pass
assert "An object with type 'x-new-type-2' already exists and cannot be created again." in str(excinfo.value) assert "cannot be registered again" in str(excinfo.value)
@stix2.v20.CustomObservable( @stix2.v20.CustomObservable(
@ -1069,7 +1069,7 @@ def test_register_observable_with_version():
def test_register_duplicate_observable_with_version(): def test_register_duplicate_observable_with_version():
with pytest.raises(DuplicateObjectRegistrationError) as excinfo: with pytest.raises(DuplicateRegistrationError) as excinfo:
@stix2.v20.CustomObservable( @stix2.v20.CustomObservable(
'x-new-observable-2', [ 'x-new-observable-2', [
('property1', stix2.properties.StringProperty()), ('property1', stix2.properties.StringProperty()),
@ -1077,4 +1077,59 @@ def test_register_duplicate_observable_with_version():
) )
class NewObservable2(object): class NewObservable2(object):
pass pass
assert "An observable with type 'x-new-observable-2' already exists and cannot be created again." in str(excinfo.value) assert "cannot be registered again" in str(excinfo.value)
@pytest.mark.xfail(reason="The default version is no longer 2.0", condition=stix2.DEFAULT_VERSION != "2.0")
def test_register_marking_with_no_version():
@stix2.v20.CustomMarking(
'x-new-obj-2', [
('property1', stix2.properties.StringProperty(required=True)),
],
)
class NewObj2():
pass
v = 'v20'
no = NewObj2(property1='something')
assert no._type in core.STIX2_OBJ_MAPS[v]['markings']
def test_register_observable_extension_with_version():
@stix2.v20.CustomExtension(
stix2.v20.UserAccount, 'some-extension-2', [
('keys', stix2.properties.StringProperty(required=True)),
],
)
class SomeCustomExtension2:
pass
v = 'v20'
example = SomeCustomExtension2(keys='test123')
assert example._type in core.STIX2_OBJ_MAPS[v]['observable-extensions']['user-account']
def test_register_duplicate_observable_extension():
with pytest.raises(DuplicateRegistrationError) as excinfo:
@stix2.v20.CustomExtension(
stix2.v20.UserAccount, 'some-extension-2', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
],
)
class NewExtension2():
pass
assert "cannot be registered again" in str(excinfo.value)
def test_register_duplicate_marking():
with pytest.raises(DuplicateRegistrationError) as excinfo:
@stix2.v20.CustomMarking(
'x-new-obj-2', [
('property1', stix2.properties.StringProperty(required=True)),
],
)
class NewObj2():
pass
assert "cannot be registered again" in str(excinfo.value)

View File

@ -3,8 +3,6 @@ import pytest
import stix2 import stix2
from stix2 import core, exceptions from stix2 import core, exceptions
from .constants import IDENTITY_ID, OBSERVED_DATA_ID
BUNDLE = { BUNDLE = {
"type": "bundle", "type": "bundle",
"id": "bundle--00000000-0000-4000-8000-000000000007", "id": "bundle--00000000-0000-4000-8000-000000000007",
@ -89,44 +87,3 @@ def test_register_marking_with_no_version():
assert stix2.v21.TLP_WHITE.definition._type in core.STIX2_OBJ_MAPS[v]['markings'] assert stix2.v21.TLP_WHITE.definition._type in core.STIX2_OBJ_MAPS[v]['markings']
assert v in str(stix2.v21.TLP_WHITE.__class__) assert v in str(stix2.v21.TLP_WHITE.__class__)
def test_register_observable_extension_with_default_version():
observed_data = stix2.v21.ObservedData(
id=OBSERVED_DATA_ID,
created_by_ref=IDENTITY_ID,
created="2016-04-06T19:58:16.000Z",
modified="2016-04-06T19:58:16.000Z",
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=50,
objects={
"0": {
"name": "foo.exe",
"type": "file",
"extensions": {
"ntfs-ext": {
"alternate_data_streams": [
{
"name": "second.stream",
"size": 25536,
},
],
},
},
},
"1": {
"type": "directory",
"path": "/usr/home",
"contains_refs": ["file--420bc087-8b53-5ae9-8210-20d27d5e96c8"],
},
},
)
core._register_observable_extension(observed_data.objects['0'], observed_data.objects['0'].extensions['ntfs-ext'].__class__)
v = 'v21'
assert observed_data.objects['0'].type in core.STIX2_OBJ_MAPS[v]['observables']
assert v in str(observed_data.objects['0'].__class__)
assert observed_data.objects['0'].extensions['ntfs-ext']._type in core.STIX2_OBJ_MAPS[v]['observable-extensions']['file']
assert v in str(observed_data.objects['0'].extensions['ntfs-ext'].__class__)

View File

@ -6,7 +6,7 @@ import stix2
import stix2.base import stix2.base
import stix2.v21 import stix2.v21
from ...exceptions import DuplicateObjectRegistrationError, InvalidValueError from ...exceptions import DuplicateRegistrationError, InvalidValueError
from .constants import FAKE_TIME, IDENTITY_ID, MARKING_DEFINITION_ID from .constants import FAKE_TIME, IDENTITY_ID, MARKING_DEFINITION_ID
IDENTITY_CUSTOM_PROP = stix2.v21.Identity( IDENTITY_CUSTOM_PROP = stix2.v21.Identity(
@ -1092,7 +1092,7 @@ def test_register_custom_object_with_version():
def test_register_duplicate_object_with_version(): def test_register_duplicate_object_with_version():
with pytest.raises(DuplicateObjectRegistrationError) as excinfo: with pytest.raises(DuplicateRegistrationError) as excinfo:
@stix2.v21.CustomObject( @stix2.v21.CustomObject(
'x-new-type-2', [ 'x-new-type-2', [
('property1', stix2.properties.StringProperty()), ('property1', stix2.properties.StringProperty()),
@ -1101,7 +1101,7 @@ def test_register_duplicate_object_with_version():
) )
class NewType2(object): class NewType2(object):
pass pass
assert "An object with type 'x-new-type-2' already exists and cannot be created again." in str(excinfo.value) assert "cannot be registered again" in str(excinfo.value)
@stix2.v21.CustomObservable( @stix2.v21.CustomObservable(
@ -1113,15 +1113,15 @@ class NewObservable3(object):
pass pass
def test_register_observable_with_version(): def test_register_observable():
custom_obs = NewObservable3(property1="Test Observable") custom_obs = NewObservable3(property1="Test Observable")
v = 'v21' v = 'v21'
assert custom_obs.type in stix2.core.STIX2_OBJ_MAPS[v]['observables'] assert custom_obs.type in stix2.core.STIX2_OBJ_MAPS[v]['observables']
def test_register_duplicate_observable_with_version(): def test_register_duplicate_observable():
with pytest.raises(DuplicateObjectRegistrationError) as excinfo: with pytest.raises(DuplicateRegistrationError) as excinfo:
@stix2.v21.CustomObservable( @stix2.v21.CustomObservable(
'x-new-observable-2', [ 'x-new-observable-2', [
('property1', stix2.properties.StringProperty()), ('property1', stix2.properties.StringProperty()),
@ -1129,4 +1129,46 @@ def test_register_duplicate_observable_with_version():
) )
class NewObservable2(object): class NewObservable2(object):
pass pass
assert "An observable with type 'x-new-observable-2' already exists and cannot be created again." in str(excinfo.value) assert "cannot be registered again" in str(excinfo.value)
def test_register_observable_custom_extension():
@stix2.v21.CustomExtension(
stix2.v21.DomainName, 'x-new-2-ext', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
],
)
class NewExtension2():
pass
example = NewExtension2(property1="Hi there")
v = 'v21'
assert 'domain-name' in stix2.core.STIX2_OBJ_MAPS[v]['observables']
assert example._type in stix2.core.STIX2_OBJ_MAPS[v]['observable-extensions']['domain-name']
def test_register_duplicate_observable_extension():
with pytest.raises(DuplicateRegistrationError) as excinfo:
@stix2.v21.CustomExtension(
stix2.v21.DomainName, 'x-new-2-ext', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
],
)
class NewExtension2():
pass
assert "cannot be registered again" in str(excinfo.value)
def test_register_duplicate_marking():
with pytest.raises(DuplicateRegistrationError) as excinfo:
@stix2.v21.CustomMarking(
'x-new-obj', [
('property1', stix2.properties.StringProperty(required=True)),
],
)
class NewObj2():
pass
assert "cannot be registered again" in str(excinfo.value)