Merge pull request #481 from chisholm/parsing_refactoring

Refactor stix2.parsing into more focused modules
pull/1/head
Chris Lenk 2021-01-20 10:47:36 -05:00 committed by GitHub
commit 03b3423cbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 312 additions and 302 deletions

View File

@ -24,8 +24,6 @@
# flake8: noqa # flake8: noqa
DEFAULT_VERSION = '2.1' # Default version will always be the latest STIX 2.X version
from .confidence import scales from .confidence import scales
from .datastore import CompositeDataSource from .datastore import CompositeDataSource
from .datastore.filesystem import ( from .datastore.filesystem import (
@ -41,7 +39,7 @@ from .markings import (
add_markings, clear_markings, get_markings, is_marked, remove_markings, add_markings, clear_markings, get_markings, is_marked, remove_markings,
set_markings, set_markings,
) )
from .parsing import _collect_stix2_mappings, parse, parse_observable from .parsing import parse, parse_observable
from .patterns import ( from .patterns import (
AndBooleanExpression, AndObservationExpression, BasicObjectPathComponent, AndBooleanExpression, AndObservationExpression, BasicObjectPathComponent,
BinaryConstant, BooleanConstant, EqualityComparisonExpression, BinaryConstant, BooleanConstant, EqualityComparisonExpression,
@ -57,8 +55,9 @@ from .patterns import (
RepeatQualifier, StartStopQualifier, StringConstant, TimestampConstant, RepeatQualifier, StartStopQualifier, StringConstant, TimestampConstant,
WithinQualifier, WithinQualifier,
) )
from .registry import _collect_stix2_mappings
from .v21 import * # This import will always be the latest STIX 2.X version from .v21 import * # This import will always be the latest STIX 2.X version
from .version import __version__ from .version import DEFAULT_VERSION, __version__
from .versioning import new_version, revoke from .versioning import new_version, revoke
_collect_stix2_mappings() _collect_stix2_mappings()

View File

@ -3,7 +3,7 @@ from collections import OrderedDict
import six import six
from .base import _cls_init from .base import _cls_init
from .parsing import ( from .registration import (
_register_marking, _register_object, _register_observable, _register_marking, _register_object, _register_observable,
_register_observable_extension, _register_observable_extension,
) )

View File

@ -9,18 +9,14 @@
| |
""" """
import stix2 from ... import pattern_visitor
from stix2.equivalence.pattern.compare.observation import ( from ...version import DEFAULT_VERSION
observation_expression_cmp, from .compare.observation import observation_expression_cmp
) from .transform import ChainTransformer, SettleTransformer
from stix2.equivalence.pattern.transform import ( from .transform.observation import (
ChainTransformer, SettleTransformer,
)
from stix2.equivalence.pattern.transform.observation import (
AbsorptionTransformer, CanonicalizeComparisonExpressionsTransformer, AbsorptionTransformer, CanonicalizeComparisonExpressionsTransformer,
DNFTransformer, FlattenTransformer, OrderDedupeTransformer, DNFTransformer, FlattenTransformer, OrderDedupeTransformer,
) )
import stix2.pattern_visitor
# Lazy-initialize # Lazy-initialize
_pattern_canonicalizer = None _pattern_canonicalizer = None
@ -61,7 +57,7 @@ def _get_pattern_canonicalizer():
return _pattern_canonicalizer return _pattern_canonicalizer
def equivalent_patterns(pattern1, pattern2, stix_version=stix2.DEFAULT_VERSION): def equivalent_patterns(pattern1, pattern2, stix_version=DEFAULT_VERSION):
""" """
Determine whether two STIX patterns are semantically equivalent. Determine whether two STIX patterns are semantically equivalent.
@ -74,10 +70,10 @@ def equivalent_patterns(pattern1, pattern2, stix_version=stix2.DEFAULT_VERSION):
Returns: Returns:
True if the patterns are semantically equivalent; False if not True if the patterns are semantically equivalent; False if not
""" """
patt_ast1 = stix2.pattern_visitor.create_pattern_object( patt_ast1 = pattern_visitor.create_pattern_object(
pattern1, version=stix_version, pattern1, version=stix_version,
) )
patt_ast2 = stix2.pattern_visitor.create_pattern_object( patt_ast2 = pattern_visitor.create_pattern_object(
pattern2, version=stix_version, pattern2, version=stix_version,
) )
@ -91,7 +87,7 @@ def equivalent_patterns(pattern1, pattern2, stix_version=stix2.DEFAULT_VERSION):
def find_equivalent_patterns( def find_equivalent_patterns(
search_pattern, patterns, stix_version=stix2.DEFAULT_VERSION, search_pattern, patterns, stix_version=DEFAULT_VERSION,
): ):
""" """
Find patterns from a sequence which are equivalent to a given pattern. Find patterns from a sequence which are equivalent to a given pattern.
@ -109,7 +105,7 @@ def find_equivalent_patterns(
Returns: Returns:
A generator iterator producing the semantically equivalent patterns A generator iterator producing the semantically equivalent patterns
""" """
search_pattern_ast = stix2.pattern_visitor.create_pattern_object( search_pattern_ast = pattern_visitor.create_pattern_object(
search_pattern, version=stix_version, search_pattern, version=stix_version,
) )
@ -119,7 +115,7 @@ def find_equivalent_patterns(
) )
for pattern in patterns: for pattern in patterns:
pattern_ast = stix2.pattern_visitor.create_pattern_object( pattern_ast = pattern_visitor.create_pattern_object(
pattern, version=stix_version, pattern, version=stix_version,
) )
canon_pattern_ast, _ = pattern_canonicalizer.transform(pattern_ast) canon_pattern_ast, _ = pattern_canonicalizer.transform(pattern_ast)

View File

@ -1,17 +1,10 @@
"""STIX2 Core parsing methods.""" """STIX2 Core parsing methods."""
import copy import copy
import importlib
import pkgutil
import re
import stix2 from . import registry
from .exceptions import ParseError
from .base import _DomainObject, _Observable from .utils import _get_dict
from .exceptions import DuplicateRegistrationError, ParseError
from .utils import PREFIX_21_REGEX, _get_dict, get_class_hierarchy_names
STIX2_OBJ_MAPS = {}
def parse(data, allow_custom=False, version=None): def parse(data, allow_custom=False, version=None):
@ -79,7 +72,7 @@ def _detect_spec_version(stix_dict):
_detect_spec_version(obj) for obj in stix_dict["objects"] _detect_spec_version(obj) for obj in stix_dict["objects"]
), ),
) )
elif obj_type in STIX2_OBJ_MAPS["v21"]["observables"]: elif obj_type in registry.STIX2_OBJ_MAPS["v21"]["observables"]:
# Non-bundle object with an ID and without spec_version. Could be a # Non-bundle object with an ID and without spec_version. Could be a
# 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO... # 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO...
v = "v21" v = "v21"
@ -128,7 +121,10 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None):
else: else:
v = _detect_spec_version(stix_dict) v = _detect_spec_version(stix_dict)
OBJ_MAP = dict(STIX2_OBJ_MAPS[v]['objects'], **STIX2_OBJ_MAPS[v]['observables']) OBJ_MAP = dict(
registry.STIX2_OBJ_MAPS[v]['objects'],
**registry.STIX2_OBJ_MAPS[v]['observables']
)
try: try:
obj_class = OBJ_MAP[stix_dict['type']] obj_class = OBJ_MAP[stix_dict['type']]
@ -179,7 +175,7 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
v = _detect_spec_version(obj) v = _detect_spec_version(obj)
try: try:
OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables'] OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[v]['observables']
obj_class = OBJ_MAP_OBSERVABLE[obj['type']] obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError: except KeyError:
if allow_custom: if allow_custom:
@ -192,226 +188,3 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
) )
return obj_class(allow_custom=allow_custom, **obj) return obj_class(allow_custom=allow_custom, **obj)
def _register_object(new_type, version=stix2.DEFAULT_VERSION):
"""Register a custom STIX Object type.
Args:
new_type (class): A class to register in the Object map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Raises:
ValueError: If the class being registered wasn't created with the
@CustomObject decorator.
DuplicateRegistrationError: If the class has already been registered.
"""
if not issubclass(new_type, _DomainObject):
raise ValueError(
"'%s' must be created with the @CustomObject decorator." %
new_type.__name__,
)
properties = new_type._properties
if version == "2.1":
for prop_name, prop in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character" % prop_name)
if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
OBJ_MAP = STIX2_OBJ_MAPS[v]['objects']
if new_type._type in OBJ_MAP.keys():
raise DuplicateRegistrationError("STIX Object", new_type._type)
OBJ_MAP[new_type._type] = new_type
def _register_marking(new_marking, version=stix2.DEFAULT_VERSION):
"""Register a custom STIX Marking Definition type.
Args:
new_marking (class): A class to register in the Marking map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
mark_type = new_marking._type
properties = new_marking._properties
stix2.properties._validate_type(mark_type, version)
if version == "2.1":
for prop_name, prop_value in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
OBJ_MAP_MARKING = STIX2_OBJ_MAPS[v]['markings']
if mark_type in OBJ_MAP_MARKING.keys():
raise DuplicateRegistrationError("STIX Marking", mark_type)
OBJ_MAP_MARKING[mark_type] = new_marking
def _register_observable(new_observable, version=stix2.DEFAULT_VERSION):
"""Register a custom STIX Cyber Observable type.
Args:
new_observable (class): A class to register in the Observables map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
properties = new_observable._properties
if version == "2.0":
# If using STIX2.0, check properties ending in "_ref/s" are ObjectReferenceProperties
for prop_name, prop in properties.items():
if prop_name.endswith('_ref') and ('ObjectReferenceProperty' not in get_class_hierarchy_names(prop)):
raise ValueError(
"'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty." % prop_name,
)
elif (
prop_name.endswith('_refs') and (
'ListProperty' not in get_class_hierarchy_names(prop) or
'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained)
)
):
raise ValueError(
"'%s' is named like an object reference list property but "
"is not a ListProperty containing ObjectReferenceProperty." % prop_name,
)
else:
# If using STIX2.1 (or newer...), check properties ending in "_ref/s" are ReferenceProperties
for prop_name, prop in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
elif prop_name.endswith('_ref') and ('ReferenceProperty' not in get_class_hierarchy_names(prop)):
raise ValueError(
"'%s' is named like a reference property but "
"is not a ReferenceProperty." % prop_name,
)
elif (
prop_name.endswith('_refs') and (
'ListProperty' not in get_class_hierarchy_names(prop) or
'ReferenceProperty' not in get_class_hierarchy_names(prop.contained)
)
):
raise ValueError(
"'%s' is named like a reference list property but "
"is not a ListProperty containing ReferenceProperty." % prop_name,
)
if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables']
if new_observable._type in OBJ_MAP_OBSERVABLE.keys():
raise DuplicateRegistrationError("Cyber Observable", new_observable._type)
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
def _register_observable_extension(
observable, new_extension, version=stix2.DEFAULT_VERSION,
):
"""Register a custom extension to a STIX Cyber Observable type.
Args:
observable: An observable class or instance
new_extension (class): A class to register in the Observables
Extensions map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1").
Defaults to the latest supported version.
"""
obs_class = observable if isinstance(observable, type) else \
type(observable)
ext_type = new_extension._type
properties = new_extension._properties
if not issubclass(obs_class, _Observable):
raise ValueError("'observable' must be a valid Observable class!")
stix2.properties._validate_type(ext_type, version)
if not new_extension._properties:
raise ValueError(
"Invalid extension: must define at least one property: " +
ext_type,
)
if version == "2.1":
if not ext_type.endswith('-ext'):
raise ValueError(
"Invalid extension type name '%s': must end with '-ext'." %
ext_type,
)
for prop_name, prop_value in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
v = 'v' + version.replace('.', '')
try:
observable_type = observable._type
except AttributeError:
raise ValueError(
"Unknown observable type. Custom observables must be "
"created with the @CustomObservable decorator.",
)
OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables']
EXT_MAP = STIX2_OBJ_MAPS[v]['observable-extensions']
try:
if ext_type in EXT_MAP[observable_type].keys():
raise DuplicateRegistrationError("Observable Extension", ext_type)
EXT_MAP[observable_type][ext_type] = new_extension
except KeyError:
if observable_type not in OBJ_MAP_OBSERVABLE:
raise ValueError(
"Unknown observable type '%s'. Custom observables "
"must be created with the @CustomObservable decorator."
% observable_type,
)
else:
EXT_MAP[observable_type] = {ext_type: new_extension}
def _collect_stix2_mappings():
"""Navigate the package once and retrieve all object mapping dicts for each
v2X package. Includes OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP."""
if not STIX2_OBJ_MAPS:
top_level_module = importlib.import_module('stix2')
path = top_level_module.__path__
prefix = str(top_level_module.__name__) + '.'
for module_loader, name, is_pkg in pkgutil.walk_packages(path=path, prefix=prefix):
ver = name.split('.')[1]
if re.match(r'^stix2\.v2[0-9]$', name) and is_pkg:
mod = importlib.import_module(name, str(top_level_module.__name__))
STIX2_OBJ_MAPS[ver] = {}
STIX2_OBJ_MAPS[ver]['objects'] = mod.OBJ_MAP
STIX2_OBJ_MAPS[ver]['observables'] = mod.OBJ_MAP_OBSERVABLE
STIX2_OBJ_MAPS[ver]['observable-extensions'] = mod.EXT_MAP
elif re.match(r'^stix2\.v2[0-9]\.common$', name) and is_pkg is False:
mod = importlib.import_module(name, str(top_level_module.__name__))
STIX2_OBJ_MAPS[ver]['markings'] = mod.OBJ_MAP_MARKING

View File

@ -17,10 +17,9 @@ from stix2patterns.v21.grammars.STIXPatternVisitor import \
STIXPatternVisitor as STIXPatternVisitor21 STIXPatternVisitor as STIXPatternVisitor21
from stix2patterns.v21.pattern import Pattern as Pattern21 from stix2patterns.v21.pattern import Pattern as Pattern21
import stix2
from .patterns import * from .patterns import *
from .patterns import _BooleanExpression from .patterns import _BooleanExpression
from .version import DEFAULT_VERSION
# flake8: noqa F405 # flake8: noqa F405
@ -391,7 +390,7 @@ class STIXPatternVisitorForSTIX20(STIXPatternVisitorForSTIX2, STIXPatternVisitor
super(STIXPatternVisitor20, self).__init__() super(STIXPatternVisitor20, self).__init__()
def create_pattern_object(pattern, module_suffix="", module_name="", version=stix2.DEFAULT_VERSION): def create_pattern_object(pattern, module_suffix="", module_name="", version=DEFAULT_VERSION):
""" """
Create a STIX pattern AST from a pattern string. Create a STIX pattern AST from a pattern string.
""" """

View File

@ -9,15 +9,15 @@ import uuid
from six import string_types, text_type from six import string_types, text_type
import stix2
from .base import _STIXBase from .base import _STIXBase
from .exceptions import ( from .exceptions import (
CustomContentError, DictionaryKeyError, MissingPropertiesError, CustomContentError, DictionaryKeyError, MissingPropertiesError,
MutuallyExclusivePropertiesError, STIXError, MutuallyExclusivePropertiesError, STIXError,
) )
from .parsing import STIX2_OBJ_MAPS, parse, parse_observable from .parsing import parse, parse_observable
from .registry import STIX2_OBJ_MAPS
from .utils import _get_dict, get_class_hierarchy_names, parse_into_datetime from .utils import _get_dict, get_class_hierarchy_names, parse_into_datetime
from .version import DEFAULT_VERSION
try: try:
from collections.abc import Mapping from collections.abc import Mapping
@ -275,7 +275,7 @@ class StringProperty(Property):
class TypeProperty(Property): class TypeProperty(Property):
def __init__(self, type, spec_version=stix2.DEFAULT_VERSION): def __init__(self, type, spec_version=DEFAULT_VERSION):
_validate_type(type, spec_version) _validate_type(type, spec_version)
self.spec_version = spec_version self.spec_version = spec_version
super(TypeProperty, self).__init__(fixed=type) super(TypeProperty, self).__init__(fixed=type)
@ -283,7 +283,7 @@ class TypeProperty(Property):
class IDProperty(Property): class IDProperty(Property):
def __init__(self, type, spec_version=stix2.DEFAULT_VERSION): def __init__(self, type, spec_version=DEFAULT_VERSION):
self.required_prefix = type + "--" self.required_prefix = type + "--"
self.spec_version = spec_version self.spec_version = spec_version
super(IDProperty, self).__init__() super(IDProperty, self).__init__()
@ -382,7 +382,7 @@ class TimestampProperty(Property):
class DictionaryProperty(Property): class DictionaryProperty(Property):
def __init__(self, spec_version=stix2.DEFAULT_VERSION, **kwargs): def __init__(self, spec_version=DEFAULT_VERSION, **kwargs):
self.spec_version = spec_version self.spec_version = spec_version
super(DictionaryProperty, self).__init__(**kwargs) super(DictionaryProperty, self).__init__(**kwargs)
@ -471,7 +471,7 @@ class HexProperty(Property):
class ReferenceProperty(Property): class ReferenceProperty(Property):
def __init__(self, valid_types=None, invalid_types=None, spec_version=stix2.DEFAULT_VERSION, **kwargs): def __init__(self, valid_types=None, invalid_types=None, spec_version=DEFAULT_VERSION, **kwargs):
""" """
references sometimes must be to a specific object type references sometimes must be to a specific object type
""" """
@ -605,7 +605,7 @@ class ObservableProperty(Property):
"""Property for holding Cyber Observable Objects. """Property for holding Cyber Observable Objects.
""" """
def __init__(self, spec_version=stix2.DEFAULT_VERSION, allow_custom=False, *args, **kwargs): def __init__(self, spec_version=DEFAULT_VERSION, allow_custom=False, *args, **kwargs):
self.allow_custom = allow_custom self.allow_custom = allow_custom
self.spec_version = spec_version self.spec_version = spec_version
super(ObservableProperty, self).__init__(*args, **kwargs) super(ObservableProperty, self).__init__(*args, **kwargs)
@ -640,7 +640,7 @@ class ExtensionsProperty(DictionaryProperty):
"""Property for representing extensions on Observable objects. """Property for representing extensions on Observable objects.
""" """
def __init__(self, spec_version=stix2.DEFAULT_VERSION, allow_custom=False, enclosing_type=None, required=False): def __init__(self, spec_version=DEFAULT_VERSION, allow_custom=False, enclosing_type=None, required=False):
self.allow_custom = allow_custom self.allow_custom = allow_custom
self.enclosing_type = enclosing_type self.enclosing_type = enclosing_type
super(ExtensionsProperty, self).__init__(spec_version=spec_version, required=required) super(ExtensionsProperty, self).__init__(spec_version=spec_version, required=required)
@ -682,7 +682,7 @@ class ExtensionsProperty(DictionaryProperty):
class STIXObjectProperty(Property): class STIXObjectProperty(Property):
def __init__(self, spec_version=stix2.DEFAULT_VERSION, allow_custom=False, *args, **kwargs): def __init__(self, spec_version=DEFAULT_VERSION, allow_custom=False, *args, **kwargs):
self.allow_custom = allow_custom self.allow_custom = allow_custom
self.spec_version = spec_version self.spec_version = spec_version
super(STIXObjectProperty, self).__init__(*args, **kwargs) super(STIXObjectProperty, self).__init__(*args, **kwargs)

210
stix2/registration.py Normal file
View File

@ -0,0 +1,210 @@
import re
from . import registry
from .base import _DomainObject, _Observable
from .exceptions import DuplicateRegistrationError
from .properties import _validate_type
from .utils import PREFIX_21_REGEX, get_class_hierarchy_names
from .version import DEFAULT_VERSION
def _register_object(new_type, version=DEFAULT_VERSION):
"""Register a custom STIX Object type.
Args:
new_type (class): A class to register in the Object map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Raises:
ValueError: If the class being registered wasn't created with the
@CustomObject decorator.
DuplicateRegistrationError: If the class has already been registered.
"""
if not issubclass(new_type, _DomainObject):
raise ValueError(
"'%s' must be created with the @CustomObject decorator." %
new_type.__name__,
)
properties = new_type._properties
if version == "2.1":
for prop_name, prop in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character" % prop_name)
if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + DEFAULT_VERSION.replace('.', '')
OBJ_MAP = registry.STIX2_OBJ_MAPS[v]['objects']
if new_type._type in OBJ_MAP.keys():
raise DuplicateRegistrationError("STIX Object", new_type._type)
OBJ_MAP[new_type._type] = new_type
def _register_marking(new_marking, version=DEFAULT_VERSION):
"""Register a custom STIX Marking Definition type.
Args:
new_marking (class): A class to register in the Marking map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
mark_type = new_marking._type
properties = new_marking._properties
_validate_type(mark_type, version)
if version == "2.1":
for prop_name, prop_value in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + DEFAULT_VERSION.replace('.', '')
OBJ_MAP_MARKING = registry.STIX2_OBJ_MAPS[v]['markings']
if mark_type in OBJ_MAP_MARKING.keys():
raise DuplicateRegistrationError("STIX Marking", mark_type)
OBJ_MAP_MARKING[mark_type] = new_marking
def _register_observable(new_observable, version=DEFAULT_VERSION):
"""Register a custom STIX Cyber Observable type.
Args:
new_observable (class): A class to register in the Observables map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
properties = new_observable._properties
if version == "2.0":
# If using STIX2.0, check properties ending in "_ref/s" are ObjectReferenceProperties
for prop_name, prop in properties.items():
if prop_name.endswith('_ref') and ('ObjectReferenceProperty' not in get_class_hierarchy_names(prop)):
raise ValueError(
"'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty." % prop_name,
)
elif (
prop_name.endswith('_refs') and (
'ListProperty' not in get_class_hierarchy_names(prop) or
'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained)
)
):
raise ValueError(
"'%s' is named like an object reference list property but "
"is not a ListProperty containing ObjectReferenceProperty." % prop_name,
)
else:
# If using STIX2.1 (or newer...), check properties ending in "_ref/s" are ReferenceProperties
for prop_name, prop in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
elif prop_name.endswith('_ref') and ('ReferenceProperty' not in get_class_hierarchy_names(prop)):
raise ValueError(
"'%s' is named like a reference property but "
"is not a ReferenceProperty." % prop_name,
)
elif (
prop_name.endswith('_refs') and (
'ListProperty' not in get_class_hierarchy_names(prop) or
'ReferenceProperty' not in get_class_hierarchy_names(prop.contained)
)
):
raise ValueError(
"'%s' is named like a reference list property but "
"is not a ListProperty containing ReferenceProperty." % prop_name,
)
if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + DEFAULT_VERSION.replace('.', '')
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[v]['observables']
if new_observable._type in OBJ_MAP_OBSERVABLE.keys():
raise DuplicateRegistrationError("Cyber Observable", new_observable._type)
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
def _register_observable_extension(
observable, new_extension, version=DEFAULT_VERSION,
):
"""Register a custom extension to a STIX Cyber Observable type.
Args:
observable: An observable class or instance
new_extension (class): A class to register in the Observables
Extensions map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1").
Defaults to the latest supported version.
"""
obs_class = observable if isinstance(observable, type) else \
type(observable)
ext_type = new_extension._type
properties = new_extension._properties
if not issubclass(obs_class, _Observable):
raise ValueError("'observable' must be a valid Observable class!")
_validate_type(ext_type, version)
if not new_extension._properties:
raise ValueError(
"Invalid extension: must define at least one property: " +
ext_type,
)
if version == "2.1":
if not ext_type.endswith('-ext'):
raise ValueError(
"Invalid extension type name '%s': must end with '-ext'." %
ext_type,
)
for prop_name, prop_value in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)
v = 'v' + version.replace('.', '')
try:
observable_type = observable._type
except AttributeError:
raise ValueError(
"Unknown observable type. Custom observables must be "
"created with the @CustomObservable decorator.",
)
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[v]['observables']
EXT_MAP = registry.STIX2_OBJ_MAPS[v]['observable-extensions']
try:
if ext_type in EXT_MAP[observable_type].keys():
raise DuplicateRegistrationError("Observable Extension", ext_type)
EXT_MAP[observable_type][ext_type] = new_extension
except KeyError:
if observable_type not in OBJ_MAP_OBSERVABLE:
raise ValueError(
"Unknown observable type '%s'. Custom observables "
"must be created with the @CustomObservable decorator."
% observable_type,
)
else:
EXT_MAP[observable_type] = {ext_type: new_extension}

28
stix2/registry.py Normal file
View File

@ -0,0 +1,28 @@
import importlib
import pkgutil
import re
# Collects information on which classes implement which STIX types, for the
# various STIX spec versions.
STIX2_OBJ_MAPS = {}
def _collect_stix2_mappings():
"""Navigate the package once and retrieve all object mapping dicts for each
v2X package. Includes OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP."""
if not STIX2_OBJ_MAPS:
top_level_module = importlib.import_module('stix2')
path = top_level_module.__path__
prefix = str(top_level_module.__name__) + '.'
for module_loader, name, is_pkg in pkgutil.walk_packages(path=path, prefix=prefix):
ver = name.split('.')[1]
if re.match(r'^stix2\.v2[0-9]$', name) and is_pkg:
mod = importlib.import_module(name, str(top_level_module.__name__))
STIX2_OBJ_MAPS[ver] = {}
STIX2_OBJ_MAPS[ver]['objects'] = mod.OBJ_MAP
STIX2_OBJ_MAPS[ver]['observables'] = mod.OBJ_MAP_OBSERVABLE
STIX2_OBJ_MAPS[ver]['observable-extensions'] = mod.EXT_MAP
elif re.match(r'^stix2\.v2[0-9]\.common$', name) and is_pkg is False:
mod = importlib.import_module(name, str(top_level_module.__name__))
STIX2_OBJ_MAPS[ver]['markings'] = mod.OBJ_MAP_MARKING

View File

@ -1,7 +1,9 @@
import pytest import pytest
import stix2 import stix2
from stix2 import parsing import stix2.parsing
import stix2.registration
import stix2.registry
import stix2.v20 import stix2.v20
from ...exceptions import DuplicateRegistrationError, InvalidValueError from ...exceptions import DuplicateRegistrationError, InvalidValueError
@ -981,7 +983,7 @@ def test_register_custom_object():
_type = 'awesome-object' _type = 'awesome-object'
with pytest.raises(ValueError): with pytest.raises(ValueError):
stix2.parsing._register_object(CustomObject2, version="2.0") stix2.registration._register_object(CustomObject2, version="2.0")
def test_extension_property_location(): def test_extension_property_location():
@ -1041,10 +1043,10 @@ def test_register_custom_object_with_version():
"id": "x-new-type-2--00000000-0000-4000-8000-000000000007", "id": "x-new-type-2--00000000-0000-4000-8000-000000000007",
} }
cust_obj_1 = parsing.dict_to_stix2(custom_obj_1, version='2.0') cust_obj_1 = stix2.parsing.dict_to_stix2(custom_obj_1, version='2.0')
v = 'v20' v = 'v20'
assert cust_obj_1.type in parsing.STIX2_OBJ_MAPS[v]['objects'] assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS[v]['objects']
# spec_version is not in STIX 2.0, and is required in 2.1, so this # spec_version is not in STIX 2.0, and is required in 2.1, so this
# suffices as a test for a STIX 2.0 object. # suffices as a test for a STIX 2.0 object.
assert "spec_version" not in cust_obj_1 assert "spec_version" not in cust_obj_1
@ -1076,7 +1078,7 @@ def test_register_observable_with_version():
custom_obs = NewObservable2(property1="Test Observable") custom_obs = NewObservable2(property1="Test Observable")
v = 'v20' v = 'v20'
assert custom_obs.type in parsing.STIX2_OBJ_MAPS[v]['observables'] assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS[v]['observables']
def test_register_duplicate_observable_with_version(): def test_register_duplicate_observable_with_version():
@ -1102,7 +1104,7 @@ def test_register_marking_with_version():
v = 'v20' v = 'v20'
no = NewObj2(property1='something') no = NewObj2(property1='something')
assert no._type in parsing.STIX2_OBJ_MAPS[v]['markings'] assert no._type in stix2.registry.STIX2_OBJ_MAPS[v]['markings']
def test_register_observable_extension_with_version(): def test_register_observable_extension_with_version():
@ -1117,7 +1119,7 @@ def test_register_observable_extension_with_version():
v = 'v20' v = 'v20'
example = SomeCustomExtension2(keys='test123') example = SomeCustomExtension2(keys='test123')
assert example._type in parsing.STIX2_OBJ_MAPS[v]['observable-extensions']['user-account'] assert example._type in stix2.registry.STIX2_OBJ_MAPS[v]['observable-extensions']['user-account']
def test_register_duplicate_observable_extension(): def test_register_duplicate_observable_extension():

View File

@ -2,8 +2,7 @@ from collections import OrderedDict
import pytest import pytest
import stix2 from stix2 import DEFAULT_VERSION, exceptions, parsing, registration, registry
from stix2 import exceptions, parsing
BUNDLE = { BUNDLE = {
"type": "bundle", "type": "bundle",
@ -59,7 +58,7 @@ def test_parse_observable_with_version():
assert v in str(obs_obj.__class__) assert v in str(obs_obj.__class__)
@pytest.mark.xfail(reason="The default version is no longer 2.0", condition=stix2.DEFAULT_VERSION != "2.0") @pytest.mark.xfail(reason="The default version is no longer 2.0", condition=DEFAULT_VERSION != "2.0")
def test_parse_observable_with_no_version(): def test_parse_observable_with_no_version():
observable = {"type": "file", "name": "foo.exe"} observable = {"type": "file", "name": "foo.exe"}
obs_obj = parsing.parse_observable(observable) obs_obj = parsing.parse_observable(observable)
@ -73,8 +72,8 @@ def test_register_marking_with_version():
_type = 'x-new-marking1' _type = 'x-new-marking1'
_properties = OrderedDict() _properties = OrderedDict()
parsing._register_marking(NewMarking1, version='2.0') registration._register_marking(NewMarking1, version='2.0')
v = 'v20' v = 'v20'
assert NewMarking1._type in parsing.STIX2_OBJ_MAPS[v]['markings'] assert NewMarking1._type in registry.STIX2_OBJ_MAPS[v]['markings']
assert v in str(parsing.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type]) assert v in str(registry.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type])

View File

@ -4,6 +4,8 @@ import pytest
import stix2 import stix2
import stix2.base import stix2.base
import stix2.registration
import stix2.registry
import stix2.v21 import stix2.v21
from ...exceptions import DuplicateRegistrationError, InvalidValueError from ...exceptions import DuplicateRegistrationError, InvalidValueError
@ -1199,7 +1201,7 @@ def test_register_custom_object():
_type = 'awesome-object' _type = 'awesome-object'
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
stix2.parsing._register_object(CustomObject2, version="2.1") stix2.registration._register_object(CustomObject2, version="2.1")
assert '@CustomObject decorator' in str(excinfo) assert '@CustomObject decorator' in str(excinfo)
@ -1265,7 +1267,7 @@ def test_register_custom_object_with_version():
cust_obj_1 = stix2.parsing.dict_to_stix2(custom_obj_1, version='2.1') cust_obj_1 = stix2.parsing.dict_to_stix2(custom_obj_1, version='2.1')
v = 'v21' v = 'v21'
assert cust_obj_1.type in stix2.parsing.STIX2_OBJ_MAPS[v]['objects'] assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS[v]['objects']
assert cust_obj_1.spec_version == "2.1" assert cust_obj_1.spec_version == "2.1"
@ -1295,7 +1297,7 @@ def test_register_observable():
custom_obs = NewObservable3(property1="Test Observable") custom_obs = NewObservable3(property1="Test Observable")
v = 'v21' v = 'v21'
assert custom_obs.type in stix2.parsing.STIX2_OBJ_MAPS[v]['observables'] assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS[v]['observables']
def test_register_duplicate_observable(): def test_register_duplicate_observable():
@ -1323,8 +1325,8 @@ def test_register_observable_custom_extension():
example = NewExtension2(property1="Hi there") example = NewExtension2(property1="Hi there")
v = 'v21' v = 'v21'
assert 'domain-name' in stix2.parsing.STIX2_OBJ_MAPS[v]['observables'] assert 'domain-name' in stix2.registry.STIX2_OBJ_MAPS[v]['observables']
assert example._type in stix2.parsing.STIX2_OBJ_MAPS[v]['observable-extensions']['domain-name'] assert example._type in stix2.registry.STIX2_OBJ_MAPS[v]['observable-extensions']['domain-name']
def test_register_duplicate_observable_extension(): def test_register_duplicate_observable_extension():

View File

@ -2,8 +2,7 @@ from collections import OrderedDict
import pytest import pytest
import stix2 from stix2 import DEFAULT_VERSION, exceptions, parsing, registration, registry
from stix2 import exceptions, parsing
BUNDLE = { BUNDLE = {
"type": "bundle", "type": "bundle",
@ -64,7 +63,7 @@ def test_parse_observable_with_version():
assert v in str(obs_obj.__class__) assert v in str(obs_obj.__class__)
@pytest.mark.xfail(reason="The default version is not 2.1", condition=stix2.DEFAULT_VERSION != "2.1") @pytest.mark.xfail(reason="The default version is not 2.1", condition=DEFAULT_VERSION != "2.1")
def test_parse_observable_with_no_version(): def test_parse_observable_with_no_version():
observable = {"type": "file", "name": "foo.exe", "spec_version": "2.1"} observable = {"type": "file", "name": "foo.exe", "spec_version": "2.1"}
obs_obj = parsing.parse_observable(observable) obs_obj = parsing.parse_observable(observable)
@ -78,22 +77,22 @@ def test_register_marking_with_version():
_type = 'x-new-marking1' _type = 'x-new-marking1'
_properties = OrderedDict() _properties = OrderedDict()
parsing._register_marking(NewMarking1, version='2.1') registration._register_marking(NewMarking1, version='2.1')
v = 'v21' v = 'v21'
assert NewMarking1._type in parsing.STIX2_OBJ_MAPS[v]['markings'] assert NewMarking1._type in registry.STIX2_OBJ_MAPS[v]['markings']
assert v in str(parsing.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type]) assert v in str(registry.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type])
@pytest.mark.xfail(reason="The default version is not 2.1", condition=stix2.DEFAULT_VERSION != "2.1") @pytest.mark.xfail(reason="The default version is not 2.1", condition=DEFAULT_VERSION != "2.1")
def test_register_marking_with_no_version(): def test_register_marking_with_no_version():
# Uses default version (2.1 in this case) # Uses default version (2.1 in this case)
class NewMarking2: class NewMarking2:
_type = 'x-new-marking2' _type = 'x-new-marking2'
_properties = OrderedDict() _properties = OrderedDict()
parsing._register_marking(NewMarking2) registration._register_marking(NewMarking2)
v = 'v21' v = 'v21'
assert NewMarking2._type in parsing.STIX2_OBJ_MAPS[v]['markings'] assert NewMarking2._type in registry.STIX2_OBJ_MAPS[v]['markings']
assert v in str(parsing.STIX2_OBJ_MAPS[v]['markings'][NewMarking2._type]) assert v in str(registry.STIX2_OBJ_MAPS[v]['markings'][NewMarking2._type])

View File

@ -1 +1,3 @@
__version__ = "2.1.0" __version__ = "2.1.0"
DEFAULT_VERSION = '2.1' # Default version will always be the latest STIX 2.X version

View File

@ -9,6 +9,7 @@ import six
from six.moves.collections_abc import Mapping from six.moves.collections_abc import Mapping
import stix2.base import stix2.base
import stix2.registry
from stix2.utils import get_timestamp, parse_into_datetime from stix2.utils import get_timestamp, parse_into_datetime
import stix2.v20 import stix2.v20
@ -109,7 +110,7 @@ def _is_versionable(data):
# registered class, and from that get a more complete picture of its # registered class, and from that get a more complete picture of its
# properties. # properties.
elif isinstance(data, dict): elif isinstance(data, dict):
class_maps = stix2.parsing.STIX2_OBJ_MAPS[stix_vid] class_maps = stix2.registry.STIX2_OBJ_MAPS[stix_vid]
obj_type = data["type"] obj_type = data["type"]
if obj_type in class_maps["objects"]: if obj_type in class_maps["objects"]:

View File

@ -22,8 +22,6 @@
import functools import functools
import stix2
from . import AttackPattern as _AttackPattern from . import AttackPattern as _AttackPattern
from . import Campaign as _Campaign from . import Campaign as _Campaign
from . import CourseOfAction as _CourseOfAction from . import CourseOfAction as _CourseOfAction
@ -36,12 +34,14 @@ from . import Location as _Location
from . import Malware as _Malware from . import Malware as _Malware
from . import MalwareAnalysis as _MalwareAnalysis from . import MalwareAnalysis as _MalwareAnalysis
from . import Note as _Note from . import Note as _Note
from . import OBJ_MAP
from . import ObservedData as _ObservedData from . import ObservedData as _ObservedData
from . import Opinion as _Opinion from . import Opinion as _Opinion
from . import Report as _Report from . import Report as _Report
from . import ThreatActor as _ThreatActor from . import ThreatActor as _ThreatActor
from . import Tool as _Tool from . import Tool as _Tool
from . import Vulnerability as _Vulnerability from . import Vulnerability as _Vulnerability
from .version import DEFAULT_VERSION
from . import ( # noqa: F401 isort:skip from . import ( # noqa: F401 isort:skip
AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem,
@ -64,7 +64,7 @@ from .datastore.filters import FilterSet # isort:skip
# Enable some adaptation to the current default supported STIX version. # Enable some adaptation to the current default supported STIX version.
_STIX_VID = "v" + stix2.DEFAULT_VERSION.replace(".", "") _STIX_VID = "v" + DEFAULT_VERSION.replace(".", "")
# Use an implicit MemoryStore # Use an implicit MemoryStore
@ -164,7 +164,7 @@ def _setup_workbench():
# Add our new "class" to this module's globals and to the library-wide # Add our new "class" to this module's globals and to the library-wide
# mapping. This allows parse() to use the wrapped classes. # mapping. This allows parse() to use the wrapped classes.
globals()[obj_type.__name__] = factory_func globals()[obj_type.__name__] = factory_func
stix2.OBJ_MAP[obj_type._type] = factory_func OBJ_MAP[obj_type._type] = factory_func
_setup_workbench() _setup_workbench()