diff --git a/docs/api/stix2.serialization.rst b/docs/api/stix2.serialization.rst new file mode 100644 index 0000000..bc182d8 --- /dev/null +++ b/docs/api/stix2.serialization.rst @@ -0,0 +1,5 @@ +serialization +================ + +.. automodule:: stix2.serialization + :members: diff --git a/stix2/__init__.py b/stix2/__init__.py index d0051ee..97790aa 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -12,6 +12,7 @@ pattern_visitor patterns properties + serialization utils v20 v21 diff --git a/stix2/base.py b/stix2/base.py index 33374bc..21b6011 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -1,7 +1,6 @@ """Base classes for type definitions in the STIX2 library.""" import copy -import datetime as dt import re import uuid @@ -18,9 +17,10 @@ from .exceptions import ( ) from .markings import _MarkingsMixin from .markings.utils import validate -from .utils import ( - NOW, PREFIX_21_REGEX, find_property_index, format_datetime, get_timestamp, +from .serialization import ( + STIXJSONEncoder, STIXJSONIncludeOptionalDefaultsEncoder, serialize, ) +from .utils import NOW, PREFIX_21_REGEX, get_timestamp from .versioning import new_version as _new_version from .versioning import revoke as _revoke @@ -29,51 +29,14 @@ try: except ImportError: from collections import Mapping - -__all__ = ['STIXJSONEncoder', '_STIXBase'] +# TODO: Remove STIXJSONEncoder, STIXJSONIncludeOptionalDefaultsEncoder, serialize from __all__ on next major release. +# Kept for backwards compatibility. +__all__ = ['STIXJSONEncoder', 'STIXJSONIncludeOptionalDefaultsEncoder', '_STIXBase', 'serialize'] DEFAULT_ERROR = "{type} must have {property}='{expected}'." SCO_DET_ID_NAMESPACE = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7") -class STIXJSONEncoder(json.JSONEncoder): - """Custom JSONEncoder subclass for serializing Python ``stix2`` objects. - - If an optional property with a default value specified in the STIX 2 spec - is set to that default value, it will be left out of the serialized output. 
- - An example of this type of property include the ``revoked`` common property. - """ - - def default(self, obj): - if isinstance(obj, (dt.date, dt.datetime)): - return format_datetime(obj) - elif isinstance(obj, _STIXBase): - tmp_obj = dict(copy.deepcopy(obj)) - for prop_name in obj._defaulted_optional_properties: - del tmp_obj[prop_name] - return tmp_obj - else: - return super(STIXJSONEncoder, self).default(obj) - - -class STIXJSONIncludeOptionalDefaultsEncoder(json.JSONEncoder): - """Custom JSONEncoder subclass for serializing Python ``stix2`` objects. - - Differs from ``STIXJSONEncoder`` in that if an optional property with a default - value specified in the STIX 2 spec is set to that default value, it will be - included in the serialized output. - """ - - def default(self, obj): - if isinstance(obj, (dt.date, dt.datetime)): - return format_datetime(obj) - elif isinstance(obj, _STIXBase): - return dict(obj) - else: - return super(STIXJSONIncludeOptionalDefaultsEncoder, self).default(obj) - - def get_required_properties(properties): return (k for k, v in properties.items() if v.required) @@ -270,18 +233,10 @@ class _STIXBase(Mapping): def revoke(self): return _revoke(self) - def serialize(self, pretty=False, include_optional_defaults=False, **kwargs): + def serialize(self, *args, **kwargs): """ Serialize a STIX object. - Args: - pretty (bool): If True, output properties following the STIX specs - formatting. This includes indentation. Refer to notes for more - details. (Default: ``False``) - include_optional_defaults (bool): Determines whether to include - optional properties set to the default value defined in the spec. - **kwargs: The arguments for a json.dumps() call. - Examples: >>> import stix2 >>> identity = stix2.Identity(name='Example Corp.', identity_class='organization') @@ -300,25 +255,10 @@ class _STIXBase(Mapping): Returns: str: The serialized JSON object. - Note: - The argument ``pretty=True`` will output the STIX object following - spec order. 
Using this argument greatly impacts object serialization - performance. If your use case is centered across machine-to-machine - operation it is recommended to set ``pretty=False``. - - When ``pretty=True`` the following key-value pairs will be added or - overridden: indent=4, separators=(",", ": "), item_sort_key=sort_by. + See Also: + ``stix2.serialization.serialize`` for options. """ - if pretty: - def sort_by(element): - return find_property_index(self, *element) - - kwargs.update({'indent': 4, 'separators': (',', ': '), 'item_sort_key': sort_by}) - - if include_optional_defaults: - return json.dumps(self, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs) - else: - return json.dumps(self, cls=STIXJSONEncoder, **kwargs) + return serialize(self, *args, **kwargs) class _DomainObject(_STIXBase, _MarkingsMixin): diff --git a/stix2/datastore/filesystem.py b/stix2/datastore/filesystem.py index 5a5844a..d865768 100644 --- a/stix2/datastore/filesystem.py +++ b/stix2/datastore/filesystem.py @@ -15,7 +15,8 @@ from stix2.datastore import ( ) from stix2.datastore.filters import Filter, FilterSet, apply_common_filters from stix2.parsing import parse -from stix2.utils import format_datetime, get_type_from_id +from stix2.serialization import serialize +from stix2.utils import format_datetime, get_type_from_id, parse_into_datetime def _timestamp2filename(timestamp): @@ -24,10 +25,12 @@ def _timestamp2filename(timestamp): "modified" property value. This should not include an extension. Args: - timestamp: A timestamp, as a datetime.datetime object. + timestamp: A timestamp, as a datetime.datetime object or string. """ # The format_datetime will determine the correct level of precision. + if isinstance(timestamp, str): + timestamp = parse_into_datetime(timestamp) ts = format_datetime(timestamp) ts = re.sub(r"[-T:\.Z ]", "", ts) return ts @@ -582,10 +585,10 @@ class FileSystemSink(DataSink): if os.path.isfile(file_path): raise DataSourceError("Attempted to overwrite file (!) 
class STIXJSONEncoder(json.JSONEncoder):
    """Custom JSONEncoder subclass for serializing Python ``stix2`` objects.

    If an optional property with a default value specified in the STIX 2 spec
    is set to that default value, it will be left out of the serialized output.

    An example of this type of property is the ``revoked`` common property.
    """

    def default(self, obj):
        """Convert an object the base encoder cannot serialize by itself.

        Args:
            obj: The object handed over by the JSON encoder.

        Returns:
            The result of ``format_datetime(obj)`` for dates and datetimes,
            or, for STIX objects, a plain ``dict`` of the object's properties
            without the ones listed in ``_defaulted_optional_properties``.
            Any other type is delegated to the base class ``default()``.
        """
        if isinstance(obj, (dt.date, dt.datetime)):
            return format_datetime(obj)

        if isinstance(obj, stix2.base._STIXBase):
            # dict(obj) already builds a brand-new mapping, so deleting keys
            # from it cannot affect ``obj``.  Deep-copying the whole object
            # first is unnecessary work on every serialization.
            tmp_obj = dict(obj)
            for prop_name in obj._defaulted_optional_properties:
                del tmp_obj[prop_name]
            return tmp_obj

        return super(STIXJSONEncoder, self).default(obj)
def _find(seq, val):
    """
    Search sequence 'seq' for val.  This behaves like str.find(): if not found,
    -1 is returned instead of throwing an exception.

    Args:
        seq: The sequence to search
        val: The value to search for

    Returns:
        int: The index of the value if found, or -1 if not found
    """
    try:
        return seq.index(val)
    except ValueError:
        return -1


def _find_property_in_seq(seq, search_key, search_value):
    """
    Helper for find_property_index(): search for the property in all elements
    of the given sequence.

    Args:
        seq: The sequence
        search_key: Property name to find
        search_value: Property value to find

    Returns:
        int: A property index, or -1 if the property was not found
    """
    for elem in seq:
        idx = find_property_index(elem, search_key, search_value)
        if idx >= 0:
            # First element that contains the property wins.
            return idx

    return -1


def find_property_index(obj, search_key, search_value):
    """
    Search (recursively) for the given key and value in the given object.
    Return an index for the key, relative to whatever object it's found in.

    Args:
        obj: The object to search (list, dict, or stix object)
        search_key: A search key
        search_value: A search value

    Returns:
        int: An index; -1 if the key and value aren't found
    """
    # Special-case keys which are numbers-as-strings, e.g. for cyber-observable
    # mappings.  Use the int value of the key as the index.
    if search_key.isdigit():
        try:
            return int(search_key)
        except ValueError:
            # str.isdigit() also accepts characters (for example the
            # superscript digit U+00B2) that int() cannot convert.  Treat
            # such keys like any other property name instead of crashing.
            pass

    # The dict/list checks come before the STIX check; the visible
    # ``class _STIXBase(Mapping)`` header suggests a STIX object is not a
    # dict, so this should not change which branch handles a given object.
    if isinstance(obj, dict):
        if search_key in obj and obj[search_key] == search_value:
            # Index is the key's position among the sorted dict keys.
            return _find(sorted(obj), search_key)
        return _find_property_in_seq(obj.values(), search_key, search_value)

    if isinstance(obj, list):
        return _find_property_in_seq(obj, search_key, search_value)

    if isinstance(obj, stix2.base._STIXBase):
        if search_key in obj and obj[search_key] == search_value:
            # Index is the key's position within obj.object_properties().
            return _find(obj.object_properties(), search_key)
        return _find_property_in_seq(obj.values(), search_key, search_value)

    # Don't know how to search this type
    return -1
def test_filesystem_custom_object_dict(fs_store):
    """A dict describing an unregistered custom type can be added and read back."""
    fs_store.sink.allow_custom = True
    newobj = {
        "type": "x-new-obj-2",
        "id": "x-new-obj-2--d08dc866-6149-47db-aae6-7b58a827e7f0",
        "spec_version": "2.1",
        "created": "2020-07-20T03:45:02.879Z",
        "modified": "2020-07-20T03:45:02.879Z",
        "property1": "something",
    }

    try:
        fs_store.add(newobj)

        newobj_r = fs_store.get(newobj["id"])
        assert newobj_r["id"] == newobj["id"]
        assert newobj_r["property1"] == 'something'
    finally:
        # Always clean up, even when an assertion above fails, so the
        # written directory and the relaxed sink setting are not left behind.
        # remove dir
        shutil.rmtree(os.path.join(FS_PATH, "x-new-obj-2"), True)
        fs_store.sink.allow_custom = False
@@ -201,7 +202,7 @@ def test_deduplicate(stix_objs1): ], ) def test_find_property_index(object, tuple_to_find, expected_index): - assert stix2.utils.find_property_index( + assert stix2.serialization.find_property_index( object, *tuple_to_find ) == expected_index @@ -238,4 +239,4 @@ def test_find_property_index(object, tuple_to_find, expected_index): ], ) def test_iterate_over_values(dict_value, tuple_to_find, expected_index): - assert stix2.utils._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index + assert stix2.serialization._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index diff --git a/stix2/utils.py b/stix2/utils.py index 7a8d8cb..f741581 100644 --- a/stix2/utils.py +++ b/stix2/utils.py @@ -298,83 +298,6 @@ def _get_dict(data): raise ValueError("Cannot convert '%s' to dictionary." % str(data)) -def _find(seq, val): - """ - Search sequence 'seq' for val. This behaves like str.find(): if not found, - -1 is returned instead of throwing an exception. - - Args: - seq: The sequence to search - val: The value to search for - - Returns: - int: The index of the value if found, or -1 if not found - """ - try: - return seq.index(val) - except ValueError: - return -1 - - -def _find_property_in_seq(seq, search_key, search_value): - """ - Helper for find_property_index(): search for the property in all elements - of the given sequence. - - Args: - seq: The sequence - search_key: Property name to find - search_value: Property value to find - - Returns: - int: A property index, or -1 if the property was not found - """ - idx = -1 - for elem in seq: - idx = find_property_index(elem, search_key, search_value) - if idx >= 0: - break - - return idx - - -def find_property_index(obj, search_key, search_value): - """ - Search (recursively) for the given key and value in the given object. - Return an index for the key, relative to whatever object it's found in. 
- - Args: - obj: The object to search (list, dict, or stix object) - search_key: A search key - search_value: A search value - - Returns: - int: An index; -1 if the key and value aren't found - """ - # Special-case keys which are numbers-as-strings, e.g. for cyber-observable - # mappings. Use the int value of the key as the index. - if search_key.isdigit(): - return int(search_key) - - if isinstance(obj, stix2.base._STIXBase): - if search_key in obj and obj[search_key] == search_value: - idx = _find(obj.object_properties(), search_key) - else: - idx = _find_property_in_seq(obj.values(), search_key, search_value) - elif isinstance(obj, dict): - if search_key in obj and obj[search_key] == search_value: - idx = _find(sorted(obj), search_key) - else: - idx = _find_property_in_seq(obj.values(), search_key, search_value) - elif isinstance(obj, list): - idx = _find_property_in_seq(obj, search_key, search_value) - else: - # Don't know how to search this type - idx = -1 - - return idx - - def get_class_hierarchy_names(obj): """Given an object, return the names of the class hierarchy.""" names = []