Change versioning module to use some of the is_* utility
functions. Changed some ">= 2.1" stix version semantics to be "== 2.1", because we don't have any version >= 2.1, so they are currently equivalent, and the is_*() functions don't support STIX version ranges. They only support exact versions. We can look at this again if a newer STIX version ever emerges. Also added a class_for_type() function to the registry module, which was useful for the versioning module changes described above. I thought that function would be helpful in the parsing module, to simplify code there, so I changed that module a bit to use it.pull/1/head
parent
fe2330af07
commit
473e7d0068
|
@ -77,19 +77,16 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None):
|
|||
if not version:
|
||||
version = detect_spec_version(stix_dict)
|
||||
|
||||
OBJ_MAP = dict(
|
||||
registry.STIX2_OBJ_MAPS[version]['objects'],
|
||||
**registry.STIX2_OBJ_MAPS[version]['observables']
|
||||
)
|
||||
obj_type = stix_dict["type"]
|
||||
obj_class = registry.class_for_type(obj_type, version, "objects") \
|
||||
or registry.class_for_type(obj_type, version, "observables")
|
||||
|
||||
try:
|
||||
obj_class = OBJ_MAP[stix_dict['type']]
|
||||
except KeyError:
|
||||
if not obj_class:
|
||||
if allow_custom:
|
||||
# flag allows for unknown custom objects too, but will not
|
||||
# be parsed into STIX object, returned as is
|
||||
return stix_dict
|
||||
raise ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % stix_dict['type'])
|
||||
raise ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % obj_type)
|
||||
|
||||
return obj_class(allow_custom=allow_custom, **stix_dict)
|
||||
|
||||
|
@ -127,10 +124,9 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
|
|||
if not version:
|
||||
version = detect_spec_version(obj)
|
||||
|
||||
try:
|
||||
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[version]['observables']
|
||||
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
|
||||
except KeyError:
|
||||
obj_type = obj["type"]
|
||||
obj_class = registry.class_for_type(obj_type, version, "observables")
|
||||
if not obj_class:
|
||||
if allow_custom:
|
||||
# flag allows for unknown custom objects too, but will not
|
||||
# be parsed into STIX observable object, just returned as is
|
||||
|
|
|
@ -42,3 +42,39 @@ def _collect_stix2_mappings():
|
|||
ver = _stix_vid_to_version(stix_vid)
|
||||
mod = importlib.import_module(name, str(top_level_module.__name__))
|
||||
STIX2_OBJ_MAPS[ver]['markings'] = mod.OBJ_MAP_MARKING
|
||||
|
||||
|
||||
def class_for_type(stix_type, stix_version, category=None):
|
||||
"""
|
||||
Get the registered class which implements a particular STIX type for a
|
||||
particular STIX version.
|
||||
|
||||
:param stix_type: A STIX type as a string
|
||||
:param stix_version: A STIX version as a string, e.g. "2.1"
|
||||
:param category: An optional "category" value, which is just used directly
|
||||
as a second key after the STIX version, and depends on how the types
|
||||
are internally categorized. This would be useful if the same STIX type
|
||||
is used to mean two different things within the same STIX version. So
|
||||
it's unlikely to be necessary. Pass None to just search all the
|
||||
categories and return the first class found.
|
||||
:return: A registered python class which implements the given STIX type, or
|
||||
None if one is not found.
|
||||
"""
|
||||
cls = None
|
||||
|
||||
cat_map = STIX2_OBJ_MAPS.get(stix_version)
|
||||
if cat_map:
|
||||
if category:
|
||||
class_map = cat_map.get(category)
|
||||
if class_map:
|
||||
cls = class_map.get(stix_type)
|
||||
else:
|
||||
cls = cat_map["objects"].get(stix_type) \
|
||||
or cat_map["observables"].get(stix_type) \
|
||||
or cat_map["markings"].get(stix_type)
|
||||
|
||||
# Left "observable-extensions" out; it has a different
|
||||
# substructure. A version->category->type lookup would result
|
||||
# in another map, not a class. So it doesn't fit the pattern.
|
||||
|
||||
return cls
|
||||
|
|
|
@ -10,7 +10,10 @@ from six.moves.collections_abc import Mapping
|
|||
|
||||
import stix2.base
|
||||
import stix2.registry
|
||||
from stix2.utils import get_timestamp, parse_into_datetime, detect_spec_version
|
||||
from stix2.utils import (
|
||||
get_timestamp, parse_into_datetime, detect_spec_version,
|
||||
is_sdo, is_sro, is_sco
|
||||
)
|
||||
import stix2.v20
|
||||
|
||||
from .exceptions import (
|
||||
|
@ -74,7 +77,6 @@ def _is_versionable(data):
|
|||
"""
|
||||
|
||||
is_versionable = False
|
||||
is_21 = False
|
||||
stix_version = None
|
||||
|
||||
if isinstance(data, Mapping):
|
||||
|
@ -82,13 +84,12 @@ def _is_versionable(data):
|
|||
# First, determine spec version. It's easy for our stix2 objects; more
|
||||
# work for dicts.
|
||||
is_21 = False
|
||||
if isinstance(data, stix2.base._STIXBase) and \
|
||||
not isinstance(data, stix2.v20._STIXBase20):
|
||||
# (is_21 means 2.1 or later; try not to be 2.1-specific)
|
||||
is_21 = True
|
||||
if isinstance(data, stix2.v20._STIXBase20):
|
||||
stix_version = "2.0"
|
||||
elif isinstance(data, stix2.v21._STIXBase21):
|
||||
stix_version = "2.1"
|
||||
elif isinstance(data, dict):
|
||||
stix_version = detect_spec_version(data)
|
||||
is_21 = stix_version != "2.0"
|
||||
|
||||
# Then, determine versionability.
|
||||
|
||||
|
@ -110,22 +111,23 @@ def _is_versionable(data):
|
|||
# registered class, and from that get a more complete picture of its
|
||||
# properties.
|
||||
elif isinstance(data, dict):
|
||||
class_maps = stix2.registry.STIX2_OBJ_MAPS[stix_version]
|
||||
obj_type = data["type"]
|
||||
|
||||
if obj_type in class_maps["objects"]:
|
||||
if is_sdo(obj_type, stix_version) or is_sro(obj_type, stix_version):
|
||||
# Should we bother checking properties for SDOs/SROs?
|
||||
# They were designed to be versionable.
|
||||
is_versionable = True
|
||||
|
||||
elif obj_type in class_maps["observables"]:
|
||||
elif is_sco(obj_type, stix_version):
|
||||
# but do check SCOs
|
||||
cls = class_maps["observables"][obj_type]
|
||||
cls = stix2.registry.class_for_type(
|
||||
obj_type, stix_version, "observables"
|
||||
)
|
||||
is_versionable = _VERSIONING_PROPERTIES.issubset(
|
||||
cls._properties,
|
||||
)
|
||||
|
||||
return is_versionable, is_21
|
||||
return is_versionable, stix_version
|
||||
|
||||
|
||||
def new_version(data, allow_custom=None, **kwargs):
|
||||
|
@ -144,7 +146,7 @@ def new_version(data, allow_custom=None, **kwargs):
|
|||
:return: The new object.
|
||||
"""
|
||||
|
||||
is_versionable, is_21 = _is_versionable(data)
|
||||
is_versionable, stix_version = _is_versionable(data)
|
||||
|
||||
if not is_versionable:
|
||||
raise ValueError(
|
||||
|
@ -165,10 +167,13 @@ def new_version(data, allow_custom=None, **kwargs):
|
|||
# probably were). That would imply an ID change, which is not allowed
|
||||
# across versions.
|
||||
sco_locked_props = []
|
||||
if is_21 and isinstance(data, stix2.base._Observable):
|
||||
if is_sco(data, "2.1"):
|
||||
cls = stix2.registry.class_for_type(
|
||||
data["type"], stix_version, "observables"
|
||||
)
|
||||
uuid_ = uuid.UUID(data["id"][-36:])
|
||||
if uuid_.variant == uuid.RFC_4122 and uuid_.version == 5:
|
||||
sco_locked_props = data._id_contributing_properties
|
||||
sco_locked_props = cls._id_contributing_properties
|
||||
|
||||
unchangable_properties = set()
|
||||
for prop in itertools.chain(STIX_UNMOD_PROPERTIES, sco_locked_props):
|
||||
|
@ -179,7 +184,7 @@ def new_version(data, allow_custom=None, **kwargs):
|
|||
|
||||
# Different versioning precision rules in STIX 2.0 vs 2.1, so we need
|
||||
# to know which rules to apply.
|
||||
precision_constraint = "min" if is_21 else "exact"
|
||||
precision_constraint = "min" if stix_version == "2.1" else "exact"
|
||||
|
||||
cls = type(data)
|
||||
if 'modified' not in kwargs:
|
||||
|
@ -189,7 +194,9 @@ def new_version(data, allow_custom=None, **kwargs):
|
|||
)
|
||||
|
||||
new_modified = get_timestamp()
|
||||
new_modified = _fudge_modified(old_modified, new_modified, is_21)
|
||||
new_modified = _fudge_modified(
|
||||
old_modified, new_modified, stix_version == "2.1"
|
||||
)
|
||||
|
||||
kwargs['modified'] = new_modified
|
||||
|
||||
|
|
Loading…
Reference in New Issue