From 03e19f197c1b297d47ef9333ab86ecfc52d809ba Mon Sep 17 00:00:00 2001 From: Emmanuelle Vargas-Gonzalez Date: Tue, 10 Jul 2018 14:56:31 -0400 Subject: [PATCH] Moved STIXDomainObject and STIXRelationshipObject here, the observable parsing code, minor changes to the mappings collections and all _register methods --- stix2/core.py | 187 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 171 insertions(+), 16 deletions(-) diff --git a/stix2/core.py b/stix2/core.py index 3ee0f6b..663b3e6 100644 --- a/stix2/core.py +++ b/stix2/core.py @@ -1,14 +1,25 @@ +import copy import importlib import pkgutil import stix2 -from .exceptions import ParseError +from .base import _STIXBase +from .exceptions import CustomContentError, ParseError +from .markings import _MarkingsMixin from .utils import _get_dict STIX2_OBJ_MAPS = {} +class STIXDomainObject(_STIXBase, _MarkingsMixin): + pass + + +class STIXRelationshipObject(_STIXBase, _MarkingsMixin): + pass + + def parse(data, allow_custom=False, version=None): """Convert a string, dict or file-like object into a STIX object. @@ -76,7 +87,10 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None): if 'type' not in stix_dict: raise ParseError("Can't parse object with no 'type' property: %s" % str(stix_dict)) - if 'spec_version' in stix_dict: + if version: + # If the version argument was passed, override other approaches. + v = 'v' + version.replace('.', '') + elif 'spec_version' in stix_dict: # For STIX 2.0, applies to bundles only. # For STIX 2.1+, applies to SDOs, SROs, and markings only. v = 'v' + stix_dict['spec_version'].replace('.', '') @@ -87,9 +101,11 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None): else: v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') else: - v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') + # The spec says that SDO/SROs without spec_version will default to a + # '2.0' representation. + v = 'v20' - OBJ_MAP = STIX2_OBJ_MAPS[v] + OBJ_MAP = STIX2_OBJ_MAPS[v]['objects'] try: obj_class = OBJ_MAP[stix_dict['type']] @@ -103,7 +119,69 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None): return obj_class(allow_custom=allow_custom, **stix_dict) -def _register_type(new_type, version=None): +def parse_observable(data, _valid_refs=None, allow_custom=False, version=None): + """Deserialize a string or file-like object into a STIX Cyber Observable + object. + + Args: + data: The STIX 2 string to be parsed. + _valid_refs: A list of object references valid for the scope of the + object being parsed. Use empty list if no valid refs are present. + allow_custom (bool): Whether to allow custom properties or not. + Default: False. + version (str): If the spec version is missing, the latest supported + stix2 version will be used to parse the observable object. + + Returns: + An instantiated Python STIX Cyber Observable object. + """ + obj = _get_dict(data) + # get deep copy since we are going modify the dict and might + # modify the original dict as _get_dict() does not return new + # dict when passed a dict + obj = copy.deepcopy(obj) + + obj['_valid_refs'] = _valid_refs or [] + + if version: + # If the version argument was passed, override other approaches. + v = 'v' + version.replace('.', '') + elif 'spec_version' in obj: + v = 'v' + obj['spec_version'].replace('.', '') + else: + # Use default version (latest) if no version was provided. + v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') + + if 'type' not in obj: + raise ParseError("Can't parse observable with no 'type' property: %s" % str(obj)) + try: + OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables'] + obj_class = OBJ_MAP_OBSERVABLE[obj['type']] + except KeyError: + if allow_custom: + # flag allows for unknown custom objects too, but will not + # be parsed into STIX observable object, just returned as is + return obj + raise CustomContentError("Can't parse unknown observable type '%s'! For custom observables, " + "use the CustomObservable decorator." % obj['type']) + + EXT_MAP = STIX2_OBJ_MAPS[v]['observable-extensions'] + + if 'extensions' in obj and obj['type'] in EXT_MAP: + for name, ext in obj['extensions'].items(): + try: + ext_class = EXT_MAP[obj['type']][name] + except KeyError: + if not allow_custom: + raise CustomContentError("Can't parse unknown extension type '%s'" + "for observable type '%s'!" % (name, obj['type'])) + else: # extension was found + obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name]) + + return obj_class(allow_custom=allow_custom, **obj) + + +def _register_object(new_type, version=None): """Register a custom STIX Object type. Args: @@ -111,26 +189,103 @@ def _register_type(new_type, version=None): version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. """ - if not version: - # Use latest version - v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') - else: + if version: v = 'v' + version.replace('.', '') + else: + # Use default version (latest) if no version was provided. + v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') - OBJ_MAP = STIX2_OBJ_MAPS[v] + OBJ_MAP = STIX2_OBJ_MAPS[v]['objects'] OBJ_MAP[new_type._type] = new_type -def _collect_stix2_obj_maps(): - """Navigate the package once and retrieve all OBJ_MAP dicts for each v2X - package.""" +def _register_marking(new_marking, version=None): + """Register a custom STIX Marking Definition type. + + Args: + new_marking (class): A class to register in the Marking map. + version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If + None, use latest version. + """ + if version: + v = 'v' + version.replace('.', '') + else: + # Use default version (latest) if no version was provided. + v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') + + OBJ_MAP_MARKING = STIX2_OBJ_MAPS[v]['markings'] + OBJ_MAP_MARKING[new_marking._type] = new_marking + + +def _register_observable(new_observable, version=None): + """Register a custom STIX Cyber Observable type. + + Args: + new_observable (class): A class to register in the Observables map. + version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If + None, use latest version. + """ + if version: + v = 'v' + version.replace('.', '') + else: + # Use default version (latest) if no version was provided. + v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') + + OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables'] + OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable + + +def _register_observable_extension(observable, new_extension, version=None): + """Register a custom extension to a STIX Cyber Observable type. + + Args: + observable: An observable object + new_extension (class): A class to register in the Observables + Extensions map. + version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If + None, use latest version. + """ + if version: + v = 'v' + version.replace('.', '') + else: + # Use default version (latest) if no version was provided. + v = 'v' + stix2.DEFAULT_VERSION.replace('.', '') + + try: + observable_type = observable._type + except AttributeError: + raise ValueError("Unknown observable type. Custom observables must be " + "created with the @CustomObservable decorator.") + + OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables'] + EXT_MAP = STIX2_OBJ_MAPS[v]['observable-extensions'] + + try: + EXT_MAP[observable_type][new_extension._type] = new_extension + except KeyError: + if observable_type not in OBJ_MAP_OBSERVABLE: + raise ValueError("Unknown observable type '%s'. Custom observables " + "must be created with the @CustomObservable decorator." + % observable_type) + else: + EXT_MAP[observable_type] = {new_extension._type: new_extension} + + +def _collect_stix2_mappings(): + """Navigate the package once and retrieve all object mapping dicts for each + v2X package. Includes OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP.""" if not STIX2_OBJ_MAPS: top_level_module = importlib.import_module('stix2') path = top_level_module.__path__ prefix = str(top_level_module.__name__) + '.' - for module_loader, name, is_pkg in pkgutil.walk_packages(path=path, - prefix=prefix): + for module_loader, name, is_pkg in pkgutil.walk_packages(path=path, prefix=prefix): if name.startswith('stix2.v2') and is_pkg: mod = importlib.import_module(name, str(top_level_module.__name__)) - STIX2_OBJ_MAPS[name.split('.')[-1]] = mod.OBJ_MAP + STIX2_OBJ_MAPS[name.split('.')[1]] = {} + STIX2_OBJ_MAPS[name.split('.')[1]]['objects'] = mod.OBJ_MAP + STIX2_OBJ_MAPS[name.split('.')[1]]['observables'] = mod.OBJ_MAP_OBSERVABLE + STIX2_OBJ_MAPS[name.split('.')[1]]['observable-extensions'] = mod.EXT_MAP + elif name.startswith('stix2.v2') and name.endswith('.common') and is_pkg is False: + mod = importlib.import_module(name, str(top_level_module.__name__)) + STIX2_OBJ_MAPS[name.split('.')[1]]['markings'] = mod.OBJ_MAP_MARKING