Moved STIXDomainObject and STIXRelationshipObject here, the observable
parsing code, minor changes to the mappings collections and all _register methodsstix2.1
parent
b76888c682
commit
03e19f197c
187
stix2/core.py
187
stix2/core.py
|
@ -1,14 +1,25 @@
|
|||
import copy
|
||||
import importlib
|
||||
import pkgutil
|
||||
|
||||
import stix2
|
||||
|
||||
from .exceptions import ParseError
|
||||
from .base import _STIXBase
|
||||
from .exceptions import CustomContentError, ParseError
|
||||
from .markings import _MarkingsMixin
|
||||
from .utils import _get_dict
|
||||
|
||||
STIX2_OBJ_MAPS = {}
|
||||
|
||||
|
||||
class STIXDomainObject(_STIXBase, _MarkingsMixin):
|
||||
pass
|
||||
|
||||
|
||||
class STIXRelationshipObject(_STIXBase, _MarkingsMixin):
|
||||
pass
|
||||
|
||||
|
||||
def parse(data, allow_custom=False, version=None):
|
||||
"""Convert a string, dict or file-like object into a STIX object.
|
||||
|
||||
|
@ -76,7 +87,10 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None):
|
|||
if 'type' not in stix_dict:
|
||||
raise ParseError("Can't parse object with no 'type' property: %s" % str(stix_dict))
|
||||
|
||||
if 'spec_version' in stix_dict:
|
||||
if version:
|
||||
# If the version argument was passed, override other approaches.
|
||||
v = 'v' + version.replace('.', '')
|
||||
elif 'spec_version' in stix_dict:
|
||||
# For STIX 2.0, applies to bundles only.
|
||||
# For STIX 2.1+, applies to SDOs, SROs, and markings only.
|
||||
v = 'v' + stix_dict['spec_version'].replace('.', '')
|
||||
|
@ -87,9 +101,11 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None):
|
|||
else:
|
||||
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
|
||||
else:
|
||||
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
|
||||
# The spec says that SDO/SROs without spec_version will default to a
|
||||
# '2.0' representation.
|
||||
v = 'v20'
|
||||
|
||||
OBJ_MAP = STIX2_OBJ_MAPS[v]
|
||||
OBJ_MAP = STIX2_OBJ_MAPS[v]['objects']
|
||||
|
||||
try:
|
||||
obj_class = OBJ_MAP[stix_dict['type']]
|
||||
|
@ -103,7 +119,69 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None):
|
|||
return obj_class(allow_custom=allow_custom, **stix_dict)
|
||||
|
||||
|
||||
def _register_type(new_type, version=None):
|
||||
def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
|
||||
"""Deserialize a string or file-like object into a STIX Cyber Observable
|
||||
object.
|
||||
|
||||
Args:
|
||||
data: The STIX 2 string to be parsed.
|
||||
_valid_refs: A list of object references valid for the scope of the
|
||||
object being parsed. Use empty list if no valid refs are present.
|
||||
allow_custom (bool): Whether to allow custom properties or not.
|
||||
Default: False.
|
||||
version (str): If the spec version is missing, the latest supported
|
||||
stix2 version will be used to parse the observable object.
|
||||
|
||||
Returns:
|
||||
An instantiated Python STIX Cyber Observable object.
|
||||
"""
|
||||
obj = _get_dict(data)
|
||||
# get deep copy since we are going modify the dict and might
|
||||
# modify the original dict as _get_dict() does not return new
|
||||
# dict when passed a dict
|
||||
obj = copy.deepcopy(obj)
|
||||
|
||||
obj['_valid_refs'] = _valid_refs or []
|
||||
|
||||
if version:
|
||||
# If the version argument was passed, override other approaches.
|
||||
v = 'v' + version.replace('.', '')
|
||||
elif 'spec_version' in obj:
|
||||
v = 'v' + obj['spec_version'].replace('.', '')
|
||||
else:
|
||||
# Use default version (latest) if no version was provided.
|
||||
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
|
||||
|
||||
if 'type' not in obj:
|
||||
raise ParseError("Can't parse observable with no 'type' property: %s" % str(obj))
|
||||
try:
|
||||
OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables']
|
||||
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
|
||||
except KeyError:
|
||||
if allow_custom:
|
||||
# flag allows for unknown custom objects too, but will not
|
||||
# be parsed into STIX observable object, just returned as is
|
||||
return obj
|
||||
raise CustomContentError("Can't parse unknown observable type '%s'! For custom observables, "
|
||||
"use the CustomObservable decorator." % obj['type'])
|
||||
|
||||
EXT_MAP = STIX2_OBJ_MAPS[v]['observable-extensions']
|
||||
|
||||
if 'extensions' in obj and obj['type'] in EXT_MAP:
|
||||
for name, ext in obj['extensions'].items():
|
||||
try:
|
||||
ext_class = EXT_MAP[obj['type']][name]
|
||||
except KeyError:
|
||||
if not allow_custom:
|
||||
raise CustomContentError("Can't parse unknown extension type '%s'"
|
||||
"for observable type '%s'!" % (name, obj['type']))
|
||||
else: # extension was found
|
||||
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
|
||||
|
||||
return obj_class(allow_custom=allow_custom, **obj)
|
||||
|
||||
|
||||
def _register_object(new_type, version=None):
|
||||
"""Register a custom STIX Object type.
|
||||
|
||||
Args:
|
||||
|
@ -111,26 +189,103 @@ def _register_type(new_type, version=None):
|
|||
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
|
||||
None, use latest version.
|
||||
"""
|
||||
if not version:
|
||||
# Use latest version
|
||||
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
|
||||
else:
|
||||
if version:
|
||||
v = 'v' + version.replace('.', '')
|
||||
else:
|
||||
# Use default version (latest) if no version was provided.
|
||||
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
|
||||
|
||||
OBJ_MAP = STIX2_OBJ_MAPS[v]
|
||||
OBJ_MAP = STIX2_OBJ_MAPS[v]['objects']
|
||||
OBJ_MAP[new_type._type] = new_type
|
||||
|
||||
|
||||
def _collect_stix2_obj_maps():
|
||||
"""Navigate the package once and retrieve all OBJ_MAP dicts for each v2X
|
||||
package."""
|
||||
def _register_marking(new_marking, version=None):
|
||||
"""Register a custom STIX Marking Definition type.
|
||||
|
||||
Args:
|
||||
new_marking (class): A class to register in the Marking map.
|
||||
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
|
||||
None, use latest version.
|
||||
"""
|
||||
if version:
|
||||
v = 'v' + version.replace('.', '')
|
||||
else:
|
||||
# Use default version (latest) if no version was provided.
|
||||
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
|
||||
|
||||
OBJ_MAP_MARKING = STIX2_OBJ_MAPS[v]['markings']
|
||||
OBJ_MAP_MARKING[new_marking._type] = new_marking
|
||||
|
||||
|
||||
def _register_observable(new_observable, version=None):
|
||||
"""Register a custom STIX Cyber Observable type.
|
||||
|
||||
Args:
|
||||
new_observable (class): A class to register in the Observables map.
|
||||
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
|
||||
None, use latest version.
|
||||
"""
|
||||
if version:
|
||||
v = 'v' + version.replace('.', '')
|
||||
else:
|
||||
# Use default version (latest) if no version was provided.
|
||||
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
|
||||
|
||||
OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables']
|
||||
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
|
||||
|
||||
|
||||
def _register_observable_extension(observable, new_extension, version=None):
|
||||
"""Register a custom extension to a STIX Cyber Observable type.
|
||||
|
||||
Args:
|
||||
observable: An observable object
|
||||
new_extension (class): A class to register in the Observables
|
||||
Extensions map.
|
||||
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
|
||||
None, use latest version.
|
||||
"""
|
||||
if version:
|
||||
v = 'v' + version.replace('.', '')
|
||||
else:
|
||||
# Use default version (latest) if no version was provided.
|
||||
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
|
||||
|
||||
try:
|
||||
observable_type = observable._type
|
||||
except AttributeError:
|
||||
raise ValueError("Unknown observable type. Custom observables must be "
|
||||
"created with the @CustomObservable decorator.")
|
||||
|
||||
OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables']
|
||||
EXT_MAP = STIX2_OBJ_MAPS[v]['observable-extensions']
|
||||
|
||||
try:
|
||||
EXT_MAP[observable_type][new_extension._type] = new_extension
|
||||
except KeyError:
|
||||
if observable_type not in OBJ_MAP_OBSERVABLE:
|
||||
raise ValueError("Unknown observable type '%s'. Custom observables "
|
||||
"must be created with the @CustomObservable decorator."
|
||||
% observable_type)
|
||||
else:
|
||||
EXT_MAP[observable_type] = {new_extension._type: new_extension}
|
||||
|
||||
|
||||
def _collect_stix2_mappings():
|
||||
"""Navigate the package once and retrieve all object mapping dicts for each
|
||||
v2X package. Includes OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP."""
|
||||
if not STIX2_OBJ_MAPS:
|
||||
top_level_module = importlib.import_module('stix2')
|
||||
path = top_level_module.__path__
|
||||
prefix = str(top_level_module.__name__) + '.'
|
||||
|
||||
for module_loader, name, is_pkg in pkgutil.walk_packages(path=path,
|
||||
prefix=prefix):
|
||||
for module_loader, name, is_pkg in pkgutil.walk_packages(path=path, prefix=prefix):
|
||||
if name.startswith('stix2.v2') and is_pkg:
|
||||
mod = importlib.import_module(name, str(top_level_module.__name__))
|
||||
STIX2_OBJ_MAPS[name.split('.')[-1]] = mod.OBJ_MAP
|
||||
STIX2_OBJ_MAPS[name.split('.')[1]] = {}
|
||||
STIX2_OBJ_MAPS[name.split('.')[1]]['objects'] = mod.OBJ_MAP
|
||||
STIX2_OBJ_MAPS[name.split('.')[1]]['observables'] = mod.OBJ_MAP_OBSERVABLE
|
||||
STIX2_OBJ_MAPS[name.split('.')[1]]['observable-extensions'] = mod.EXT_MAP
|
||||
elif name.startswith('stix2.v2') and name.endswith('.common') and is_pkg is False:
|
||||
mod = importlib.import_module(name, str(top_level_module.__name__))
|
||||
STIX2_OBJ_MAPS[name.split('.')[1]]['markings'] = mod.OBJ_MAP_MARKING
|
||||
|
|
Loading…
Reference in New Issue