Merge pull request #483 from chisholm/is_functions

add is_sdo() et al functions
pull/1/head
Chris Lenk 2021-01-28 22:59:46 -05:00 committed by GitHub
commit 5e2d888d1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 987 additions and 177 deletions

View File

@ -4,7 +4,7 @@ import copy
from . import registry
from .exceptions import ParseError
from .utils import _get_dict
from .utils import _get_dict, detect_spec_version
def parse(data, allow_custom=False, version=None):
@ -42,47 +42,6 @@ def parse(data, allow_custom=False, version=None):
return obj
def _detect_spec_version(stix_dict):
"""
Given a dict representing a STIX object, try to detect what spec version
it is likely to comply with.
:param stix_dict: A dict with some STIX content. Must at least have a
"type" property.
:return: A string in "vXX" format, where "XX" indicates the spec version,
e.g. "v20", "v21", etc.
"""
obj_type = stix_dict["type"]
if 'spec_version' in stix_dict:
# For STIX 2.0, applies to bundles only.
# For STIX 2.1+, applies to SCOs, SDOs, SROs, and markings only.
v = 'v' + stix_dict['spec_version'].replace('.', '')
elif "id" not in stix_dict:
# Only 2.0 SCOs don't have ID properties
v = "v20"
elif obj_type == 'bundle':
# Bundle without a spec_version property: must be 2.1. But to
# future-proof, use max version over all contained SCOs, with 2.1
# minimum.
v = max(
"v21",
max(
_detect_spec_version(obj) for obj in stix_dict["objects"]
),
)
elif obj_type in registry.STIX2_OBJ_MAPS["v21"]["observables"]:
# Non-bundle object with an ID and without spec_version. Could be a
# 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO...
v = "v21"
else:
# Not a 2.1 SCO; must be a 2.0 object.
v = "v20"
return v
def dict_to_stix2(stix_dict, allow_custom=False, version=None):
"""convert dictionary to full python-stix2 object
@ -115,25 +74,19 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None):
if 'type' not in stix_dict:
raise ParseError("Can't parse object with no 'type' property: %s" % str(stix_dict))
if version:
# If the version argument was passed, override other approaches.
v = 'v' + version.replace('.', '')
else:
v = _detect_spec_version(stix_dict)
if not version:
version = detect_spec_version(stix_dict)
OBJ_MAP = dict(
registry.STIX2_OBJ_MAPS[v]['objects'],
**registry.STIX2_OBJ_MAPS[v]['observables']
)
obj_type = stix_dict["type"]
obj_class = registry.class_for_type(obj_type, version, "objects") \
or registry.class_for_type(obj_type, version, "observables")
try:
obj_class = OBJ_MAP[stix_dict['type']]
except KeyError:
if not obj_class:
if allow_custom:
# flag allows for unknown custom objects too, but will not
# be parsed into STIX object, returned as is
return stix_dict
raise ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % stix_dict['type'])
raise ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % obj_type)
return obj_class(allow_custom=allow_custom, **stix_dict)
@ -168,16 +121,12 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
obj['_valid_refs'] = _valid_refs or []
if version:
# If the version argument was passed, override other approaches.
v = 'v' + version.replace('.', '')
else:
v = _detect_spec_version(obj)
if not version:
version = detect_spec_version(obj)
try:
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[v]['observables']
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError:
obj_type = obj["type"]
obj_class = registry.class_for_type(obj_type, version, "observables")
if not obj_class:
if allow_custom:
# flag allows for unknown custom objects too, but will not
# be parsed into STIX observable object, just returned as is

View File

@ -503,14 +503,14 @@ class ReferenceProperty(Property):
possible_prefix = value[:value.index('--')]
if self.valid_types:
ref_valid_types = enumerate_types(self.valid_types, 'v' + self.spec_version.replace(".", ""))
ref_valid_types = enumerate_types(self.valid_types, self.spec_version)
if possible_prefix in ref_valid_types:
required_prefix = possible_prefix
else:
raise ValueError("The type-specifying prefix '%s' for this property is not valid" % (possible_prefix))
elif self.invalid_types:
ref_invalid_types = enumerate_types(self.invalid_types, 'v' + self.spec_version.replace(".", ""))
ref_invalid_types = enumerate_types(self.invalid_types, self.spec_version)
if possible_prefix not in ref_invalid_types:
required_prefix = possible_prefix
@ -655,9 +655,7 @@ class ExtensionsProperty(DictionaryProperty):
except ValueError:
raise ValueError("The extensions property must contain a dictionary")
v = 'v' + self.spec_version.replace('.', '')
specific_type_map = STIX2_OBJ_MAPS[v]['observable-extensions'].get(self.enclosing_type, {})
specific_type_map = STIX2_OBJ_MAPS[self.spec_version]['observable-extensions'].get(self.enclosing_type, {})
for key, subvalue in dictified.items():
if key in specific_type_map:
cls = specific_type_map[key]

View File

@ -31,18 +31,15 @@ def _register_object(new_type, version=DEFAULT_VERSION):
properties = new_type._properties
if not version:
version = DEFAULT_VERSION
if version == "2.1":
for prop_name, prop in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character" % prop_name)
if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + DEFAULT_VERSION.replace('.', '')
OBJ_MAP = registry.STIX2_OBJ_MAPS[v]['objects']
OBJ_MAP = registry.STIX2_OBJ_MAPS[version]['objects']
if new_type._type in OBJ_MAP.keys():
raise DuplicateRegistrationError("STIX Object", new_type._type)
OBJ_MAP[new_type._type] = new_type
@ -61,6 +58,9 @@ def _register_marking(new_marking, version=DEFAULT_VERSION):
mark_type = new_marking._type
properties = new_marking._properties
if not version:
version = DEFAULT_VERSION
_validate_type(mark_type, version)
if version == "2.1":
@ -68,13 +68,7 @@ def _register_marking(new_marking, version=DEFAULT_VERSION):
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + DEFAULT_VERSION.replace('.', '')
OBJ_MAP_MARKING = registry.STIX2_OBJ_MAPS[v]['markings']
OBJ_MAP_MARKING = registry.STIX2_OBJ_MAPS[version]['markings']
if mark_type in OBJ_MAP_MARKING.keys():
raise DuplicateRegistrationError("STIX Marking", mark_type)
OBJ_MAP_MARKING[mark_type] = new_marking
@ -91,6 +85,9 @@ def _register_observable(new_observable, version=DEFAULT_VERSION):
"""
properties = new_observable._properties
if not version:
version = DEFAULT_VERSION
if version == "2.0":
# If using STIX2.0, check properties ending in "_ref/s" are ObjectReferenceProperties
for prop_name, prop in properties.items():
@ -130,13 +127,7 @@ def _register_observable(new_observable, version=DEFAULT_VERSION):
"is not a ListProperty containing ReferenceProperty." % prop_name,
)
if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + DEFAULT_VERSION.replace('.', '')
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[v]['observables']
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[version]['observables']
if new_observable._type in OBJ_MAP_OBSERVABLE.keys():
raise DuplicateRegistrationError("Cyber Observable", new_observable._type)
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
@ -182,8 +173,6 @@ def _register_observable_extension(
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
v = 'v' + version.replace('.', '')
try:
observable_type = observable._type
except AttributeError:
@ -192,8 +181,8 @@ def _register_observable_extension(
"created with the @CustomObservable decorator.",
)
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[v]['observables']
EXT_MAP = registry.STIX2_OBJ_MAPS[v]['observable-extensions']
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[version]['observables']
EXT_MAP = registry.STIX2_OBJ_MAPS[version]['observable-extensions']
try:
if ext_type in EXT_MAP[observable_type].keys():

View File

@ -7,6 +7,20 @@ import re
STIX2_OBJ_MAPS = {}
def _stix_vid_to_version(stix_vid):
"""
Convert a python package name representing a STIX version in the form "vXX"
to the dotted style used in the public APIs of this library, "X.X".
:param stix_vid: A package name in the form "vXX"
:return: A STIX version in dotted style
"""
assert len(stix_vid) >= 3
stix_version = stix_vid[1] + "." + stix_vid[2:]
return stix_version
def _collect_stix2_mappings():
"""Navigate the package once and retrieve all object mapping dicts for each
v2X package. Includes OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP."""
@ -16,13 +30,51 @@ def _collect_stix2_mappings():
prefix = str(top_level_module.__name__) + '.'
for module_loader, name, is_pkg in pkgutil.walk_packages(path=path, prefix=prefix):
ver = name.split('.')[1]
stix_vid = name.split('.')[1]
if re.match(r'^stix2\.v2[0-9]$', name) and is_pkg:
ver = _stix_vid_to_version(stix_vid)
mod = importlib.import_module(name, str(top_level_module.__name__))
STIX2_OBJ_MAPS[ver] = {}
STIX2_OBJ_MAPS[ver]['objects'] = mod.OBJ_MAP
STIX2_OBJ_MAPS[ver]['observables'] = mod.OBJ_MAP_OBSERVABLE
STIX2_OBJ_MAPS[ver]['observable-extensions'] = mod.EXT_MAP
elif re.match(r'^stix2\.v2[0-9]\.common$', name) and is_pkg is False:
ver = _stix_vid_to_version(stix_vid)
mod = importlib.import_module(name, str(top_level_module.__name__))
STIX2_OBJ_MAPS[ver]['markings'] = mod.OBJ_MAP_MARKING
def class_for_type(stix_type, stix_version, category=None):
"""
Get the registered class which implements a particular STIX type for a
particular STIX version.
:param stix_type: A STIX type as a string
:param stix_version: A STIX version as a string, e.g. "2.1"
:param category: An optional "category" value, which is just used directly
as a second key after the STIX version, and depends on how the types
are internally categorized. This would be useful if the same STIX type
is used to mean two different things within the same STIX version. So
it's unlikely to be necessary. Pass None to just search all the
categories and return the first class found.
:return: A registered python class which implements the given STIX type, or
None if one is not found.
"""
cls = None
cat_map = STIX2_OBJ_MAPS.get(stix_version)
if cat_map:
if category:
class_map = cat_map.get(category)
if class_map:
cls = class_map.get(stix_type)
else:
cls = cat_map["objects"].get(stix_type) \
or cat_map["observables"].get(stix_type) \
or cat_map["markings"].get(stix_type)
# Left "observable-extensions" out; it has a different
# substructure. A version->category->type lookup would result
# in another map, not a class. So it doesn't fit the pattern.
return cls

View File

@ -2,7 +2,7 @@ from __future__ import unicode_literals
import pytest
from stix2.parsing import _detect_spec_version
from stix2.utils import detect_spec_version
@pytest.mark.parametrize(
@ -17,7 +17,7 @@ from stix2.parsing import _detect_spec_version
"name": "alice",
"identity_class": "individual",
},
"v20",
"2.0",
),
(
{
@ -29,14 +29,14 @@ from stix2.parsing import _detect_spec_version
"target_ref": "identity--ba18dde2-56d3-4a34-aa0b-fc56f5be568f",
"relationship_type": "targets",
},
"v20",
"2.0",
),
(
{
"type": "file",
"name": "notes.txt",
},
"v20",
"2.0",
),
(
{
@ -48,7 +48,7 @@ from stix2.parsing import _detect_spec_version
"statement": "Copyright (c) ACME Corp.",
},
},
"v20",
"2.0",
),
(
{
@ -75,7 +75,7 @@ from stix2.parsing import _detect_spec_version
},
],
},
"v20",
"2.0",
),
# STIX 2.1 examples
(
@ -87,7 +87,7 @@ from stix2.parsing import _detect_spec_version
"modified": "2001-07-01T09:33:17.000Z",
"name": "alice",
},
"v21",
"2.1",
),
(
{
@ -100,7 +100,7 @@ from stix2.parsing import _detect_spec_version
"target_ref": "identity--ba18dde2-56d3-4a34-aa0b-fc56f5be568f",
"relationship_type": "targets",
},
"v21",
"2.1",
),
(
{
@ -109,7 +109,7 @@ from stix2.parsing import _detect_spec_version
"spec_version": "2.1",
"name": "notes.txt",
},
"v21",
"2.1",
),
(
{
@ -117,7 +117,7 @@ from stix2.parsing import _detect_spec_version
"id": "file--5eef3404-6a94-4db3-9a1a-5684cbea0dfe",
"name": "notes.txt",
},
"v21",
"2.1",
),
(
{
@ -131,7 +131,7 @@ from stix2.parsing import _detect_spec_version
"tlp": "green",
},
},
"v21",
"2.1",
),
(
{
@ -153,7 +153,7 @@ from stix2.parsing import _detect_spec_version
},
],
},
"v21",
"2.1",
),
# Mixed spec examples
(
@ -180,7 +180,7 @@ from stix2.parsing import _detect_spec_version
},
],
},
"v21",
"2.1",
),
(
{
@ -202,11 +202,11 @@ from stix2.parsing import _detect_spec_version
},
],
},
"v21",
"2.1",
),
],
)
def test_spec_version_detect(obj_dict, expected_ver):
detected_ver = _detect_spec_version(obj_dict)
detected_ver = detect_spec_version(obj_dict)
assert detected_ver == expected_ver

View File

@ -0,0 +1,262 @@
import pytest
import stix2.utils
###
# Tests using types/behaviors common to STIX 2.0 and 2.1.
###
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"attack-pattern",
"campaign",
"course-of-action",
"identity",
"indicator",
"intrusion-set",
"malware",
"observed-data",
"report",
"threat-actor",
"tool",
"vulnerability",
],
)
def test_is_sdo(type_, stix_version):
assert stix2.utils.is_sdo(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert stix2.utils.is_sdo(id_, stix_version)
assert stix2.utils.is_stix_type(
type_, stix_version, stix2.utils.STIXTypeClass.SDO,
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"relationship",
"sighting",
"marking-definition",
"bundle",
"language-content",
"ipv4-addr",
"foo",
],
)
def test_is_not_sdo(type_, stix_version):
assert not stix2.utils.is_sdo(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert not stix2.utils.is_sdo(id_, stix_version)
d = {
"type": type_,
}
assert not stix2.utils.is_sdo(d, stix_version)
assert not stix2.utils.is_stix_type(
type_, stix_version, stix2.utils.STIXTypeClass.SDO,
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"artifact",
"autonomous-system",
"directory",
"domain-name",
"email-addr",
"email-message",
"file",
"ipv4-addr",
"ipv6-addr",
"mac-addr",
"mutex",
"network-traffic",
"process",
"software",
"url",
"user-account",
"windows-registry-key",
"x509-certificate",
],
)
def test_is_sco(type_, stix_version):
assert stix2.utils.is_sco(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert stix2.utils.is_sco(id_, stix_version)
assert stix2.utils.is_stix_type(
type_, stix_version, stix2.utils.STIXTypeClass.SCO,
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"identity",
"sighting",
"marking-definition",
"bundle",
"language-content",
"foo",
],
)
def test_is_not_sco(type_, stix_version):
assert not stix2.utils.is_sco(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert not stix2.utils.is_sco(id_, stix_version)
d = {
"type": type_,
}
assert not stix2.utils.is_sco(d, stix_version)
assert not stix2.utils.is_stix_type(
type_, stix_version, stix2.utils.STIXTypeClass.SCO,
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"relationship",
"sighting",
],
)
def test_is_sro(type_, stix_version):
assert stix2.utils.is_sro(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert stix2.utils.is_sro(id_, stix_version)
assert stix2.utils.is_stix_type(
type_, stix_version, stix2.utils.STIXTypeClass.SRO,
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"identity",
"marking-definition",
"bundle",
"language-content",
"ipv4-addr",
"foo",
],
)
def test_is_not_sro(type_, stix_version):
assert not stix2.utils.is_sro(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert not stix2.utils.is_sro(id_, stix_version)
d = {
"type": type_,
}
assert not stix2.utils.is_sro(d, stix_version)
assert not stix2.utils.is_stix_type(
type_, stix_version, stix2.utils.STIXTypeClass.SRO,
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
def test_is_marking(stix_version):
assert stix2.utils.is_marking("marking-definition", stix_version)
id_ = "marking-definition--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert stix2.utils.is_marking(id_, stix_version)
assert stix2.utils.is_stix_type(
"marking-definition", stix_version, "marking-definition",
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"identity",
"bundle",
"language-content",
"ipv4-addr",
"foo",
],
)
def test_is_not_marking(type_, stix_version):
assert not stix2.utils.is_marking(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert not stix2.utils.is_marking(id_, stix_version)
d = {
"type": type_,
}
assert not stix2.utils.is_marking(d, stix_version)
assert not stix2.utils.is_stix_type(
type_, stix_version, "marking-definition",
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"identity",
"relationship",
"sighting",
"marking-definition",
"bundle",
"ipv4-addr",
],
)
def test_is_object(type_, stix_version):
assert stix2.utils.is_object(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert stix2.utils.is_object(id_, stix_version)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
def test_is_not_object(stix_version):
assert not stix2.utils.is_object("foo", stix_version)
id_ = "foo--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert not stix2.utils.is_object(id_, stix_version)
d = {
"type": "foo",
}
assert not stix2.utils.is_object(d, stix_version)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
def test_is_stix_type(stix_version):
assert not stix2.utils.is_stix_type(
"foo", stix_version, stix2.utils.STIXTypeClass.SDO, "foo",
)
assert stix2.utils.is_stix_type(
"bundle", stix_version, "foo", "bundle",
)
assert stix2.utils.is_stix_type(
"identity", stix_version,
stix2.utils.STIXTypeClass.SDO,
stix2.utils.STIXTypeClass.SRO,
)
assert stix2.utils.is_stix_type(
"software", stix_version,
stix2.utils.STIXTypeClass.SDO,
stix2.utils.STIXTypeClass.SCO,
)

View File

@ -1044,9 +1044,8 @@ def test_register_custom_object_with_version():
}
cust_obj_1 = stix2.parsing.dict_to_stix2(custom_obj_1, version='2.0')
v = 'v20'
assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS[v]['objects']
assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS['2.0']['objects']
# spec_version is not in STIX 2.0, and is required in 2.1, so this
# suffices as a test for a STIX 2.0 object.
assert "spec_version" not in cust_obj_1
@ -1076,9 +1075,8 @@ class NewObservable2(object):
def test_register_observable_with_version():
custom_obs = NewObservable2(property1="Test Observable")
v = 'v20'
assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS[v]['observables']
assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS['2.0']['observables']
def test_register_duplicate_observable_with_version():
@ -1101,10 +1099,9 @@ def test_register_marking_with_version():
)
class NewObj2():
pass
v = 'v20'
no = NewObj2(property1='something')
assert no._type in stix2.registry.STIX2_OBJ_MAPS[v]['markings']
assert no._type in stix2.registry.STIX2_OBJ_MAPS['2.0']['markings']
def test_register_observable_extension_with_version():
@ -1116,10 +1113,9 @@ def test_register_observable_extension_with_version():
class SomeCustomExtension2:
pass
v = 'v20'
example = SomeCustomExtension2(keys='test123')
assert example._type in stix2.registry.STIX2_OBJ_MAPS[v]['observable-extensions']['user-account']
assert example._type in stix2.registry.STIX2_OBJ_MAPS['2.0']['observable-extensions']['user-account']
def test_register_duplicate_observable_extension():

View File

@ -73,7 +73,6 @@ def test_register_marking_with_version():
_properties = OrderedDict()
registration._register_marking(NewMarking1, version='2.0')
v = 'v20'
assert NewMarking1._type in registry.STIX2_OBJ_MAPS[v]['markings']
assert v in str(registry.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type])
assert NewMarking1._type in registry.STIX2_OBJ_MAPS['2.0']['markings']
assert 'v20' in str(registry.STIX2_OBJ_MAPS['2.0']['markings'][NewMarking1._type])

View File

@ -237,3 +237,146 @@ def test_find_property_index(object, tuple_to_find, expected_index):
)
def test_iterate_over_values(dict_value, tuple_to_find, expected_index):
assert stix2.serialization._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index
@pytest.mark.parametrize(
"type_", [
"attack-pattern",
"campaign",
"course-of-action",
"identity",
"indicator",
"intrusion-set",
"malware",
"observed-data",
"report",
"threat-actor",
"tool",
"vulnerability",
],
)
def test_is_sdo_dict(type_):
d = {
"type": type_,
}
assert stix2.utils.is_sdo(d, "2.0")
@pytest.mark.parametrize(
"dict_", [
{"type": "software", "spec_version": "2.1"},
{"type": "software"},
{"type": "identity", "spec_version": "2.1"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "marking-definition"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "bundle"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "language-content"},
{"type": "relationship", "spec_version": "2.1"},
{"type": "relationship"},
{"type": "foo", "spec_version": "2.1"},
{"type": "foo"},
],
)
def test_is_not_sdo_dict(dict_):
assert not stix2.utils.is_sdo(dict_, "2.0")
def test_is_sco_dict():
d = {
"type": "file",
}
assert stix2.utils.is_sco(d, "2.0")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity"},
{"type": "identity", "spec_version": "2.1"},
{"type": "software", "spec_version": "2.1"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "marking-definition"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "bundle"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "language-content"},
{"type": "relationship", "spec_version": "2.1"},
{"type": "relationship"},
{"type": "foo", "spec_version": "2.1"},
{"type": "foo"},
],
)
def test_is_not_sco_dict(dict_):
assert not stix2.utils.is_sco(dict_, "2.0")
@pytest.mark.parametrize(
"dict_", [
{"type": "relationship"},
{"type": "sighting"},
],
)
def test_is_sro_dict(dict_):
assert stix2.utils.is_sro(dict_, "2.0")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity", "spec_version": "2.1"},
{"type": "identity"},
{"type": "software", "spec_version": "2.1"},
{"type": "software"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "marking-definition"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "bundle"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "language-content"},
{"type": "relationship", "spec_version": "2.1"},
{"type": "sighting", "spec_version": "2.1"},
{"type": "foo", "spec_version": "2.1"},
{"type": "foo"},
],
)
def test_is_not_sro_dict(dict_):
assert not stix2.utils.is_sro(dict_, "2.0")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity"},
{"type": "software"},
{"type": "marking-definition"},
{
"type": "bundle",
"id": "bundle--8f431680-6278-4767-ba43-5edb682d7086",
"spec_version": "2.0",
"objects": [
{"type": "identity"},
{"type": "software"},
{"type": "marking-definition"},
],
},
],
)
def test_is_object_dict(dict_):
assert stix2.utils.is_object(dict_, "2.0")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity", "spec_version": "2.1"},
{"type": "software", "spec_version": "2.1"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "relationship", "spec_version": "2.1"},
{"type": "sighting", "spec_version": "2.1"},
{"type": "foo", "spec_version": "2.1"},
{"type": "foo"},
],
)
def test_is_not_object_dict(dict_):
assert not stix2.utils.is_object(dict_, "2.0")

View File

@ -1265,9 +1265,8 @@ def test_register_custom_object_with_version():
}
cust_obj_1 = stix2.parsing.dict_to_stix2(custom_obj_1, version='2.1')
v = 'v21'
assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS[v]['objects']
assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS['2.1']['objects']
assert cust_obj_1.spec_version == "2.1"
@ -1295,9 +1294,8 @@ class NewObservable3(object):
def test_register_observable():
custom_obs = NewObservable3(property1="Test Observable")
v = 'v21'
assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS[v]['observables']
assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS['2.1']['observables']
def test_register_duplicate_observable():
@ -1323,10 +1321,9 @@ def test_register_observable_custom_extension():
pass
example = NewExtension2(property1="Hi there")
v = 'v21'
assert 'domain-name' in stix2.registry.STIX2_OBJ_MAPS[v]['observables']
assert example._type in stix2.registry.STIX2_OBJ_MAPS[v]['observable-extensions']['domain-name']
assert 'domain-name' in stix2.registry.STIX2_OBJ_MAPS['2.1']['observables']
assert example._type in stix2.registry.STIX2_OBJ_MAPS['2.1']['observable-extensions']['domain-name']
def test_register_duplicate_observable_extension():

View File

@ -78,10 +78,9 @@ def test_register_marking_with_version():
_properties = OrderedDict()
registration._register_marking(NewMarking1, version='2.1')
v = 'v21'
assert NewMarking1._type in registry.STIX2_OBJ_MAPS[v]['markings']
assert v in str(registry.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type])
assert NewMarking1._type in registry.STIX2_OBJ_MAPS['2.1']['markings']
assert 'v21' in str(registry.STIX2_OBJ_MAPS['2.1']['markings'][NewMarking1._type])
@pytest.mark.xfail(reason="The default version is not 2.1", condition=DEFAULT_VERSION != "2.1")
@ -92,7 +91,6 @@ def test_register_marking_with_no_version():
_properties = OrderedDict()
registration._register_marking(NewMarking2)
v = 'v21'
assert NewMarking2._type in registry.STIX2_OBJ_MAPS[v]['markings']
assert v in str(registry.STIX2_OBJ_MAPS[v]['markings'][NewMarking2._type])
assert NewMarking2._type in registry.STIX2_OBJ_MAPS['2.1']['markings']
assert 'v21' in str(registry.STIX2_OBJ_MAPS['2.1']['markings'][NewMarking2._type])

View File

@ -241,3 +241,153 @@ def test_find_property_index(object, tuple_to_find, expected_index):
)
def test_iterate_over_values(dict_value, tuple_to_find, expected_index):
assert stix2.serialization._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index
@pytest.mark.parametrize(
"type_", [
"attack-pattern",
"campaign",
"course-of-action",
"identity",
"indicator",
"intrusion-set",
"malware",
"observed-data",
"report",
"threat-actor",
"tool",
"vulnerability",
# New in 2.1
"grouping",
"infrastructure",
"location",
"malware-analysis",
"note",
"opinion",
],
)
def test_is_sdo_dict(type_):
d = {
"type": type_,
"spec_version": "2.1",
}
assert stix2.utils.is_sdo(d, "2.1")
@pytest.mark.parametrize(
"dict_", [
{"type": "software", "spec_version": "2.1"},
{"type": "software"},
{"type": "identity"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "marking-definition"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "bundle"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "language-content"},
{"type": "relationship", "spec_version": "2.1"},
{"type": "relationship"},
{"type": "foo", "spec_version": "2.1"},
],
)
def test_is_not_sdo_dict(dict_):
assert not stix2.utils.is_sdo(dict_, "2.1")
def test_is_sco_dict():
d = {
"type": "file",
"spec_version": "2.1",
}
assert stix2.utils.is_sco(d, "2.1")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity"},
{"type": "identity", "spec_version": "2.1"},
{"type": "software"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "marking-definition"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "bundle"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "language-content"},
{"type": "relationship", "spec_version": "2.1"},
{"type": "relationship"},
{"type": "foo", "spec_version": "2.1"},
],
)
def test_is_not_sco_dict(dict_):
assert not stix2.utils.is_sco(dict_, "2.1")
@pytest.mark.parametrize(
"dict_", [
{"type": "relationship", "spec_version": "2.1"},
{"type": "sighting", "spec_version": "2.1"},
],
)
def test_is_sro_dict(dict_):
assert stix2.utils.is_sro(dict_, "2.1")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity", "spec_version": "2.1"},
{"type": "identity"},
{"type": "software", "spec_version": "2.1"},
{"type": "software"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "marking-definition"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "bundle"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "language-content"},
{"type": "relationship"},
{"type": "sighting"},
{"type": "foo", "spec_version": "2.1"},
],
)
def test_is_not_sro_dict(dict_):
assert not stix2.utils.is_sro(dict_, "2.1")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity", "spec_version": "2.1"},
{"type": "software", "spec_version": "2.1"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "language-content", "spec_version": "2.1"},
{
"type": "bundle",
"id": "bundle--8f431680-6278-4767-ba43-5edb682d7086",
"objects": [
{"type": "identity", "spec_version": "2.1"},
{"type": "software", "spec_version": "2.1"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "language-content", "spec_version": "2.1"},
],
},
],
)
def test_is_object_dict(dict_):
assert stix2.utils.is_object(dict_, "2.1")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity"},
{"type": "software"},
{"type": "marking-definition"},
{"type": "bundle"},
{"type": "language-content"},
{"type": "relationship"},
{"type": "sighting"},
{"type": "foo"},
],
)
def test_is_not_object_dict(dict_):
assert not stix2.utils.is_object(dict_, "2.1")

View File

@ -346,6 +346,38 @@ def test_version_sco_with_custom():
assert revoked_obj.revoked
def test_version_sco_id_contributing_properties():
file_sco_obj = stix2.v21.File(
name="data.txt",
created="1973-11-23T02:31:37Z",
modified="1991-05-13T19:24:57Z",
revoked=False,
allow_custom=True,
)
with pytest.raises(stix2.exceptions.UnmodifiablePropertyError) as e:
stix2.versioning.new_version(file_sco_obj, name="foo.dat")
assert e.value.unchangable_properties == {"name"}
def test_version_sco_id_contributing_properties_dict():
file_sco_dict = {
"type": "file",
"id": "file--c27c572c-2e17-5ce1-817e-67bb97629a56",
"spec_version": "2.1",
"name": "data.txt",
"created": "1973-11-23T02:31:37Z",
"modified": "1991-05-13T19:24:57Z",
"revoked": False,
}
with pytest.raises(stix2.exceptions.UnmodifiablePropertyError) as e:
stix2.versioning.new_version(file_sco_dict, name="foo.dat")
assert e.value.unchangable_properties == {"name"}
def test_version_disable_custom():
m = stix2.v21.Malware(
name="foo", description="Steals your identity!", is_family=False,

View File

@ -1,5 +1,6 @@
"""Utility functions and classes for the STIX2 library."""
import collections.abc
import datetime as dt
import enum
import json
@ -8,7 +9,8 @@ import re
import pytz
import six
import stix2
import stix2.registry as mappings
import stix2.version
# Sentinel value for properties that should be set to the current time.
# We can't use the standard 'default' approach, since if there are multiple
@ -313,18 +315,262 @@ def get_type_from_id(stix_id):
return stix_id.split('--', 1)[0]
def is_marking(obj_or_id):
"""Determines whether the given object or object ID is/is for a marking
definition.
def detect_spec_version(stix_dict):
"""
Given a dict representing a STIX object, try to detect what spec version
it is likely to comply with.
:param obj_or_id: A STIX object or object ID as a string.
:return: True if a marking definition, False otherwise.
:param stix_dict: A dict with some STIX content. Must at least have a
"type" property.
:return: A STIX version in "X.Y" format
"""
if isinstance(obj_or_id, (stix2.base._STIXBase, dict)):
result = obj_or_id["type"] == "marking-definition"
obj_type = stix_dict["type"]
if 'spec_version' in stix_dict:
# For STIX 2.0, applies to bundles only.
# For STIX 2.1+, applies to SCOs, SDOs, SROs, and markings only.
v = stix_dict['spec_version']
elif "id" not in stix_dict:
# Only 2.0 SCOs don't have ID properties
v = "2.0"
elif obj_type == 'bundle':
# Bundle without a spec_version property: must be 2.1. But to
# future-proof, use max version over all contained SCOs, with 2.1
# minimum.
v = max(
"2.1",
max(
detect_spec_version(obj) for obj in stix_dict["objects"]
),
)
elif obj_type in mappings.STIX2_OBJ_MAPS["2.1"]["observables"]:
# Non-bundle object with an ID and without spec_version. Could be a
# 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO...
v = "2.1"
else:
# it's a string ID
result = obj_or_id.startswith("marking-definition--")
# Not a 2.1 SCO; must be a 2.0 object.
v = "2.0"
return v
def _stix_type_of(value):
"""
Get a STIX type from the given value: if a STIX ID is passed, the type
prefix is extracted; if string which is not a STIX ID is passed, it is
assumed to be a STIX type and is returned; otherwise it is assumed to be a
mapping with a "type" property, and the value of that property is returned.
:param value: A mapping with a "type" property, or a STIX ID or type
as a string
:return: A STIX type
"""
if isinstance(value, str):
if "--" in value:
type_ = get_type_from_id(value)
else:
type_ = value
else:
type_ = value["type"]
return type_
def is_sdo(value, stix_version=stix2.version.DEFAULT_VERSION):
"""
Determine whether the given object, type, or ID is/is for an SDO of the
given STIX version. If value is a type or ID, this just checks whether
the type was registered as an SDO in the given STIX version. If a mapping,
*simple* STIX version inference is additionally done on the value, and the
result is checked against stix_version. It does not attempt to fully
validate the value.
:param value: A mapping with a "type" property, or a STIX ID or type
as a string
:param stix_version: A STIX version as a string
:return: True if the type of the given value is an SDO type of the given
version; False if not
"""
result = True
if isinstance(value, collections.abc.Mapping):
value_stix_version = detect_spec_version(value)
if value_stix_version != stix_version:
result = False
if result:
cls_maps = mappings.STIX2_OBJ_MAPS[stix_version]
type_ = _stix_type_of(value)
result = type_ in cls_maps["objects"] and type_ not in {
"relationship", "sighting", "marking-definition", "bundle",
"language-content",
}
return result
def is_sco(value, stix_version=stix2.version.DEFAULT_VERSION):
"""
Determine whether the given object, type, or ID is/is for an SCO of the
given STIX version. If value is a type or ID, this just checks whether
the type was registered as an SCO in the given STIX version. If a mapping,
*simple* STIX version inference is additionally done on the value, and the
result is checked against stix_version. It does not attempt to fully
validate the value.
:param value: A mapping with a "type" property, or a STIX ID or type
as a string
:param stix_version: A STIX version as a string
:return: True if the type of the given value is an SCO type of the given
version; False if not
"""
result = True
if isinstance(value, collections.abc.Mapping):
value_stix_version = detect_spec_version(value)
if value_stix_version != stix_version:
result = False
if result:
cls_maps = mappings.STIX2_OBJ_MAPS[stix_version]
type_ = _stix_type_of(value)
result = type_ in cls_maps["observables"]
return result
def is_sro(value, stix_version=stix2.version.DEFAULT_VERSION):
"""
Determine whether the given object, type, or ID is/is for an SRO of the
given STIX version. If value is a type or ID, this just checks whether
the type is "sighting" or "relationship". If a mapping, *simple* STIX
version inference is additionally done on the value, and the result is
checked against stix_version. It does not attempt to fully validate the
value.
:param value: A mapping with a "type" property, or a STIX ID or type
as a string
:param stix_version: A STIX version as a string
:return: True if the type of the given value is an SRO type of the given
version; False if not
"""
result = True
if isinstance(value, collections.abc.Mapping):
value_stix_version = detect_spec_version(value)
if value_stix_version != stix_version:
result = False
if result:
# No need to check registration in this case
type_ = _stix_type_of(value)
result = type_ in ("sighting", "relationship")
return result
def is_object(value, stix_version=stix2.version.DEFAULT_VERSION):
"""
Determine whether an object, type, or ID is/is for any STIX object. This
includes all SDOs, SCOs, meta-objects, and bundle. If value is a type or
ID, this just checks whether the type was registered in the given STIX
version. If a mapping, *simple* STIX version inference is additionally
done on the value, and the result is checked against stix_version. It does
not attempt to fully validate the value.
:param value: A mapping with a "type" property, or a STIX ID or type
as a string
:param stix_version: A STIX version as a string
:return: True if the type of the given value is a valid STIX type with
respect to the given STIX version; False if not
"""
result = True
if isinstance(value, collections.abc.Mapping):
value_stix_version = detect_spec_version(value)
if value_stix_version != stix_version:
result = False
if result:
cls_maps = mappings.STIX2_OBJ_MAPS[stix_version]
type_ = _stix_type_of(value)
result = type_ in cls_maps["observables"] \
or type_ in cls_maps["objects"]
return result
def is_marking(value, stix_version=stix2.version.DEFAULT_VERSION):
"""
Determine whether the given object, type, or ID is/is for an marking
definition of the given STIX version. If value is a type or ID, this just
checks whether the type is "marking-definition". If a mapping, *simple*
STIX version inference is additionally done on the value, and the result
is checked against stix_version. It does not attempt to fully validate the
value.
:param value: A STIX object, object ID, or type as a string.
:param stix_version: A STIX version as a string
:return: True if the value is/is for a marking definition, False otherwise.
"""
result = True
if isinstance(value, collections.abc.Mapping):
value_stix_version = detect_spec_version(value)
if value_stix_version != stix_version:
result = False
if result:
# No need to check registration in this case
type_ = _stix_type_of(value)
result = type_ == "marking-definition"
return result
class STIXTypeClass(enum.Enum):
"""
Represents different classes of STIX type.
"""
SDO = 0
SCO = 1
SRO = 2
def is_stix_type(value, stix_version=stix2.version.DEFAULT_VERSION, *types):
"""
Determine whether the type of the given value satisfies the given
constraints. 'types' must contain STIX types as strings, and/or the
STIXTypeClass enum values. STIX types imply an exact match constraint;
STIXTypeClass enum values imply a more general constraint, that the object
or type be in that class of STIX type. These constraints are implicitly
OR'd together.
:param value: A mapping with a "type" property, or a STIX ID or type
as a string
:param stix_version: A STIX version as a string
:param types: A sequence of STIX type strings or STIXTypeClass enum values
:return: True if the object or type satisfies the constraints; False if not
"""
for type_ in types:
if type_ is STIXTypeClass.SDO:
result = is_sdo(value, stix_version)
elif type_ is STIXTypeClass.SCO:
result = is_sco(value, stix_version)
elif type_ is STIXTypeClass.SRO:
result = is_sro(value, stix_version)
else:
# Assume a string STIX type is given instead of a class enum,
# and just check for exact match.
obj_type = _stix_type_of(value)
result = obj_type == type_ and is_object(value, stix_version)
if result:
break
else:
result = False
return result

View File

@ -1,16 +1,17 @@
"""STIX2 core versioning methods."""
from collections.abc import Mapping
import copy
import datetime as dt
import itertools
import uuid
import six
from six.moves.collections_abc import Mapping
import stix2.base
import stix2.registry
from stix2.utils import get_timestamp, parse_into_datetime
from stix2.utils import (
detect_spec_version, get_timestamp, is_sco, is_sdo, is_sro,
parse_into_datetime,
)
import stix2.v20
from .exceptions import (
@ -74,58 +75,47 @@ def _is_versionable(data):
"""
is_versionable = False
is_21 = False
stix_vid = None
stix_version = None
if isinstance(data, Mapping):
# First, determine spec version. It's easy for our stix2 objects; more
# work for dicts.
is_21 = False
if isinstance(data, stix2.base._STIXBase) and \
not isinstance(data, stix2.v20._STIXBase20):
# (is_21 means 2.1 or later; try not to be 2.1-specific)
is_21 = True
if isinstance(data, stix2.v20._STIXBase20):
stix_version = "2.0"
elif isinstance(data, stix2.v21._STIXBase21):
stix_version = "2.1"
elif isinstance(data, dict):
stix_vid = stix2.parsing._detect_spec_version(data)
is_21 = stix_vid != "v20"
stix_version = detect_spec_version(data)
# Then, determine versionability.
if six.PY2:
# dumb python2 compatibility: map.keys() returns a list, not a set!
# six.viewkeys() compatibility function uses dict.viewkeys() on
# python2, which is not a Mapping mixin method, so that doesn't
# work either (for our stix2 objects).
keys = set(data)
else:
keys = data.keys()
# This should be sufficient for STIX objects; maybe we get lucky with
# dicts here but probably not.
if keys >= _VERSIONING_PROPERTIES:
if data.keys() >= _VERSIONING_PROPERTIES:
is_versionable = True
# Tougher to handle dicts. We need to consider STIX version, map to a
# registered class, and from that get a more complete picture of its
# properties.
elif isinstance(data, dict):
class_maps = stix2.registry.STIX2_OBJ_MAPS[stix_vid]
obj_type = data["type"]
if obj_type in class_maps["objects"]:
if is_sdo(obj_type, stix_version) or is_sro(obj_type, stix_version):
# Should we bother checking properties for SDOs/SROs?
# They were designed to be versionable.
is_versionable = True
elif obj_type in class_maps["observables"]:
elif is_sco(obj_type, stix_version):
# but do check SCOs
cls = class_maps["observables"][obj_type]
cls = stix2.registry.class_for_type(
obj_type, stix_version, "observables",
)
is_versionable = _VERSIONING_PROPERTIES.issubset(
cls._properties,
)
return is_versionable, is_21
return is_versionable, stix_version
def new_version(data, allow_custom=None, **kwargs):
@ -144,7 +134,7 @@ def new_version(data, allow_custom=None, **kwargs):
:return: The new object.
"""
is_versionable, is_21 = _is_versionable(data)
is_versionable, stix_version = _is_versionable(data)
if not is_versionable:
raise ValueError(
@ -165,10 +155,17 @@ def new_version(data, allow_custom=None, **kwargs):
# probably were). That would imply an ID change, which is not allowed
# across versions.
sco_locked_props = []
if is_21 and isinstance(data, stix2.base._Observable):
if is_sco(data, "2.1"):
uuid_ = uuid.UUID(data["id"][-36:])
if uuid_.variant == uuid.RFC_4122 and uuid_.version == 5:
sco_locked_props = data._id_contributing_properties
if isinstance(data, stix2.base._Observable):
cls = data.__class__
else:
cls = stix2.registry.class_for_type(
data["type"], stix_version, "observables",
)
sco_locked_props = cls._id_contributing_properties
unchangable_properties = set()
for prop in itertools.chain(STIX_UNMOD_PROPERTIES, sco_locked_props):
@ -179,7 +176,7 @@ def new_version(data, allow_custom=None, **kwargs):
# Different versioning precision rules in STIX 2.0 vs 2.1, so we need
# to know which rules to apply.
precision_constraint = "min" if is_21 else "exact"
precision_constraint = "min" if stix_version == "2.1" else "exact"
cls = type(data)
if 'modified' not in kwargs:
@ -189,7 +186,9 @@ def new_version(data, allow_custom=None, **kwargs):
)
new_modified = get_timestamp()
new_modified = _fudge_modified(old_modified, new_modified, is_21)
new_modified = _fudge_modified(
old_modified, new_modified, stix_version == "2.1",
)
kwargs['modified'] = new_modified