Merge branch 'master' of github.com:oasis-open/cti-python-stix2 into dev-extensions-proposal

pull/1/head
Emmanuelle Vargas-Gonzalez 2021-02-18 16:04:30 -05:00
commit a8b6fa2100
29 changed files with 1677 additions and 519 deletions

View File

@ -24,8 +24,6 @@
# flake8: noqa
DEFAULT_VERSION = '2.1' # Default version will always be the latest STIX 2.X version
from .confidence import scales
from .datastore import CompositeDataSource
from .datastore.filesystem import (
@ -41,7 +39,7 @@ from .markings import (
add_markings, clear_markings, get_markings, is_marked, remove_markings,
set_markings,
)
from .parsing import _collect_stix2_mappings, parse, parse_observable
from .parsing import parse, parse_observable
from .patterns import (
AndBooleanExpression, AndObservationExpression, BasicObjectPathComponent,
BinaryConstant, BooleanConstant, EqualityComparisonExpression,
@ -57,8 +55,9 @@ from .patterns import (
RepeatQualifier, StartStopQualifier, StringConstant, TimestampConstant,
WithinQualifier,
)
from .registry import _collect_stix2_mappings
from .v21 import * # This import will always be the latest STIX 2.X version
from .version import __version__
from .version import DEFAULT_VERSION, __version__
from .versioning import new_version, revoke
_collect_stix2_mappings()

View File

@ -3,9 +3,9 @@ from collections import OrderedDict
import six
from .base import _cls_init
from .parsing import (
_get_extension_class, _register_extension, _register_marking,
_register_object, _register_observable,
from .registration import (
_register_marking, _register_object, _register_observable,
_register_observable_extension,
)

View File

@ -9,26 +9,22 @@
|
"""
import stix2
from stix2.equivalence.pattern.compare.observation import (
observation_expression_cmp,
from ... import pattern_visitor
from ...version import DEFAULT_VERSION
from .compare.observation import observation_expression_cmp
from .transform import ChainTransformer, SettleTransformer
from .transform.observation import (
AbsorptionTransformer, DNFTransformer, FlattenTransformer,
NormalizeComparisonExpressionsTransformer, OrderDedupeTransformer,
)
from stix2.equivalence.pattern.transform import (
ChainTransformer, SettleTransformer,
)
from stix2.equivalence.pattern.transform.observation import (
AbsorptionTransformer, CanonicalizeComparisonExpressionsTransformer,
DNFTransformer, FlattenTransformer, OrderDedupeTransformer,
)
import stix2.pattern_visitor
# Lazy-initialize
_pattern_canonicalizer = None
_pattern_normalizer = None
def _get_pattern_canonicalizer():
def _get_pattern_normalizer():
"""
Get a canonicalization transformer for STIX patterns.
Get a normalization transformer for STIX patterns.
Returns:
The transformer
@ -37,11 +33,11 @@ def _get_pattern_canonicalizer():
# The transformers are either stateless or contain no state which changes
# with each use. So we can setup the transformers once and keep reusing
# them.
global _pattern_canonicalizer
global _pattern_normalizer
if not _pattern_canonicalizer:
canonicalize_comp_expr = \
CanonicalizeComparisonExpressionsTransformer()
if not _pattern_normalizer:
normalize_comp_expr = \
NormalizeComparisonExpressionsTransformer()
obs_expr_flatten = FlattenTransformer()
obs_expr_order = OrderDedupeTransformer()
@ -53,15 +49,15 @@ def _get_pattern_canonicalizer():
obs_dnf = DNFTransformer()
_pattern_canonicalizer = ChainTransformer(
canonicalize_comp_expr,
_pattern_normalizer = ChainTransformer(
normalize_comp_expr,
obs_settle_simplify, obs_dnf, obs_settle_simplify,
)
return _pattern_canonicalizer
return _pattern_normalizer
def equivalent_patterns(pattern1, pattern2, stix_version=stix2.DEFAULT_VERSION):
def equivalent_patterns(pattern1, pattern2, stix_version=DEFAULT_VERSION):
"""
Determine whether two STIX patterns are semantically equivalent.
@ -74,29 +70,29 @@ def equivalent_patterns(pattern1, pattern2, stix_version=stix2.DEFAULT_VERSION):
Returns:
True if the patterns are semantically equivalent; False if not
"""
patt_ast1 = stix2.pattern_visitor.create_pattern_object(
patt_ast1 = pattern_visitor.create_pattern_object(
pattern1, version=stix_version,
)
patt_ast2 = stix2.pattern_visitor.create_pattern_object(
patt_ast2 = pattern_visitor.create_pattern_object(
pattern2, version=stix_version,
)
pattern_canonicalizer = _get_pattern_canonicalizer()
canon_patt1, _ = pattern_canonicalizer.transform(patt_ast1)
canon_patt2, _ = pattern_canonicalizer.transform(patt_ast2)
pattern_normalizer = _get_pattern_normalizer()
norm_patt1, _ = pattern_normalizer.transform(patt_ast1)
norm_patt2, _ = pattern_normalizer.transform(patt_ast2)
result = observation_expression_cmp(canon_patt1, canon_patt2)
result = observation_expression_cmp(norm_patt1, norm_patt2)
return result == 0
def find_equivalent_patterns(
search_pattern, patterns, stix_version=stix2.DEFAULT_VERSION,
search_pattern, patterns, stix_version=DEFAULT_VERSION,
):
"""
Find patterns from a sequence which are equivalent to a given pattern.
This is more efficient than using equivalent_patterns() in a loop, because
it doesn't re-canonicalize the search pattern over and over. This works
it doesn't re-normalize the search pattern over and over. This works
on an input iterable and is implemented as a generator of matches. So you
can "stream" patterns in and matching patterns will be streamed out.
@ -109,23 +105,23 @@ def find_equivalent_patterns(
Returns:
A generator iterator producing the semantically equivalent patterns
"""
search_pattern_ast = stix2.pattern_visitor.create_pattern_object(
search_pattern_ast = pattern_visitor.create_pattern_object(
search_pattern, version=stix_version,
)
pattern_canonicalizer = _get_pattern_canonicalizer()
canon_search_pattern_ast, _ = pattern_canonicalizer.transform(
pattern_normalizer = _get_pattern_normalizer()
norm_search_pattern_ast, _ = pattern_normalizer.transform(
search_pattern_ast,
)
for pattern in patterns:
pattern_ast = stix2.pattern_visitor.create_pattern_object(
pattern_ast = pattern_visitor.create_pattern_object(
pattern, version=stix_version,
)
canon_pattern_ast, _ = pattern_canonicalizer.transform(pattern_ast)
norm_pattern_ast, _ = pattern_normalizer.transform(pattern_ast)
result = observation_expression_cmp(
canon_search_pattern_ast, canon_pattern_ast,
norm_search_pattern_ast, norm_pattern_ast,
)
if result == 0:

View File

@ -346,7 +346,7 @@ def comparison_expression_cmp(expr1, expr2):
"""
Compare two comparison expressions. This is sensitive to the order of the
expressions' sub-components. To achieve an order-insensitive comparison,
the ASTs must be canonically ordered first.
the sub-component ASTs must be ordered first.
Args:
expr1: The first comparison expression

View File

@ -62,7 +62,7 @@ def observation_expression_cmp(expr1, expr2):
"""
Compare two observation expression ASTs. This is sensitive to the order of
the expressions' sub-components. To achieve an order-insensitive
comparison, the ASTs must be canonically ordered first.
comparison, the sub-component ASTs must be ordered first.
Args:
expr1: The first observation expression

View File

@ -46,7 +46,7 @@ def _dupe_ast(ast):
elif isinstance(ast, _ComparisonExpression):
# Change this to create a dupe, if we ever need to change simple
# comparison expressions as part of canonicalization.
# comparison expressions as part of normalization.
result = ast
else:
@ -147,9 +147,8 @@ class OrderDedupeTransformer(
ComparisonExpressionTransformer,
):
"""
Canonically order the children of all nodes in the AST. Because the
deduping algorithm is based on sorted data, this transformation also does
deduping.
Order the children of all nodes in the AST. Because the deduping algorithm
is based on sorted data, this transformation also does deduping.
E.g.:
A and A => A

View File

@ -234,7 +234,7 @@ class OrderDedupeTransformer(
ObservationExpressionTransformer,
):
"""
Canonically order AND/OR expressions, and dedupe ORs. E.g.:
Order AND/OR expressions, and dedupe ORs. E.g.:
A or A => A
B or A => A or B
@ -282,6 +282,7 @@ class AbsorptionTransformer(
A or (A and B) = A
A or (A followedby B) = A
A or (B followedby A) = A
Other variants do not hold for observation expressions.
"""
@ -435,28 +436,35 @@ class DNFTransformer(ObservationExpressionTransformer):
A and (B or C) => (A and B) or (A and C)
A followedby (B or C) => (A followedby B) or (A followedby C)
(A or B) followedby C => (A followedby C) or (B followedby C)
"""
def __transform(self, ast):
root_type = type(ast) # will be AST class for AND or FOLLOWEDBY
changed = False
or_children = []
other_children = []
for child in ast.operands:
if isinstance(child, OrObservationExpression):
or_children.append(child.operands)
else:
other_children.append(child)
# If no OR children, nothing to do
if any(
isinstance(child, OrObservationExpression)
for child in ast.operands
):
# When we distribute FOLLOWEDBY over OR, it is important to
# preserve the original FOLLOWEDBY order! We don't need to do that
# for AND, but we do it anyway because it doesn't hurt, and we can
# use the same code for both.
iterables = []
for child in ast.operands:
if isinstance(child, OrObservationExpression):
iterables.append(child.operands)
else:
iterables.append((child,))
if or_children:
root_type = type(ast) # will be AST class for AND or FOLLOWEDBY
distributed_children = [
root_type([
_dupe_ast(sub_ast) for sub_ast in itertools.chain(
other_children, prod_seq,
prod_seq,
)
])
for prod_seq in itertools.product(*or_children)
for prod_seq in itertools.product(*iterables)
]
# Need to recursively continue to distribute AND/FOLLOWEDBY over OR
@ -470,6 +478,7 @@ class DNFTransformer(ObservationExpressionTransformer):
else:
result = ast
changed = False
return result, changed
@ -480,11 +489,11 @@ class DNFTransformer(ObservationExpressionTransformer):
return self.__transform(ast)
class CanonicalizeComparisonExpressionsTransformer(
class NormalizeComparisonExpressionsTransformer(
ObservationExpressionTransformer,
):
"""
Canonicalize all comparison expressions.
Normalize all comparison expressions.
"""
def __init__(self):
comp_flatten = CFlattenTransformer()
@ -495,13 +504,13 @@ class CanonicalizeComparisonExpressionsTransformer(
comp_special = SpecialValueCanonicalization()
comp_dnf = CDNFTransformer()
self.__comp_canonicalize = ChainTransformer(
self.__comp_normalize = ChainTransformer(
comp_special, settle_simplify, comp_dnf, settle_simplify,
)
def transform_observation(self, ast):
comp_expr = ast.operand
canon_comp_expr, changed = self.__comp_canonicalize.transform(comp_expr)
ast.operand = canon_comp_expr
norm_comp_expr, changed = self.__comp_normalize.transform(comp_expr)
ast.operand = norm_comp_expr
return ast, changed

View File

@ -1,5 +1,5 @@
"""
Some simple comparison expression canonicalization functions.
Some simple comparison expression normalization functions.
"""
import socket

View File

@ -175,7 +175,14 @@ class ImmutableError(STIXError):
return msg.format(self)
class UnmodifiablePropertyError(STIXError):
class VersioningError(STIXError):
"""
Base class for object versioning errors.
"""
pass
class UnmodifiablePropertyError(VersioningError):
"""Attempted to modify an unmodifiable property of object when creating a new version."""
def __init__(self, unchangable_properties):
@ -187,6 +194,40 @@ class UnmodifiablePropertyError(STIXError):
return msg.format(", ".join(self.unchangable_properties))
class TypeNotVersionableError(VersioningError):
"""
An object couldn't be versioned because it lacked the versioning properties
and its type does not support them.
"""
def __init__(self, obj):
if isinstance(obj, dict):
type_name = obj.get("type")
else:
# try standard attribute of _STIXBase subclasses/instances
type_name = getattr(obj, "_type", None)
self.object = obj
msg = "Object type{}is not versionable. Try a dictionary or " \
"instance of an SDO or SRO class.".format(
" '{}' ".format(type_name) if type_name else " ",
)
super().__init__(msg)
class ObjectNotVersionableError(VersioningError):
"""
An object's type supports versioning, but the object couldn't be versioned
because it lacked sufficient versioning properties.
"""
def __init__(self, obj):
self.object = obj
msg = "Creating a new object version requires at least the 'created'" \
" property: " + str(obj)
super().__init__(msg)
class RevokeError(STIXError):
"""Attempted an operation on a revoked object."""

View File

@ -5,13 +5,9 @@ import importlib
import pkgutil
import re
import stix2
from .base import _DomainObject
from .exceptions import DuplicateRegistrationError, ParseError
from .utils import PREFIX_21_REGEX, _get_dict, get_class_hierarchy_names
STIX2_OBJ_MAPS = {}
from . import registry
from .exceptions import ParseError
from .utils import _get_dict, detect_spec_version
def parse(data, allow_custom=False, version=None):
@ -49,53 +45,6 @@ def parse(data, allow_custom=False, version=None):
return obj
def _detect_spec_version(stix_dict):
"""
Given a dict representing a STIX object, try to detect what spec version
it is likely to comply with.
:param stix_dict: A dict with some STIX content. Must at least have a
"type" property.
:return: A string in "vXX" format, where "XX" indicates the spec version,
e.g. "v20", "v21", etc.
"""
obj_type = stix_dict["type"]
if 'spec_version' in stix_dict:
# For STIX 2.0, applies to bundles only.
# For STIX 2.1+, applies to SCOs, SDOs, SROs, and markings only.
v = 'v' + stix_dict['spec_version'].replace('.', '')
elif "id" not in stix_dict:
# Only 2.0 SCOs don't have ID properties
v = "v20"
elif obj_type == 'bundle':
# Bundle without a spec_version property: must be 2.1. But to
# future-proof, use max version over all contained SCOs, with 2.1
# minimum.
v = max(
"v21",
max(
_detect_spec_version(obj) for obj in stix_dict["objects"]
),
)
elif obj_type in STIX2_OBJ_MAPS["v21"]["observables"]:
# Non-bundle object with an ID and without spec_version. Could be a
# 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO...
v = "v21"
else:
# Not a 2.1 SCO; must be a 2.0 object.
v = "v20"
return v
def _get_extension_class(extension_uuid, version):
"""Retrieve a registered class Extension"""
v = 'v' + version.replace('.', '')
return STIX2_OBJ_MAPS[v]['extensions'].get(extension_uuid)
def dict_to_stix2(stix_dict, allow_custom=False, version=None):
"""convert dictionary to full python-stix2 object
@ -128,17 +77,14 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None):
if 'type' not in stix_dict:
raise ParseError("Can't parse object with no 'type' property: %s" % str(stix_dict))
if version:
# If the version argument was passed, override other approaches.
v = 'v' + version.replace('.', '')
else:
v = _detect_spec_version(stix_dict)
if not version:
version = detect_spec_version(stix_dict)
OBJ_MAP = dict(STIX2_OBJ_MAPS[v]['objects'], **STIX2_OBJ_MAPS[v]['observables'])
obj_type = stix_dict["type"]
obj_class = registry.class_for_type(obj_type, version, "objects") \
or registry.class_for_type(obj_type, version, "observables")
try:
obj_class = OBJ_MAP[stix_dict['type']]
except KeyError:
if not obj_class:
if allow_custom:
# flag allows for unknown custom objects too, but will not
# be parsed into STIX object, returned as is
@ -183,16 +129,12 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
obj['_valid_refs'] = _valid_refs or []
if version:
# If the version argument was passed, override other approaches.
v = 'v' + version.replace('.', '')
else:
v = _detect_spec_version(obj)
if not version:
version = detect_spec_version(obj)
try:
OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables']
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError:
obj_type = obj["type"]
obj_class = registry.class_for_type(obj_type, version, "observables")
if not obj_class:
if allow_custom:
# flag allows for unknown custom objects too, but will not
# be parsed into STIX observable object, just returned as is
@ -203,199 +145,3 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
)
return obj_class(allow_custom=allow_custom, **obj)
def _register_object(new_type, version=stix2.DEFAULT_VERSION):
"""Register a custom STIX Object type.
Args:
new_type (class): A class to register in the Object map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Raises:
ValueError: If the class being registered wasn't created with the
@CustomObject decorator.
DuplicateRegistrationError: If the class has already been registered.
"""
if not issubclass(new_type, _DomainObject):
raise ValueError(
"'%s' must be created with the @CustomObject decorator." %
new_type.__name__,
)
properties = new_type._properties
if version == "2.1":
for prop_name, prop in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character" % prop_name)
if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
OBJ_MAP = STIX2_OBJ_MAPS[v]['objects']
if new_type._type in OBJ_MAP.keys():
raise DuplicateRegistrationError("STIX Object", new_type._type)
OBJ_MAP[new_type._type] = new_type
def _register_marking(new_marking, version=stix2.DEFAULT_VERSION):
"""Register a custom STIX Marking Definition type.
Args:
new_marking (class): A class to register in the Marking map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
mark_type = new_marking._type
properties = new_marking._properties
stix2.properties._validate_type(mark_type, version)
if version == "2.1":
for prop_name, prop_value in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
OBJ_MAP_MARKING = STIX2_OBJ_MAPS[v]['markings']
if mark_type in OBJ_MAP_MARKING.keys():
raise DuplicateRegistrationError("STIX Marking", mark_type)
OBJ_MAP_MARKING[mark_type] = new_marking
def _register_observable(new_observable, version=stix2.DEFAULT_VERSION):
"""Register a custom STIX Cyber Observable type.
Args:
new_observable (class): A class to register in the Observables map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
properties = new_observable._properties
if version == "2.0":
# If using STIX2.0, check properties ending in "_ref/s" are ObjectReferenceProperties
for prop_name, prop in properties.items():
if prop_name.endswith('_ref') and ('ObjectReferenceProperty' not in get_class_hierarchy_names(prop)):
raise ValueError(
"'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty." % prop_name,
)
elif (
prop_name.endswith('_refs') and (
'ListProperty' not in get_class_hierarchy_names(prop) or
'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained)
)
):
raise ValueError(
"'%s' is named like an object reference list property but "
"is not a ListProperty containing ObjectReferenceProperty." % prop_name,
)
else:
# If using STIX2.1 (or newer...), check properties ending in "_ref/s" are ReferenceProperties
for prop_name, prop in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
elif prop_name.endswith('_ref') and ('ReferenceProperty' not in get_class_hierarchy_names(prop)):
raise ValueError(
"'%s' is named like a reference property but "
"is not a ReferenceProperty." % prop_name,
)
elif (
prop_name.endswith('_refs') and (
'ListProperty' not in get_class_hierarchy_names(prop) or
'ReferenceProperty' not in get_class_hierarchy_names(prop.contained)
)
):
raise ValueError(
"'%s' is named like a reference list property but "
"is not a ListProperty containing ReferenceProperty." % prop_name,
)
if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables']
if new_observable._type in OBJ_MAP_OBSERVABLE.keys():
raise DuplicateRegistrationError("Cyber Observable", new_observable._type)
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
def _register_extension(
new_extension, version=stix2.DEFAULT_VERSION,
):
"""Register a custom extension to any STIX Object type.
Args:
new_extension (class): A class to register in the Extensions map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1").
Defaults to the latest supported version.
"""
ext_type = new_extension._type
properties = new_extension._properties
stix2.properties._validate_type(ext_type, version)
if not new_extension._properties:
raise ValueError(
"Invalid extension: must define at least one property: " +
ext_type,
)
if version == "2.1":
if not (ext_type.endswith('-ext') or ext_type.startswith('extension-definition--')):
raise ValueError(
"Invalid extension type name '%s': must end with '-ext' or start with 'extension-definition--<UUID>'." %
ext_type,
)
for prop_name in properties.keys():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
v = 'v' + version.replace('.', '')
EXT_MAP = STIX2_OBJ_MAPS[v]['extensions']
if ext_type in EXT_MAP:
raise DuplicateRegistrationError("Extension", ext_type)
EXT_MAP[ext_type] = new_extension
def _collect_stix2_mappings():
"""Navigate the package once and retrieve all object mapping dicts for each
v2X package. Includes OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP."""
if not STIX2_OBJ_MAPS:
top_level_module = importlib.import_module('stix2')
path = top_level_module.__path__
prefix = str(top_level_module.__name__) + '.'
for module_loader, name, is_pkg in pkgutil.walk_packages(path=path, prefix=prefix):
ver = name.split('.')[1]
if re.match(r'^stix2\.v2[0-9]$', name) and is_pkg:
mod = importlib.import_module(name, str(top_level_module.__name__))
STIX2_OBJ_MAPS[ver] = {}
STIX2_OBJ_MAPS[ver]['objects'] = mod.OBJ_MAP
STIX2_OBJ_MAPS[ver]['observables'] = mod.OBJ_MAP_OBSERVABLE
STIX2_OBJ_MAPS[ver]['extensions'] = mod.EXT_MAP
elif re.match(r'^stix2\.v2[0-9]\.common$', name) and is_pkg is False:
mod = importlib.import_module(name, str(top_level_module.__name__))
STIX2_OBJ_MAPS[ver]['markings'] = mod.OBJ_MAP_MARKING

View File

@ -17,10 +17,9 @@ from stix2patterns.v21.grammars.STIXPatternVisitor import \
STIXPatternVisitor as STIXPatternVisitor21
from stix2patterns.v21.pattern import Pattern as Pattern21
import stix2
from .patterns import *
from .patterns import _BooleanExpression
from .version import DEFAULT_VERSION
# flake8: noqa F405
@ -391,7 +390,7 @@ class STIXPatternVisitorForSTIX20(STIXPatternVisitorForSTIX2, STIXPatternVisitor
super(STIXPatternVisitor20, self).__init__()
def create_pattern_object(pattern, module_suffix="", module_name="", version=stix2.DEFAULT_VERSION):
def create_pattern_object(pattern, module_suffix="", module_name="", version=DEFAULT_VERSION):
"""
Create a STIX pattern AST from a pattern string.
"""

View File

@ -9,14 +9,13 @@ import uuid
from six import string_types, text_type
import stix2
from . import registry, version
from .base import _STIXBase
from .exceptions import (
CustomContentError, DictionaryKeyError, MissingPropertiesError,
MutuallyExclusivePropertiesError, STIXError,
)
from .parsing import STIX2_OBJ_MAPS, parse, parse_observable
from .parsing import parse, parse_observable
from .utils import _get_dict, get_class_hierarchy_names, parse_into_datetime
try:
@ -275,7 +274,7 @@ class StringProperty(Property):
class TypeProperty(Property):
def __init__(self, type, spec_version=stix2.DEFAULT_VERSION):
def __init__(self, type, spec_version=version.DEFAULT_VERSION):
_validate_type(type, spec_version)
self.spec_version = spec_version
super(TypeProperty, self).__init__(fixed=type)
@ -283,7 +282,7 @@ class TypeProperty(Property):
class IDProperty(Property):
def __init__(self, type, spec_version=stix2.DEFAULT_VERSION):
def __init__(self, type, spec_version=version.DEFAULT_VERSION):
self.required_prefix = type + "--"
self.spec_version = spec_version
super(IDProperty, self).__init__()
@ -382,7 +381,7 @@ class TimestampProperty(Property):
class DictionaryProperty(Property):
def __init__(self, spec_version=stix2.DEFAULT_VERSION, **kwargs):
def __init__(self, spec_version=version.DEFAULT_VERSION, **kwargs):
self.spec_version = spec_version
super(DictionaryProperty, self).__init__(**kwargs)
@ -471,7 +470,7 @@ class HexProperty(Property):
class ReferenceProperty(Property):
def __init__(self, valid_types=None, invalid_types=None, spec_version=stix2.DEFAULT_VERSION, **kwargs):
def __init__(self, valid_types=None, invalid_types=None, spec_version=version.DEFAULT_VERSION, **kwargs):
"""
references sometimes must be to a specific object type
"""
@ -503,14 +502,14 @@ class ReferenceProperty(Property):
possible_prefix = value[:value.index('--')]
if self.valid_types:
ref_valid_types = enumerate_types(self.valid_types, 'v' + self.spec_version.replace(".", ""))
ref_valid_types = enumerate_types(self.valid_types, self.spec_version)
if possible_prefix in ref_valid_types:
required_prefix = possible_prefix
else:
raise ValueError("The type-specifying prefix '%s' for this property is not valid" % (possible_prefix))
elif self.invalid_types:
ref_invalid_types = enumerate_types(self.invalid_types, 'v' + self.spec_version.replace(".", ""))
ref_invalid_types = enumerate_types(self.invalid_types, self.spec_version)
if possible_prefix not in ref_invalid_types:
required_prefix = possible_prefix
@ -536,10 +535,10 @@ def enumerate_types(types, spec_version):
if "SDO" in types:
return_types.remove("SDO")
return_types += STIX2_OBJ_MAPS[spec_version]['objects'].keys()
return_types += registry.STIX2_OBJ_MAPS[spec_version]['objects'].keys()
if "SCO" in types:
return_types.remove("SCO")
return_types += STIX2_OBJ_MAPS[spec_version]['observables'].keys()
return_types += registry.STIX2_OBJ_MAPS[spec_version]['observables'].keys()
if "SRO" in types:
return_types.remove("SRO")
return_types += ['relationship', 'sighting']
@ -605,7 +604,7 @@ class ObservableProperty(Property):
"""Property for holding Cyber Observable Objects.
"""
def __init__(self, spec_version=stix2.DEFAULT_VERSION, allow_custom=False, *args, **kwargs):
def __init__(self, spec_version=version.DEFAULT_VERSION, allow_custom=False, *args, **kwargs):
self.allow_custom = allow_custom
self.spec_version = spec_version
super(ObservableProperty, self).__init__(*args, **kwargs)
@ -640,7 +639,7 @@ class ExtensionsProperty(DictionaryProperty):
"""Property for representing extensions on Observable objects.
"""
def __init__(self, spec_version=stix2.DEFAULT_VERSION, allow_custom=False, required=False):
def __init__(self, spec_version=version.DEFAULT_VERSION, allow_custom=False, required=False):
self.allow_custom = allow_custom
super(ExtensionsProperty, self).__init__(spec_version=spec_version, required=required)
@ -654,9 +653,7 @@ class ExtensionsProperty(DictionaryProperty):
except ValueError:
raise ValueError("The extensions property must contain a dictionary")
v = 'v' + self.spec_version.replace('.', '')
extension_type_map = STIX2_OBJ_MAPS[v].get('extensions', {})
extension_type_map = registry.STIX2_OBJ_MAPS[self.spec_version].get('extensions', {})
for key, subvalue in dictified.items():
if key in extension_type_map:
cls = extension_type_map[key]
@ -684,7 +681,7 @@ class ExtensionsProperty(DictionaryProperty):
class STIXObjectProperty(Property):
def __init__(self, spec_version=stix2.DEFAULT_VERSION, allow_custom=False, *args, **kwargs):
def __init__(self, spec_version=version.DEFAULT_VERSION, allow_custom=False, *args, **kwargs):
self.allow_custom = allow_custom
self.spec_version = spec_version
super(STIXObjectProperty, self).__init__(*args, **kwargs)

199
stix2/registration.py Normal file
View File

@ -0,0 +1,199 @@
import re
from . import registry
from .base import _DomainObject, _Observable
from .exceptions import DuplicateRegistrationError
from .properties import _validate_type
from .utils import PREFIX_21_REGEX, get_class_hierarchy_names
from .version import DEFAULT_VERSION
def _register_object(new_type, version=DEFAULT_VERSION):
"""Register a custom STIX Object type.
Args:
new_type (class): A class to register in the Object map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Raises:
ValueError: If the class being registered wasn't created with the
@CustomObject decorator.
DuplicateRegistrationError: If the class has already been registered.
"""
if not issubclass(new_type, _DomainObject):
raise ValueError(
"'%s' must be created with the @CustomObject decorator." %
new_type.__name__,
)
properties = new_type._properties
if not version:
version = DEFAULT_VERSION
if version == "2.1":
for prop_name, prop in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character" % prop_name)
OBJ_MAP = registry.STIX2_OBJ_MAPS[version]['objects']
if new_type._type in OBJ_MAP.keys():
raise DuplicateRegistrationError("STIX Object", new_type._type)
OBJ_MAP[new_type._type] = new_type
def _register_marking(new_marking, version=DEFAULT_VERSION):
"""Register a custom STIX Marking Definition type.
Args:
new_marking (class): A class to register in the Marking map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
mark_type = new_marking._type
properties = new_marking._properties
if not version:
version = DEFAULT_VERSION
_validate_type(mark_type, version)
if version == "2.1":
for prop_name, prop_value in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
OBJ_MAP_MARKING = registry.STIX2_OBJ_MAPS[version]['markings']
if mark_type in OBJ_MAP_MARKING.keys():
raise DuplicateRegistrationError("STIX Marking", mark_type)
OBJ_MAP_MARKING[mark_type] = new_marking
def _register_observable(new_observable, version=DEFAULT_VERSION):
"""Register a custom STIX Cyber Observable type.
Args:
new_observable (class): A class to register in the Observables map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
properties = new_observable._properties
if not version:
version = DEFAULT_VERSION
if version == "2.0":
# If using STIX2.0, check properties ending in "_ref/s" are ObjectReferenceProperties
for prop_name, prop in properties.items():
if prop_name.endswith('_ref') and ('ObjectReferenceProperty' not in get_class_hierarchy_names(prop)):
raise ValueError(
"'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty." % prop_name,
)
elif (
prop_name.endswith('_refs') and (
'ListProperty' not in get_class_hierarchy_names(prop) or
'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained)
)
):
raise ValueError(
"'%s' is named like an object reference list property but "
"is not a ListProperty containing ObjectReferenceProperty." % prop_name,
)
else:
# If using STIX2.1 (or newer...), check properties ending in "_ref/s" are ReferenceProperties
for prop_name, prop in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
elif prop_name.endswith('_ref') and ('ReferenceProperty' not in get_class_hierarchy_names(prop)):
raise ValueError(
"'%s' is named like a reference property but "
"is not a ReferenceProperty." % prop_name,
)
elif (
prop_name.endswith('_refs') and (
'ListProperty' not in get_class_hierarchy_names(prop) or
'ReferenceProperty' not in get_class_hierarchy_names(prop.contained)
)
):
raise ValueError(
"'%s' is named like a reference list property but "
"is not a ListProperty containing ReferenceProperty." % prop_name,
)
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[version]['observables']
if new_observable._type in OBJ_MAP_OBSERVABLE.keys():
raise DuplicateRegistrationError("Cyber Observable", new_observable._type)
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
def _register_observable_extension(
observable, new_extension, version=DEFAULT_VERSION,
):
"""Register a custom extension to a STIX Cyber Observable type.
Args:
observable: An observable class or instance
new_extension (class): A class to register in the Observables
Extensions map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1").
Defaults to the latest supported version.
"""
obs_class = observable if isinstance(observable, type) else \
type(observable)
ext_type = new_extension._type
properties = new_extension._properties
if not issubclass(obs_class, _Observable):
raise ValueError("'observable' must be a valid Observable class!")
_validate_type(ext_type, version)
if not new_extension._properties:
raise ValueError(
"Invalid extension: must define at least one property: " +
ext_type,
)
if version == "2.1":
if not ext_type.endswith('-ext'):
raise ValueError(
"Invalid extension type name '%s': must end with '-ext'." %
ext_type,
)
for prop_name, prop_value in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
try:
observable_type = observable._type
except AttributeError:
raise ValueError(
"Unknown observable type. Custom observables must be "
"created with the @CustomObservable decorator.",
)
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[version]['observables']
EXT_MAP = registry.STIX2_OBJ_MAPS[version]['observable-extensions']
try:
if ext_type in EXT_MAP[observable_type].keys():
raise DuplicateRegistrationError("Observable Extension", ext_type)
EXT_MAP[observable_type][ext_type] = new_extension
except KeyError:
if observable_type not in OBJ_MAP_OBSERVABLE:
raise ValueError(
"Unknown observable type '%s'. Custom observables "
"must be created with the @CustomObservable decorator."
% observable_type,
)
else:
EXT_MAP[observable_type] = {ext_type: new_extension}

80
stix2/registry.py Normal file
View File

@ -0,0 +1,80 @@
import importlib
import pkgutil
import re
# Collects information on which classes implement which STIX types, for the
# various STIX spec versions.
STIX2_OBJ_MAPS = {}
def _stix_vid_to_version(stix_vid):
"""
Convert a python package name representing a STIX version in the form "vXX"
to the dotted style used in the public APIs of this library, "X.X".
:param stix_vid: A package name in the form "vXX"
:return: A STIX version in dotted style
"""
assert len(stix_vid) >= 3
stix_version = stix_vid[1] + "." + stix_vid[2:]
return stix_version
def _collect_stix2_mappings():
"""Navigate the package once and retrieve all object mapping dicts for each
v2X package. Includes OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP."""
if not STIX2_OBJ_MAPS:
top_level_module = importlib.import_module('stix2')
path = top_level_module.__path__
prefix = str(top_level_module.__name__) + '.'
for module_loader, name, is_pkg in pkgutil.walk_packages(path=path, prefix=prefix):
stix_vid = name.split('.')[1]
if re.match(r'^stix2\.v2[0-9]$', name) and is_pkg:
ver = _stix_vid_to_version(stix_vid)
mod = importlib.import_module(name, str(top_level_module.__name__))
STIX2_OBJ_MAPS[ver] = {}
STIX2_OBJ_MAPS[ver]['objects'] = mod.OBJ_MAP
STIX2_OBJ_MAPS[ver]['observables'] = mod.OBJ_MAP_OBSERVABLE
STIX2_OBJ_MAPS[ver]['observable-extensions'] = mod.EXT_MAP
elif re.match(r'^stix2\.v2[0-9]\.common$', name) and is_pkg is False:
ver = _stix_vid_to_version(stix_vid)
mod = importlib.import_module(name, str(top_level_module.__name__))
STIX2_OBJ_MAPS[ver]['markings'] = mod.OBJ_MAP_MARKING
def class_for_type(stix_type, stix_version, category=None):
"""
Get the registered class which implements a particular STIX type for a
particular STIX version.
:param stix_type: A STIX type as a string
:param stix_version: A STIX version as a string, e.g. "2.1"
:param category: An optional "category" value, which is just used directly
as a second key after the STIX version, and depends on how the types
are internally categorized. This would be useful if the same STIX type
is used to mean two different things within the same STIX version. So
it's unlikely to be necessary. Pass None to just search all the
categories and return the first class found.
:return: A registered python class which implements the given STIX type, or
None if one is not found.
"""
cls = None
cat_map = STIX2_OBJ_MAPS.get(stix_version)
if cat_map:
if category:
class_map = cat_map.get(category)
if class_map:
cls = class_map.get(stix_type)
else:
cls = cat_map["objects"].get(stix_type) \
or cat_map["observables"].get(stix_type) \
or cat_map["markings"].get(stix_type)
# Left "observable-extensions" out; it has a different
# substructure. A version->category->type lookup would result
# in another map, not a class. So it doesn't fit the pattern.
return cls

View File

@ -223,6 +223,10 @@ def test_obs_absorb_not_equivalent(patt1, patt2):
"([a:b=1] OR [a:b=2]) FOLLOWEDBY ([a:b=3] OR [a:b=4])",
"([a:b=1] FOLLOWEDBY [a:b=3]) OR ([a:b=1] FOLLOWEDBY [a:b=4]) OR ([a:b=2] FOLLOWEDBY [a:b=3]) OR ([a:b=2] FOLLOWEDBY [a:b=4])",
),
(
"([a:b=1] OR [a:b=2]) FOLLOWEDBY ([a:b=5] AND [a:b=6])",
"([a:b=1] FOLLOWEDBY ([a:b=5] AND [a:b=6])) OR ([a:b=2] FOLLOWEDBY ([a:b=5] AND [a:b=6]))",
),
],
)
def test_obs_dnf_equivalent(patt1, patt2):
@ -243,6 +247,10 @@ def test_obs_dnf_equivalent(patt1, patt2):
"[a:b=1] WITHIN 2 SECONDS",
"[a:b=1] REPEATS 2 TIMES",
),
(
"[a:b=1] FOLLOWEDBY ([a:b=2] OR [a:b=3])",
"([a:b=2] FOLLOWEDBY [a:b=1]) OR ([a:b=1] FOLLOWEDBY [a:b=3])",
),
],
)
def test_obs_not_equivalent(patt1, patt2):

View File

@ -2,7 +2,7 @@ from __future__ import unicode_literals
import pytest
from stix2.parsing import _detect_spec_version
from stix2.utils import detect_spec_version
@pytest.mark.parametrize(
@ -17,7 +17,7 @@ from stix2.parsing import _detect_spec_version
"name": "alice",
"identity_class": "individual",
},
"v20",
"2.0",
),
(
{
@ -29,14 +29,14 @@ from stix2.parsing import _detect_spec_version
"target_ref": "identity--ba18dde2-56d3-4a34-aa0b-fc56f5be568f",
"relationship_type": "targets",
},
"v20",
"2.0",
),
(
{
"type": "file",
"name": "notes.txt",
},
"v20",
"2.0",
),
(
{
@ -48,7 +48,7 @@ from stix2.parsing import _detect_spec_version
"statement": "Copyright (c) ACME Corp.",
},
},
"v20",
"2.0",
),
(
{
@ -75,7 +75,7 @@ from stix2.parsing import _detect_spec_version
},
],
},
"v20",
"2.0",
),
# STIX 2.1 examples
(
@ -87,7 +87,7 @@ from stix2.parsing import _detect_spec_version
"modified": "2001-07-01T09:33:17.000Z",
"name": "alice",
},
"v21",
"2.1",
),
(
{
@ -100,7 +100,7 @@ from stix2.parsing import _detect_spec_version
"target_ref": "identity--ba18dde2-56d3-4a34-aa0b-fc56f5be568f",
"relationship_type": "targets",
},
"v21",
"2.1",
),
(
{
@ -109,7 +109,7 @@ from stix2.parsing import _detect_spec_version
"spec_version": "2.1",
"name": "notes.txt",
},
"v21",
"2.1",
),
(
{
@ -117,7 +117,7 @@ from stix2.parsing import _detect_spec_version
"id": "file--5eef3404-6a94-4db3-9a1a-5684cbea0dfe",
"name": "notes.txt",
},
"v21",
"2.1",
),
(
{
@ -131,7 +131,7 @@ from stix2.parsing import _detect_spec_version
"tlp": "green",
},
},
"v21",
"2.1",
),
(
{
@ -153,7 +153,7 @@ from stix2.parsing import _detect_spec_version
},
],
},
"v21",
"2.1",
),
# Mixed spec examples
(
@ -180,7 +180,7 @@ from stix2.parsing import _detect_spec_version
},
],
},
"v21",
"2.1",
),
(
{
@ -202,11 +202,11 @@ from stix2.parsing import _detect_spec_version
},
],
},
"v21",
"2.1",
),
],
)
def test_spec_version_detect(obj_dict, expected_ver):
detected_ver = _detect_spec_version(obj_dict)
detected_ver = detect_spec_version(obj_dict)
assert detected_ver == expected_ver

View File

@ -0,0 +1,262 @@
import pytest
import stix2.utils
###
# Tests using types/behaviors common to STIX 2.0 and 2.1.
###
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"attack-pattern",
"campaign",
"course-of-action",
"identity",
"indicator",
"intrusion-set",
"malware",
"observed-data",
"report",
"threat-actor",
"tool",
"vulnerability",
],
)
def test_is_sdo(type_, stix_version):
assert stix2.utils.is_sdo(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert stix2.utils.is_sdo(id_, stix_version)
assert stix2.utils.is_stix_type(
type_, stix_version, stix2.utils.STIXTypeClass.SDO,
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"relationship",
"sighting",
"marking-definition",
"bundle",
"language-content",
"ipv4-addr",
"foo",
],
)
def test_is_not_sdo(type_, stix_version):
assert not stix2.utils.is_sdo(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert not stix2.utils.is_sdo(id_, stix_version)
d = {
"type": type_,
}
assert not stix2.utils.is_sdo(d, stix_version)
assert not stix2.utils.is_stix_type(
type_, stix_version, stix2.utils.STIXTypeClass.SDO,
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"artifact",
"autonomous-system",
"directory",
"domain-name",
"email-addr",
"email-message",
"file",
"ipv4-addr",
"ipv6-addr",
"mac-addr",
"mutex",
"network-traffic",
"process",
"software",
"url",
"user-account",
"windows-registry-key",
"x509-certificate",
],
)
def test_is_sco(type_, stix_version):
assert stix2.utils.is_sco(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert stix2.utils.is_sco(id_, stix_version)
assert stix2.utils.is_stix_type(
type_, stix_version, stix2.utils.STIXTypeClass.SCO,
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"identity",
"sighting",
"marking-definition",
"bundle",
"language-content",
"foo",
],
)
def test_is_not_sco(type_, stix_version):
assert not stix2.utils.is_sco(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert not stix2.utils.is_sco(id_, stix_version)
d = {
"type": type_,
}
assert not stix2.utils.is_sco(d, stix_version)
assert not stix2.utils.is_stix_type(
type_, stix_version, stix2.utils.STIXTypeClass.SCO,
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"relationship",
"sighting",
],
)
def test_is_sro(type_, stix_version):
assert stix2.utils.is_sro(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert stix2.utils.is_sro(id_, stix_version)
assert stix2.utils.is_stix_type(
type_, stix_version, stix2.utils.STIXTypeClass.SRO,
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"identity",
"marking-definition",
"bundle",
"language-content",
"ipv4-addr",
"foo",
],
)
def test_is_not_sro(type_, stix_version):
assert not stix2.utils.is_sro(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert not stix2.utils.is_sro(id_, stix_version)
d = {
"type": type_,
}
assert not stix2.utils.is_sro(d, stix_version)
assert not stix2.utils.is_stix_type(
type_, stix_version, stix2.utils.STIXTypeClass.SRO,
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
def test_is_marking(stix_version):
assert stix2.utils.is_marking("marking-definition", stix_version)
id_ = "marking-definition--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert stix2.utils.is_marking(id_, stix_version)
assert stix2.utils.is_stix_type(
"marking-definition", stix_version, "marking-definition",
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"identity",
"bundle",
"language-content",
"ipv4-addr",
"foo",
],
)
def test_is_not_marking(type_, stix_version):
assert not stix2.utils.is_marking(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert not stix2.utils.is_marking(id_, stix_version)
d = {
"type": type_,
}
assert not stix2.utils.is_marking(d, stix_version)
assert not stix2.utils.is_stix_type(
type_, stix_version, "marking-definition",
)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
@pytest.mark.parametrize(
"type_", [
"identity",
"relationship",
"sighting",
"marking-definition",
"bundle",
"ipv4-addr",
],
)
def test_is_object(type_, stix_version):
assert stix2.utils.is_object(type_, stix_version)
id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert stix2.utils.is_object(id_, stix_version)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
def test_is_not_object(stix_version):
assert not stix2.utils.is_object("foo", stix_version)
id_ = "foo--a12fa04c-6586-4128-8d1a-cfe0d1c081f5"
assert not stix2.utils.is_object(id_, stix_version)
d = {
"type": "foo",
}
assert not stix2.utils.is_object(d, stix_version)
@pytest.mark.parametrize("stix_version", ["2.0", "2.1"])
def test_is_stix_type(stix_version):
assert not stix2.utils.is_stix_type(
"foo", stix_version, stix2.utils.STIXTypeClass.SDO, "foo",
)
assert stix2.utils.is_stix_type(
"bundle", stix_version, "foo", "bundle",
)
assert stix2.utils.is_stix_type(
"identity", stix_version,
stix2.utils.STIXTypeClass.SDO,
stix2.utils.STIXTypeClass.SRO,
)
assert stix2.utils.is_stix_type(
"software", stix_version,
stix2.utils.STIXTypeClass.SDO,
stix2.utils.STIXTypeClass.SCO,
)

View File

@ -1,7 +1,9 @@
import pytest
import stix2
from stix2 import parsing
import stix2.parsing
import stix2.registration
import stix2.registry
import stix2.v20
from ...exceptions import DuplicateRegistrationError, InvalidValueError
@ -939,7 +941,7 @@ def test_register_custom_object():
_type = 'awesome-object'
with pytest.raises(ValueError):
stix2.parsing._register_object(CustomObject2, version="2.0")
stix2.registration._register_object(CustomObject2, version="2.0")
def test_extension_property_location():
@ -999,10 +1001,9 @@ def test_register_custom_object_with_version():
"id": "x-new-type-2--00000000-0000-4000-8000-000000000007",
}
cust_obj_1 = parsing.dict_to_stix2(custom_obj_1, version='2.0')
v = 'v20'
cust_obj_1 = stix2.parsing.dict_to_stix2(custom_obj_1, version='2.0')
assert cust_obj_1.type in parsing.STIX2_OBJ_MAPS[v]['objects']
assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS['2.0']['objects']
# spec_version is not in STIX 2.0, and is required in 2.1, so this
# suffices as a test for a STIX 2.0 object.
assert "spec_version" not in cust_obj_1
@ -1032,9 +1033,8 @@ class NewObservable2(object):
def test_register_observable_with_version():
custom_obs = NewObservable2(property1="Test Observable")
v = 'v20'
assert custom_obs.type in parsing.STIX2_OBJ_MAPS[v]['observables']
assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS['2.0']['observables']
def test_register_duplicate_observable_with_version():
@ -1057,10 +1057,9 @@ def test_register_marking_with_version():
)
class NewObj2():
pass
v = 'v20'
no = NewObj2(property1='something')
assert no._type in parsing.STIX2_OBJ_MAPS[v]['markings']
assert no._type in stix2.registry.STIX2_OBJ_MAPS['2.0']['markings']
def test_register_observable_extension_with_version():
@ -1072,10 +1071,9 @@ def test_register_observable_extension_with_version():
class SomeCustomExtension2:
pass
v = 'v20'
example = SomeCustomExtension2(keys='test123')
assert example._type in parsing.STIX2_OBJ_MAPS[v]['extensions']
assert example._type in stix2.registry.STIX2_OBJ_MAPS['2.0']['extensions']
def test_register_duplicate_observable_extension():

View File

@ -2,8 +2,7 @@ from collections import OrderedDict
import pytest
import stix2
from stix2 import exceptions, parsing
from stix2 import DEFAULT_VERSION, exceptions, parsing, registration, registry
BUNDLE = {
"type": "bundle",
@ -59,7 +58,7 @@ def test_parse_observable_with_version():
assert v in str(obs_obj.__class__)
@pytest.mark.xfail(reason="The default version is no longer 2.0", condition=stix2.DEFAULT_VERSION != "2.0")
@pytest.mark.xfail(reason="The default version is no longer 2.0", condition=DEFAULT_VERSION != "2.0")
def test_parse_observable_with_no_version():
observable = {"type": "file", "name": "foo.exe"}
obs_obj = parsing.parse_observable(observable)
@ -73,8 +72,7 @@ def test_register_marking_with_version():
_type = 'x-new-marking1'
_properties = OrderedDict()
parsing._register_marking(NewMarking1, version='2.0')
v = 'v20'
registration._register_marking(NewMarking1, version='2.0')
assert NewMarking1._type in parsing.STIX2_OBJ_MAPS[v]['markings']
assert v in str(parsing.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type])
assert NewMarking1._type in registry.STIX2_OBJ_MAPS['2.0']['markings']
assert 'v20' in str(registry.STIX2_OBJ_MAPS['2.0']['markings'][NewMarking1._type])

View File

@ -237,3 +237,146 @@ def test_find_property_index(object, tuple_to_find, expected_index):
)
def test_iterate_over_values(dict_value, tuple_to_find, expected_index):
assert stix2.serialization._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index
@pytest.mark.parametrize(
"type_", [
"attack-pattern",
"campaign",
"course-of-action",
"identity",
"indicator",
"intrusion-set",
"malware",
"observed-data",
"report",
"threat-actor",
"tool",
"vulnerability",
],
)
def test_is_sdo_dict(type_):
d = {
"type": type_,
}
assert stix2.utils.is_sdo(d, "2.0")
@pytest.mark.parametrize(
"dict_", [
{"type": "software", "spec_version": "2.1"},
{"type": "software"},
{"type": "identity", "spec_version": "2.1"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "marking-definition"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "bundle"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "language-content"},
{"type": "relationship", "spec_version": "2.1"},
{"type": "relationship"},
{"type": "foo", "spec_version": "2.1"},
{"type": "foo"},
],
)
def test_is_not_sdo_dict(dict_):
assert not stix2.utils.is_sdo(dict_, "2.0")
def test_is_sco_dict():
d = {
"type": "file",
}
assert stix2.utils.is_sco(d, "2.0")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity"},
{"type": "identity", "spec_version": "2.1"},
{"type": "software", "spec_version": "2.1"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "marking-definition"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "bundle"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "language-content"},
{"type": "relationship", "spec_version": "2.1"},
{"type": "relationship"},
{"type": "foo", "spec_version": "2.1"},
{"type": "foo"},
],
)
def test_is_not_sco_dict(dict_):
assert not stix2.utils.is_sco(dict_, "2.0")
@pytest.mark.parametrize(
"dict_", [
{"type": "relationship"},
{"type": "sighting"},
],
)
def test_is_sro_dict(dict_):
assert stix2.utils.is_sro(dict_, "2.0")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity", "spec_version": "2.1"},
{"type": "identity"},
{"type": "software", "spec_version": "2.1"},
{"type": "software"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "marking-definition"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "bundle"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "language-content"},
{"type": "relationship", "spec_version": "2.1"},
{"type": "sighting", "spec_version": "2.1"},
{"type": "foo", "spec_version": "2.1"},
{"type": "foo"},
],
)
def test_is_not_sro_dict(dict_):
assert not stix2.utils.is_sro(dict_, "2.0")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity"},
{"type": "software"},
{"type": "marking-definition"},
{
"type": "bundle",
"id": "bundle--8f431680-6278-4767-ba43-5edb682d7086",
"spec_version": "2.0",
"objects": [
{"type": "identity"},
{"type": "software"},
{"type": "marking-definition"},
],
},
],
)
def test_is_object_dict(dict_):
assert stix2.utils.is_object(dict_, "2.0")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity", "spec_version": "2.1"},
{"type": "software", "spec_version": "2.1"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "relationship", "spec_version": "2.1"},
{"type": "sighting", "spec_version": "2.1"},
{"type": "foo", "spec_version": "2.1"},
{"type": "foo"},
],
)
def test_is_not_object_dict(dict_):
assert not stix2.utils.is_object(dict_, "2.0")

View File

@ -170,6 +170,60 @@ def test_versioning_error_dict_bad_modified_value():
"but have the same id and modified timestamp do not have defined consumer behavior."
def test_versioning_dict_unregistered_no_modified():
d = {
"type": "not-registered",
"id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
"created": "1995-04-07T15:37:48.178Z",
}
new_d = stix2.versioning.new_version(d)
assert "modified" in new_d
assert new_d["modified"] > stix2.utils.parse_into_datetime(d["created"])
new_d = stix2.versioning.new_version(d, modified="1996-11-20T01:19:29.134Z")
assert new_d["modified"] == "1996-11-20T01:19:29.134Z"
def test_versioning_dict_unregistered_unversionable():
d = {
"type": "not-registered",
"id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
"modified": "1995-04-07T15:37:48.178Z",
}
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
stix2.versioning.new_version(d)
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
# should fail even if we provide a "created" kwarg.
stix2.versioning.new_version(d, created="1985-06-29T06:09:51.157Z")
def test_versioning_custom_object():
@stix2.v20.CustomObject(
"x-versionable-all-optional-20", [
("created", stix2.properties.TimestampProperty()),
("modified", stix2.properties.TimestampProperty()),
("revoked", stix2.properties.BooleanProperty()),
],
)
class CustomSDO:
pass
obj = CustomSDO(created="1990-12-18T17:56:11.346234Z")
new_obj = stix2.versioning.new_version(obj)
assert "modified" in new_obj
assert new_obj["modified"] > new_obj["created"]
obj = CustomSDO()
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
# fails due to insufficient properties on the object, even though its
# type supports versioning.
stix2.versioning.new_version(obj)
def test_versioning_error_dict_no_modified_value():
campaign_v1 = {
'type': 'campaign',
@ -184,10 +238,10 @@ def test_versioning_error_dict_no_modified_value():
def test_making_new_version_invalid_cls():
campaign_v1 = "This is a campaign."
with pytest.raises(ValueError) as excinfo:
with pytest.raises(stix2.exceptions.TypeNotVersionableError) as excinfo:
stix2.versioning.new_version(campaign_v1, name="fred")
assert 'cannot create new version of object of this type' in str(excinfo.value)
assert excinfo.value.object is campaign_v1
def test_revoke_dict():
@ -206,7 +260,7 @@ def test_revoke_dict():
def test_revoke_unversionable():
sco = stix2.v20.File(name="data.txt")
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
sco.revoke()
@ -277,7 +331,7 @@ def test_version_unversionable_dict():
"name": "data.txt",
}
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(f)
@ -295,10 +349,10 @@ def test_version_sco_with_modified():
"modified": "1991-05-13T19:24:57Z",
}
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(file_sco, name="newname.txt")
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.revoke(file_sco)
file_sco_obj = stix2.v20.File(
@ -307,10 +361,10 @@ def test_version_sco_with_modified():
modified="1991-05-13T19:24:57Z",
)
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(file_sco_obj, name="newname.txt")
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.revoke(file_sco_obj)
@ -337,6 +391,45 @@ def test_version_sco_with_custom():
revoked_obj = stix2.versioning.revoke(new_file_sco_obj)
assert revoked_obj.revoked
# Same thing with a dict
d = {
"type": "file",
"name": "data.txt",
"created": "1973-11-23T02:31:37Z",
"modified": "1991-05-13T19:24:57Z",
"revoked": False,
}
new_d = stix2.versioning.new_version(d, size=1234)
assert new_d["size"] == 1234
revoked_d = stix2.versioning.revoke(new_d)
assert revoked_d["revoked"]
def test_version_marking():
m = stix2.v20.MarkingDefinition(
created="1982-11-29T12:20:13.723Z",
definition_type="statement",
definition={"statement": "Copyright (c) 2000-2020 Acme Corp"},
)
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(m)
m = {
"type": "marking-definition",
"id": "marking-definition--2a9f3f6e-5cbd-423b-a40d-02aefd29e612",
"created": "1982-11-29T12:20:13.723Z",
"definition_type": "statement",
"definition": {
"statement": "Copyright (c) 2000-2020 Acme Corp",
},
}
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(m)
def test_version_disable_custom():
m = stix2.v20.Malware(

View File

@ -4,6 +4,8 @@ import pytest
import stix2
import stix2.base
import stix2.registration
import stix2.registry
import stix2.v21
from ...exceptions import DuplicateRegistrationError, InvalidValueError
@ -1157,7 +1159,7 @@ def test_register_custom_object():
_type = 'awesome-object'
with pytest.raises(ValueError) as excinfo:
stix2.parsing._register_object(CustomObject2, version="2.1")
stix2.registration._register_object(CustomObject2, version="2.1")
assert '@CustomObject decorator' in str(excinfo)
@ -1221,9 +1223,8 @@ def test_register_custom_object_with_version():
}
cust_obj_1 = stix2.parsing.dict_to_stix2(custom_obj_1, version='2.1')
v = 'v21'
assert cust_obj_1.type in stix2.parsing.STIX2_OBJ_MAPS[v]['objects']
assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS['2.1']['objects']
assert cust_obj_1.spec_version == "2.1"
@ -1251,9 +1252,8 @@ class NewObservable3(object):
def test_register_observable():
custom_obs = NewObservable3(property1="Test Observable")
v = 'v21'
assert custom_obs.type in stix2.parsing.STIX2_OBJ_MAPS[v]['observables']
assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS['2.1']['observables']
def test_register_duplicate_observable():
@ -1279,10 +1279,9 @@ def test_register_observable_custom_extension():
pass
example = NewExtension2(property1="Hi there")
v = 'v21'
assert 'domain-name' in stix2.parsing.STIX2_OBJ_MAPS[v]['observables']
assert example._type in stix2.parsing.STIX2_OBJ_MAPS[v]['extensions']
assert 'domain-name' in stix2.registry.STIX2_OBJ_MAPS['2.1']['observables']
assert example._type in stix2.registry.STIX2_OBJ_MAPS['2.1']['extensions']
def test_register_duplicate_observable_extension():

View File

@ -2,8 +2,7 @@ from collections import OrderedDict
import pytest
import stix2
from stix2 import exceptions, parsing
from stix2 import DEFAULT_VERSION, exceptions, parsing, registration, registry
BUNDLE = {
"type": "bundle",
@ -64,7 +63,7 @@ def test_parse_observable_with_version():
assert v in str(obs_obj.__class__)
@pytest.mark.xfail(reason="The default version is not 2.1", condition=stix2.DEFAULT_VERSION != "2.1")
@pytest.mark.xfail(reason="The default version is not 2.1", condition=DEFAULT_VERSION != "2.1")
def test_parse_observable_with_no_version():
observable = {"type": "file", "name": "foo.exe", "spec_version": "2.1"}
obs_obj = parsing.parse_observable(observable)
@ -78,22 +77,20 @@ def test_register_marking_with_version():
_type = 'x-new-marking1'
_properties = OrderedDict()
parsing._register_marking(NewMarking1, version='2.1')
v = 'v21'
registration._register_marking(NewMarking1, version='2.1')
assert NewMarking1._type in parsing.STIX2_OBJ_MAPS[v]['markings']
assert v in str(parsing.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type])
assert NewMarking1._type in registry.STIX2_OBJ_MAPS['2.1']['markings']
assert 'v21' in str(registry.STIX2_OBJ_MAPS['2.1']['markings'][NewMarking1._type])
@pytest.mark.xfail(reason="The default version is not 2.1", condition=stix2.DEFAULT_VERSION != "2.1")
@pytest.mark.xfail(reason="The default version is not 2.1", condition=DEFAULT_VERSION != "2.1")
def test_register_marking_with_no_version():
# Uses default version (2.1 in this case)
class NewMarking2:
_type = 'x-new-marking2'
_properties = OrderedDict()
parsing._register_marking(NewMarking2)
v = 'v21'
registration._register_marking(NewMarking2)
assert NewMarking2._type in parsing.STIX2_OBJ_MAPS[v]['markings']
assert v in str(parsing.STIX2_OBJ_MAPS[v]['markings'][NewMarking2._type])
assert NewMarking2._type in registry.STIX2_OBJ_MAPS['2.1']['markings']
assert 'v21' in str(registry.STIX2_OBJ_MAPS['2.1']['markings'][NewMarking2._type])

View File

@ -241,3 +241,153 @@ def test_find_property_index(object, tuple_to_find, expected_index):
)
def test_iterate_over_values(dict_value, tuple_to_find, expected_index):
assert stix2.serialization._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index
@pytest.mark.parametrize(
"type_", [
"attack-pattern",
"campaign",
"course-of-action",
"identity",
"indicator",
"intrusion-set",
"malware",
"observed-data",
"report",
"threat-actor",
"tool",
"vulnerability",
# New in 2.1
"grouping",
"infrastructure",
"location",
"malware-analysis",
"note",
"opinion",
],
)
def test_is_sdo_dict(type_):
d = {
"type": type_,
"spec_version": "2.1",
}
assert stix2.utils.is_sdo(d, "2.1")
@pytest.mark.parametrize(
"dict_", [
{"type": "software", "spec_version": "2.1"},
{"type": "software"},
{"type": "identity"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "marking-definition"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "bundle"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "language-content"},
{"type": "relationship", "spec_version": "2.1"},
{"type": "relationship"},
{"type": "foo", "spec_version": "2.1"},
],
)
def test_is_not_sdo_dict(dict_):
assert not stix2.utils.is_sdo(dict_, "2.1")
def test_is_sco_dict():
d = {
"type": "file",
"spec_version": "2.1",
}
assert stix2.utils.is_sco(d, "2.1")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity"},
{"type": "identity", "spec_version": "2.1"},
{"type": "software"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "marking-definition"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "bundle"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "language-content"},
{"type": "relationship", "spec_version": "2.1"},
{"type": "relationship"},
{"type": "foo", "spec_version": "2.1"},
],
)
def test_is_not_sco_dict(dict_):
assert not stix2.utils.is_sco(dict_, "2.1")
@pytest.mark.parametrize(
"dict_", [
{"type": "relationship", "spec_version": "2.1"},
{"type": "sighting", "spec_version": "2.1"},
],
)
def test_is_sro_dict(dict_):
assert stix2.utils.is_sro(dict_, "2.1")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity", "spec_version": "2.1"},
{"type": "identity"},
{"type": "software", "spec_version": "2.1"},
{"type": "software"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "marking-definition"},
{"type": "bundle", "spec_version": "2.1"},
{"type": "bundle"},
{"type": "language-content", "spec_version": "2.1"},
{"type": "language-content"},
{"type": "relationship"},
{"type": "sighting"},
{"type": "foo", "spec_version": "2.1"},
],
)
def test_is_not_sro_dict(dict_):
assert not stix2.utils.is_sro(dict_, "2.1")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity", "spec_version": "2.1"},
{"type": "software", "spec_version": "2.1"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "language-content", "spec_version": "2.1"},
{
"type": "bundle",
"id": "bundle--8f431680-6278-4767-ba43-5edb682d7086",
"objects": [
{"type": "identity", "spec_version": "2.1"},
{"type": "software", "spec_version": "2.1"},
{"type": "marking-definition", "spec_version": "2.1"},
{"type": "language-content", "spec_version": "2.1"},
],
},
],
)
def test_is_object_dict(dict_):
assert stix2.utils.is_object(dict_, "2.1")
@pytest.mark.parametrize(
"dict_", [
{"type": "identity"},
{"type": "software"},
{"type": "marking-definition"},
{"type": "bundle"},
{"type": "language-content"},
{"type": "relationship"},
{"type": "sighting"},
{"type": "foo"},
],
)
def test_is_not_object_dict(dict_):
assert not stix2.utils.is_object(dict_, "2.1")

View File

@ -4,6 +4,7 @@ import pytest
import stix2
import stix2.exceptions
import stix2.properties
import stix2.utils
import stix2.v21
import stix2.versioning
@ -179,6 +180,62 @@ def test_versioning_error_dict_bad_modified_value():
"but have the same id and modified timestamp do not have defined consumer behavior."
def test_versioning_dict_unregistered_no_modified():
d = {
"type": "not-registered",
"id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
"spec_version": "2.1",
"created": "1995-04-07T15:37:48.178Z",
}
new_d = stix2.versioning.new_version(d)
assert "modified" in new_d
assert new_d["modified"] > stix2.utils.parse_into_datetime(d["created"])
new_d = stix2.versioning.new_version(d, modified="1996-11-20T01:19:29.134Z")
assert new_d["modified"] == "1996-11-20T01:19:29.134Z"
def test_versioning_dict_unregistered_unversionable():
d = {
"type": "not-registered",
"id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
"spec_version": "2.1",
"modified": "1995-04-07T15:37:48.178Z",
}
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
stix2.versioning.new_version(d)
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
# should fail even if we provide a "created" kwarg.
stix2.versioning.new_version(d, created="1985-06-29T06:09:51.157Z")
def test_versioning_custom_object():
@stix2.v21.CustomObject(
"x-versionable-all-optional-21", [
("created", stix2.properties.TimestampProperty()),
("modified", stix2.properties.TimestampProperty()),
("revoked", stix2.properties.BooleanProperty()),
],
)
class CustomSDO:
pass
obj = CustomSDO(created="1990-12-18T17:56:11.346234Z")
new_obj = stix2.versioning.new_version(obj)
assert "modified" in new_obj
assert new_obj["modified"] > new_obj["created"]
obj = CustomSDO()
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
# fails due to insufficient properties on the object, even though its
# type supports versioning.
stix2.versioning.new_version(obj)
def test_versioning_error_dict_no_modified_value():
campaign_v1 = {
'type': 'campaign',
@ -193,10 +250,10 @@ def test_versioning_error_dict_no_modified_value():
def test_making_new_version_invalid_cls():
campaign_v1 = "This is a campaign."
with pytest.raises(ValueError) as excinfo:
with pytest.raises(stix2.exceptions.TypeNotVersionableError) as excinfo:
stix2.versioning.new_version(campaign_v1, name="fred")
assert 'cannot create new version of object of this type' in str(excinfo.value)
assert excinfo.value.object is campaign_v1
def test_revoke_dict():
@ -216,7 +273,7 @@ def test_revoke_dict():
def test_revoke_unversionable():
sco = stix2.v21.File(name="data.txt")
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
sco.revoke()
@ -318,7 +375,7 @@ def test_version_unversionable_dict():
"name": "data.txt",
}
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(f)
@ -345,6 +402,82 @@ def test_version_sco_with_custom():
revoked_obj = stix2.versioning.revoke(new_file_sco_obj)
assert revoked_obj.revoked
# Same thing with a dict
d = {
"type": "file",
"id": "file--d287f10a-98b4-4a47-8fa0-64b12695ea58",
"spec_version": "2.1",
"name": "data.txt",
"created": "1973-11-23T02:31:37Z",
"modified": "1991-05-13T19:24:57Z",
"revoked": False,
}
new_d = stix2.versioning.new_version(d, size=1234)
assert new_d["size"] == 1234
revoked_d = stix2.versioning.revoke(new_d)
assert revoked_d["revoked"]
def test_version_sco_id_contributing_properties():
file_sco_obj = stix2.v21.File(
name="data.txt",
created="1973-11-23T02:31:37Z",
modified="1991-05-13T19:24:57Z",
revoked=False,
allow_custom=True,
)
with pytest.raises(stix2.exceptions.UnmodifiablePropertyError) as e:
stix2.versioning.new_version(file_sco_obj, name="foo.dat")
assert e.value.unchangable_properties == {"name"}
def test_version_sco_id_contributing_properties_dict():
file_sco_dict = {
"type": "file",
"id": "file--c27c572c-2e17-5ce1-817e-67bb97629a56",
"spec_version": "2.1",
"name": "data.txt",
"created": "1973-11-23T02:31:37Z",
"modified": "1991-05-13T19:24:57Z",
"revoked": False,
}
with pytest.raises(stix2.exceptions.UnmodifiablePropertyError) as e:
stix2.versioning.new_version(file_sco_dict, name="foo.dat")
assert e.value.unchangable_properties == {"name"}
def test_version_marking():
m = stix2.v21.MarkingDefinition(
name="a name",
created="1982-11-29T12:20:13.723Z",
definition_type="statement",
definition={"statement": "Copyright (c) 2000-2020 Acme Corp"},
)
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(m)
m = {
"type": "marking-definition",
"id": "marking-definition--2a9f3f6e-5cbd-423b-a40d-02aefd29e612",
"spec_version": "2.1",
"name": "a name",
"created": "1982-11-29T12:20:13.723Z",
"definition_type": "statement",
"definition": {
"statement": "Copyright (c) 2000-2020 Acme Corp",
},
}
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(m)
def test_version_disable_custom():
m = stix2.v21.Malware(

View File

@ -1,5 +1,6 @@
"""Utility functions and classes for the STIX2 library."""
import collections.abc
import datetime as dt
import enum
import json
@ -8,7 +9,8 @@ import re
import pytz
import six
import stix2
import stix2.registry as mappings
import stix2.version
# Sentinel value for properties that should be set to the current time.
# We can't use the standard 'default' approach, since if there are multiple
@ -313,18 +315,262 @@ def get_type_from_id(stix_id):
return stix_id.split('--', 1)[0]
def is_marking(obj_or_id):
"""Determines whether the given object or object ID is/is for a marking
definition.
def detect_spec_version(stix_dict):
"""
Given a dict representing a STIX object, try to detect what spec version
it is likely to comply with.
:param obj_or_id: A STIX object or object ID as a string.
:return: True if a marking definition, False otherwise.
:param stix_dict: A dict with some STIX content. Must at least have a
"type" property.
:return: A STIX version in "X.Y" format
"""
if isinstance(obj_or_id, (stix2.base._STIXBase, dict)):
result = obj_or_id["type"] == "marking-definition"
obj_type = stix_dict["type"]
if 'spec_version' in stix_dict:
# For STIX 2.0, applies to bundles only.
# For STIX 2.1+, applies to SCOs, SDOs, SROs, and markings only.
v = stix_dict['spec_version']
elif "id" not in stix_dict:
# Only 2.0 SCOs don't have ID properties
v = "2.0"
elif obj_type == 'bundle':
# Bundle without a spec_version property: must be 2.1. But to
# future-proof, use max version over all contained SCOs, with 2.1
# minimum.
v = max(
"2.1",
max(
detect_spec_version(obj) for obj in stix_dict["objects"]
),
)
elif obj_type in mappings.STIX2_OBJ_MAPS["2.1"]["observables"]:
# Non-bundle object with an ID and without spec_version. Could be a
# 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO...
v = "2.1"
else:
# it's a string ID
result = obj_or_id.startswith("marking-definition--")
# Not a 2.1 SCO; must be a 2.0 object.
v = "2.0"
return v
def _stix_type_of(value):
"""
Get a STIX type from the given value: if a STIX ID is passed, the type
prefix is extracted; if string which is not a STIX ID is passed, it is
assumed to be a STIX type and is returned; otherwise it is assumed to be a
mapping with a "type" property, and the value of that property is returned.
:param value: A mapping with a "type" property, or a STIX ID or type
as a string
:return: A STIX type
"""
if isinstance(value, str):
if "--" in value:
type_ = get_type_from_id(value)
else:
type_ = value
else:
type_ = value["type"]
return type_
def is_sdo(value, stix_version=stix2.version.DEFAULT_VERSION):
"""
Determine whether the given object, type, or ID is/is for an SDO of the
given STIX version. If value is a type or ID, this just checks whether
the type was registered as an SDO in the given STIX version. If a mapping,
*simple* STIX version inference is additionally done on the value, and the
result is checked against stix_version. It does not attempt to fully
validate the value.
:param value: A mapping with a "type" property, or a STIX ID or type
as a string
:param stix_version: A STIX version as a string
:return: True if the type of the given value is an SDO type of the given
version; False if not
"""
result = True
if isinstance(value, collections.abc.Mapping):
value_stix_version = detect_spec_version(value)
if value_stix_version != stix_version:
result = False
if result:
cls_maps = mappings.STIX2_OBJ_MAPS[stix_version]
type_ = _stix_type_of(value)
result = type_ in cls_maps["objects"] and type_ not in {
"relationship", "sighting", "marking-definition", "bundle",
"language-content",
}
return result
def is_sco(value, stix_version=stix2.version.DEFAULT_VERSION):
"""
Determine whether the given object, type, or ID is/is for an SCO of the
given STIX version. If value is a type or ID, this just checks whether
the type was registered as an SCO in the given STIX version. If a mapping,
*simple* STIX version inference is additionally done on the value, and the
result is checked against stix_version. It does not attempt to fully
validate the value.
:param value: A mapping with a "type" property, or a STIX ID or type
as a string
:param stix_version: A STIX version as a string
:return: True if the type of the given value is an SCO type of the given
version; False if not
"""
result = True
if isinstance(value, collections.abc.Mapping):
value_stix_version = detect_spec_version(value)
if value_stix_version != stix_version:
result = False
if result:
cls_maps = mappings.STIX2_OBJ_MAPS[stix_version]
type_ = _stix_type_of(value)
result = type_ in cls_maps["observables"]
return result
def is_sro(value, stix_version=stix2.version.DEFAULT_VERSION):
"""
Determine whether the given object, type, or ID is/is for an SRO of the
given STIX version. If value is a type or ID, this just checks whether
the type is "sighting" or "relationship". If a mapping, *simple* STIX
version inference is additionally done on the value, and the result is
checked against stix_version. It does not attempt to fully validate the
value.
:param value: A mapping with a "type" property, or a STIX ID or type
as a string
:param stix_version: A STIX version as a string
:return: True if the type of the given value is an SRO type of the given
version; False if not
"""
result = True
if isinstance(value, collections.abc.Mapping):
value_stix_version = detect_spec_version(value)
if value_stix_version != stix_version:
result = False
if result:
# No need to check registration in this case
type_ = _stix_type_of(value)
result = type_ in ("sighting", "relationship")
return result
def is_object(value, stix_version=stix2.version.DEFAULT_VERSION):
"""
Determine whether an object, type, or ID is/is for any STIX object. This
includes all SDOs, SCOs, meta-objects, and bundle. If value is a type or
ID, this just checks whether the type was registered in the given STIX
version. If a mapping, *simple* STIX version inference is additionally
done on the value, and the result is checked against stix_version. It does
not attempt to fully validate the value.
:param value: A mapping with a "type" property, or a STIX ID or type
as a string
:param stix_version: A STIX version as a string
:return: True if the type of the given value is a valid STIX type with
respect to the given STIX version; False if not
"""
result = True
if isinstance(value, collections.abc.Mapping):
value_stix_version = detect_spec_version(value)
if value_stix_version != stix_version:
result = False
if result:
cls_maps = mappings.STIX2_OBJ_MAPS[stix_version]
type_ = _stix_type_of(value)
result = type_ in cls_maps["observables"] \
or type_ in cls_maps["objects"]
return result
def is_marking(value, stix_version=stix2.version.DEFAULT_VERSION):
"""
Determine whether the given object, type, or ID is/is for an marking
definition of the given STIX version. If value is a type or ID, this just
checks whether the type is "marking-definition". If a mapping, *simple*
STIX version inference is additionally done on the value, and the result
is checked against stix_version. It does not attempt to fully validate the
value.
:param value: A STIX object, object ID, or type as a string.
:param stix_version: A STIX version as a string
:return: True if the value is/is for a marking definition, False otherwise.
"""
result = True
if isinstance(value, collections.abc.Mapping):
value_stix_version = detect_spec_version(value)
if value_stix_version != stix_version:
result = False
if result:
# No need to check registration in this case
type_ = _stix_type_of(value)
result = type_ == "marking-definition"
return result
class STIXTypeClass(enum.Enum):
"""
Represents different classes of STIX type.
"""
SDO = 0
SCO = 1
SRO = 2
def is_stix_type(value, stix_version=stix2.version.DEFAULT_VERSION, *types):
"""
Determine whether the type of the given value satisfies the given
constraints. 'types' must contain STIX types as strings, and/or the
STIXTypeClass enum values. STIX types imply an exact match constraint;
STIXTypeClass enum values imply a more general constraint, that the object
or type be in that class of STIX type. These constraints are implicitly
OR'd together.
:param value: A mapping with a "type" property, or a STIX ID or type
as a string
:param stix_version: A STIX version as a string
:param types: A sequence of STIX type strings or STIXTypeClass enum values
:return: True if the object or type satisfies the constraints; False if not
"""
for type_ in types:
if type_ is STIXTypeClass.SDO:
result = is_sdo(value, stix_version)
elif type_ is STIXTypeClass.SCO:
result = is_sco(value, stix_version)
elif type_ is STIXTypeClass.SRO:
result = is_sro(value, stix_version)
else:
# Assume a string STIX type is given instead of a class enum,
# and just check for exact match.
obj_type = _stix_type_of(value)
result = obj_type == type_ and is_object(value, stix_version)
if result:
break
else:
result = False
return result

View File

@ -1 +1,3 @@
__version__ = "2.1.0"
DEFAULT_VERSION = '2.1' # Default version will always be the latest STIX 2.X version

View File

@ -1,19 +1,21 @@
"""STIX2 core versioning methods."""
from collections.abc import Mapping
import copy
import datetime as dt
import itertools
import uuid
import six
from six.moves.collections_abc import Mapping
import stix2.base
from stix2.utils import get_timestamp, parse_into_datetime
import stix2.registry
from stix2.utils import (
detect_spec_version, get_timestamp, is_sco, parse_into_datetime,
)
import stix2.v20
from .exceptions import (
InvalidValueError, RevokeError, UnmodifiablePropertyError,
InvalidValueError, ObjectNotVersionableError, RevokeError,
TypeNotVersionableError, UnmodifiablePropertyError,
)
# STIX object properties that cannot be modified
@ -54,77 +56,131 @@ def _fudge_modified(old_modified, new_modified, use_stix21):
return new_modified
def _is_versionable(data):
def _get_stix_version(data):
"""
Determine whether the given object is versionable. This check is done on
the basis of support for three properties for the object type: "created",
"modified", and "revoked". If all three are supported, the object is
versionable; otherwise it is not. Dicts must have a "type" property whose
value is for a registered object type. This is used to determine a
Bit of factored out functionality for getting/detecting the STIX version
of the given value.
:param data: An object, e.g. _STIXBase instance or dict
:return: The STIX version as a string in "X.Y" notation, or None if the
version could not be determined.
"""
stix_version = None
if isinstance(data, Mapping):
# First, determine spec version. It's easy for our stix2 objects; more
# work for dicts.
if isinstance(data, stix2.v20._STIXBase20):
stix_version = "2.0"
elif isinstance(data, stix2.v21._STIXBase21):
stix_version = "2.1"
elif isinstance(data, dict):
stix_version = detect_spec_version(data)
return stix_version
def _is_versionable_type(data):
"""
Determine whether type of the given object is versionable. This check is
done on the basis of support for three properties for the object type:
"created", "modified", and "revoked". If all three are supported, the
object type is versionable; otherwise it is not. Dicts must have a "type"
property. This is used in STIX version detection and to determine a
complete set of supported properties for the type.
If a dict is passed whose "type" is unregistered, then this library has no
knowledge of the type. It can't determine what properties are "supported".
This function will be lax and treat the type as versionable.
Note that this support check is not sufficient for creating a new object
version. Support for the versioning properties does not mean that
sufficient properties are actually present on the object.
Also, detect whether it represents a STIX 2.1 or greater spec version.
:param data: The object to check. Must be either a stix object, or a dict
with a "type" property.
:return: A 2-tuple of bools: the first is True if the object is versionable
and False if not; the second is True if the object is STIX 2.1+ and
False if not.
:return: A 2-tuple: the first element is True if the object is versionable
and False if not; the second is the STIX version as a string in "X.Y"
notation.
"""
is_versionable = False
is_21 = False
stix_vid = None
stix_version = None
if isinstance(data, Mapping):
# First, determine spec version. It's easy for our stix2 objects; more
# work for dicts.
is_21 = False
if isinstance(data, stix2.base._STIXBase) and \
not isinstance(data, stix2.v20._STIXBase20):
# (is_21 means 2.1 or later; try not to be 2.1-specific)
is_21 = True
elif isinstance(data, dict):
stix_vid = stix2.parsing._detect_spec_version(data)
is_21 = stix_vid != "v20"
# First, determine spec version
stix_version = _get_stix_version(data)
# Then, determine versionability.
if isinstance(data, stix2.base._STIXBase):
is_versionable = _VERSIONING_PROPERTIES.issubset(
data._properties,
)
if six.PY2:
# dumb python2 compatibility: map.keys() returns a list, not a set!
# six.viewkeys() compatibility function uses dict.viewkeys() on
# python2, which is not a Mapping mixin method, so that doesn't
# work either (for our stix2 objects).
keys = set(data)
else:
keys = data.keys()
# This should be sufficient for STIX objects; maybe we get lucky with
# dicts here but probably not.
if keys >= _VERSIONING_PROPERTIES:
is_versionable = True
# Tougher to handle dicts. We need to consider STIX version, map to a
# registered class, and from that get a more complete picture of its
# properties.
elif isinstance(data, dict):
class_maps = stix2.parsing.STIX2_OBJ_MAPS[stix_vid]
obj_type = data["type"]
# Tougher to handle dicts. We need to consider STIX version,
# map to a registered class, and from that get a more complete
# picture of its properties.
if obj_type in class_maps["objects"]:
# Should we bother checking properties for SDOs/SROs?
# They were designed to be versionable.
is_versionable = True
elif obj_type in class_maps["observables"]:
# but do check SCOs
cls = class_maps["observables"][obj_type]
cls = stix2.registry.class_for_type(data.get("type"), stix_version)
if cls:
is_versionable = _VERSIONING_PROPERTIES.issubset(
cls._properties,
)
return is_versionable, is_21
else:
# The type is not registered, so we have no knowledge of
# what properties are supported. Let's be lax and let them
# version it.
is_versionable = True
return is_versionable, stix_version
def _check_versionable_object(data):
"""
Determine whether there are or may be sufficient properties present on
an object to allow versioning. Raises an exception if the object can't be
versioned.
Also detect STIX spec version.
:param data: The object to check, e.g. dict with a "type" property, or
_STIXBase instance
:return: True if the object is STIX 2.1+, or False if not
:raises TypeNotVersionableError: If the object didn't have the versioning
properties and the type was found to not support them
:raises ObjectNotVersionableError: If the type was found to support
versioning but there were insufficient properties on the object
"""
if isinstance(data, Mapping):
if data.keys() >= _VERSIONING_PROPERTIES:
# If the properties all already exist in the object, assume they
# are either supported by the type, or are custom properties, and
# allow versioning.
stix_version = _get_stix_version(data)
else:
is_versionable_type, stix_version = _is_versionable_type(data)
if is_versionable_type:
# The type supports the versioning properties (or we don't
# recognize it and just assume it does). The question shifts
# to whether the object has sufficient properties to create a
# new version. Just require "created" for now. We need at
# least that as a starting point for new version timestamps.
is_versionable = "created" in data
if not is_versionable:
raise ObjectNotVersionableError(data)
else:
raise TypeNotVersionableError(data)
else:
raise TypeNotVersionableError(data)
return stix_version
def new_version(data, allow_custom=None, **kwargs):
@ -143,13 +199,7 @@ def new_version(data, allow_custom=None, **kwargs):
:return: The new object.
"""
is_versionable, is_21 = _is_versionable(data)
if not is_versionable:
raise ValueError(
"cannot create new version of object of this type! "
"Try a dictionary or instance of an SDO or SRO class.",
)
stix_version = _check_versionable_object(data)
if data.get('revoked'):
raise RevokeError("new_version")
@ -164,10 +214,17 @@ def new_version(data, allow_custom=None, **kwargs):
# probably were). That would imply an ID change, which is not allowed
# across versions.
sco_locked_props = []
if is_21 and isinstance(data, stix2.base._Observable):
if is_sco(data, "2.1"):
uuid_ = uuid.UUID(data["id"][-36:])
if uuid_.variant == uuid.RFC_4122 and uuid_.version == 5:
sco_locked_props = data._id_contributing_properties
if isinstance(data, stix2.base._Observable):
cls = data.__class__
else:
cls = stix2.registry.class_for_type(
data["type"], stix_version, "observables",
)
sco_locked_props = cls._id_contributing_properties
unchangable_properties = set()
for prop in itertools.chain(STIX_UNMOD_PROPERTIES, sco_locked_props):
@ -178,36 +235,36 @@ def new_version(data, allow_custom=None, **kwargs):
# Different versioning precision rules in STIX 2.0 vs 2.1, so we need
# to know which rules to apply.
precision_constraint = "min" if is_21 else "exact"
precision_constraint = "min" if stix_version == "2.1" else "exact"
old_modified = data.get("modified") or data.get("created")
old_modified = parse_into_datetime(
old_modified, precision="millisecond",
precision_constraint=precision_constraint,
)
cls = type(data)
if 'modified' not in kwargs:
old_modified = parse_into_datetime(
data["modified"], precision="millisecond",
precision_constraint=precision_constraint,
)
new_modified = get_timestamp()
new_modified = _fudge_modified(old_modified, new_modified, is_21)
kwargs['modified'] = new_modified
elif 'modified' in data:
old_modified_property = parse_into_datetime(
data.get('modified'), precision='millisecond',
precision_constraint=precision_constraint,
)
new_modified_property = parse_into_datetime(
if 'modified' in kwargs:
new_modified = parse_into_datetime(
kwargs['modified'], precision='millisecond',
precision_constraint=precision_constraint,
)
if new_modified_property <= old_modified_property:
if new_modified <= old_modified:
raise InvalidValueError(
cls, 'modified',
"The new modified datetime cannot be before than or equal to the current modified datetime."
"It cannot be equal, as according to STIX 2 specification, objects that are different "
"but have the same id and modified timestamp do not have defined consumer behavior.",
)
else:
new_modified = get_timestamp()
new_modified = _fudge_modified(
old_modified, new_modified, stix_version != "2.0",
)
kwargs['modified'] = new_modified
new_obj_inner.update(kwargs)
# Set allow_custom appropriately if versioning an object. We will ignore

View File

@ -22,8 +22,6 @@
import functools
import stix2
from . import AttackPattern as _AttackPattern
from . import Campaign as _Campaign
from . import CourseOfAction as _CourseOfAction
@ -36,12 +34,14 @@ from . import Location as _Location
from . import Malware as _Malware
from . import MalwareAnalysis as _MalwareAnalysis
from . import Note as _Note
from . import OBJ_MAP
from . import ObservedData as _ObservedData
from . import Opinion as _Opinion
from . import Report as _Report
from . import ThreatActor as _ThreatActor
from . import Tool as _Tool
from . import Vulnerability as _Vulnerability
from .version import DEFAULT_VERSION
from . import ( # noqa: F401 isort:skip
AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem,
@ -64,7 +64,7 @@ from .datastore.filters import FilterSet # isort:skip
# Enable some adaptation to the current default supported STIX version.
_STIX_VID = "v" + stix2.DEFAULT_VERSION.replace(".", "")
_STIX_VID = "v" + DEFAULT_VERSION.replace(".", "")
# Use an implicit MemoryStore
@ -161,10 +161,17 @@ def _setup_workbench():
new_class = type(obj_type.__name__, (obj_type,), new_class_dict)
factory_func = functools.partial(_environ.create, new_class)
# Copy over some class attributes that other code expects to find
factory_func._type = obj_type._type
factory_func._properties = obj_type._properties
if hasattr(obj_type, "_id_contributing_properties"):
factory_func._id_contributing_properties = \
obj_type._id_contributing_properties
# Add our new "class" to this module's globals and to the library-wide
# mapping. This allows parse() to use the wrapped classes.
globals()[obj_type.__name__] = factory_func
stix2.OBJ_MAP[obj_type._type] = factory_func
OBJ_MAP[obj_type._type] = factory_func
_setup_workbench()