Merge pull request #431 from oasis-open/filesys-write-custom

Fix bug when adding custom object to FileSystemSink if the object type hasn't been registered
pull/1/head
Emmanuelle Vargas-Gonzalez 2020-07-27 09:43:38 -04:00 committed by GitHub
commit 8cdbfed5e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 240 additions and 159 deletions

View File

@ -0,0 +1,5 @@
serialization
================
.. automodule:: stix2.serialization
   :members:

View File

@ -12,6 +12,7 @@
pattern_visitor pattern_visitor
patterns patterns
properties properties
serialization
utils utils
v20 v20
v21 v21

View File

@ -1,7 +1,6 @@
"""Base classes for type definitions in the STIX2 library.""" """Base classes for type definitions in the STIX2 library."""
import copy import copy
import datetime as dt
import re import re
import uuid import uuid
@ -18,9 +17,10 @@ from .exceptions import (
) )
from .markings import _MarkingsMixin from .markings import _MarkingsMixin
from .markings.utils import validate from .markings.utils import validate
from .utils import ( from .serialization import (
NOW, PREFIX_21_REGEX, find_property_index, format_datetime, get_timestamp, STIXJSONEncoder, STIXJSONIncludeOptionalDefaultsEncoder, serialize,
) )
from .utils import NOW, PREFIX_21_REGEX, get_timestamp
from .versioning import new_version as _new_version from .versioning import new_version as _new_version
from .versioning import revoke as _revoke from .versioning import revoke as _revoke
@ -29,51 +29,14 @@ try:
except ImportError: except ImportError:
from collections import Mapping from collections import Mapping
# TODO: Remove STIXJSONEncoder, STIXJSONIncludeOptionalDefaultsEncoder, serialize from __all__ on next major release.
__all__ = ['STIXJSONEncoder', '_STIXBase'] # Kept for backwards compatibility.
__all__ = ['STIXJSONEncoder', 'STIXJSONIncludeOptionalDefaultsEncoder', '_STIXBase', 'serialize']
DEFAULT_ERROR = "{type} must have {property}='{expected}'." DEFAULT_ERROR = "{type} must have {property}='{expected}'."
SCO_DET_ID_NAMESPACE = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7") SCO_DET_ID_NAMESPACE = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7")
class STIXJSONEncoder(json.JSONEncoder):
"""Custom JSONEncoder subclass for serializing Python ``stix2`` objects.
If an optional property with a default value specified in the STIX 2 spec
is set to that default value, it will be left out of the serialized output.
An example of this type of property include the ``revoked`` common property.
"""
def default(self, obj):
if isinstance(obj, (dt.date, dt.datetime)):
return format_datetime(obj)
elif isinstance(obj, _STIXBase):
tmp_obj = dict(copy.deepcopy(obj))
for prop_name in obj._defaulted_optional_properties:
del tmp_obj[prop_name]
return tmp_obj
else:
return super(STIXJSONEncoder, self).default(obj)
class STIXJSONIncludeOptionalDefaultsEncoder(json.JSONEncoder):
"""Custom JSONEncoder subclass for serializing Python ``stix2`` objects.
Differs from ``STIXJSONEncoder`` in that if an optional property with a default
value specified in the STIX 2 spec is set to that default value, it will be
included in the serialized output.
"""
def default(self, obj):
if isinstance(obj, (dt.date, dt.datetime)):
return format_datetime(obj)
elif isinstance(obj, _STIXBase):
return dict(obj)
else:
return super(STIXJSONIncludeOptionalDefaultsEncoder, self).default(obj)
def get_required_properties(properties): def get_required_properties(properties):
return (k for k, v in properties.items() if v.required) return (k for k, v in properties.items() if v.required)
@ -270,18 +233,10 @@ class _STIXBase(Mapping):
def revoke(self): def revoke(self):
return _revoke(self) return _revoke(self)
def serialize(self, pretty=False, include_optional_defaults=False, **kwargs): def serialize(self, *args, **kwargs):
""" """
Serialize a STIX object. Serialize a STIX object.
Args:
pretty (bool): If True, output properties following the STIX specs
formatting. This includes indentation. Refer to notes for more
details. (Default: ``False``)
include_optional_defaults (bool): Determines whether to include
optional properties set to the default value defined in the spec.
**kwargs: The arguments for a json.dumps() call.
Examples: Examples:
>>> import stix2 >>> import stix2
>>> identity = stix2.Identity(name='Example Corp.', identity_class='organization') >>> identity = stix2.Identity(name='Example Corp.', identity_class='organization')
@ -300,25 +255,10 @@ class _STIXBase(Mapping):
Returns: Returns:
str: The serialized JSON object. str: The serialized JSON object.
Note: See Also:
The argument ``pretty=True`` will output the STIX object following ``stix2.serialization.serialize`` for options.
spec order. Using this argument greatly impacts object serialization
performance. If your use case is centered across machine-to-machine
operation it is recommended to set ``pretty=False``.
When ``pretty=True`` the following key-value pairs will be added or
overridden: indent=4, separators=(",", ": "), item_sort_key=sort_by.
""" """
if pretty: return serialize(self, *args, **kwargs)
def sort_by(element):
return find_property_index(self, *element)
kwargs.update({'indent': 4, 'separators': (',', ': '), 'item_sort_key': sort_by})
if include_optional_defaults:
return json.dumps(self, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)
else:
return json.dumps(self, cls=STIXJSONEncoder, **kwargs)
class _DomainObject(_STIXBase, _MarkingsMixin): class _DomainObject(_STIXBase, _MarkingsMixin):

View File

@ -15,7 +15,8 @@ from stix2.datastore import (
) )
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
from stix2.parsing import parse from stix2.parsing import parse
from stix2.utils import format_datetime, get_type_from_id from stix2.serialization import serialize
from stix2.utils import format_datetime, get_type_from_id, parse_into_datetime
def _timestamp2filename(timestamp): def _timestamp2filename(timestamp):
@ -24,10 +25,12 @@ def _timestamp2filename(timestamp):
"modified" property value. This should not include an extension. "modified" property value. This should not include an extension.
Args: Args:
timestamp: A timestamp, as a datetime.datetime object. timestamp: A timestamp, as a datetime.datetime object or string.
""" """
# The format_datetime will determine the correct level of precision. # The format_datetime will determine the correct level of precision.
if isinstance(timestamp, str):
timestamp = parse_into_datetime(timestamp)
ts = format_datetime(timestamp) ts = format_datetime(timestamp)
ts = re.sub(r"[-T:\.Z ]", "", ts) ts = re.sub(r"[-T:\.Z ]", "", ts)
return ts return ts
@ -582,10 +585,10 @@ class FileSystemSink(DataSink):
if os.path.isfile(file_path): if os.path.isfile(file_path):
raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path)) raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path))
else:
with io.open(file_path, 'w', encoding=encoding) as f: with io.open(file_path, 'w', encoding=encoding) as f:
stix_obj = stix_obj.serialize(pretty=True, encoding=encoding, ensure_ascii=False) stix_obj = serialize(stix_obj, pretty=True, encoding=encoding, ensure_ascii=False)
f.write(stix_obj) f.write(stix_obj)
def add(self, stix_data=None, version=None): def add(self, stix_data=None, version=None):
"""Add STIX objects to file directory. """Add STIX objects to file directory.
@ -614,8 +617,12 @@ class FileSystemSink(DataSink):
self._check_path_and_write(stix_data) self._check_path_and_write(stix_data)
elif isinstance(stix_data, (str, dict)): elif isinstance(stix_data, (str, dict)):
stix_data = parse(stix_data, allow_custom=self.allow_custom, version=version) parsed_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
self.add(stix_data, version=version) if isinstance(parsed_data, _STIXBase):
self.add(parsed_data, version=version)
else:
# custom unregistered object type
self._check_path_and_write(parsed_data)
elif isinstance(stix_data, list): elif isinstance(stix_data, list):
# recursively add individual STIX objects # recursively add individual STIX objects

162
stix2/serialization.py Normal file
View File

@ -0,0 +1,162 @@
"""STIX2 core serialization methods."""
import copy
import datetime as dt
import simplejson as json
import stix2.base
from .utils import format_datetime
class STIXJSONEncoder(json.JSONEncoder):
    """Custom JSONEncoder subclass for serializing Python ``stix2`` objects.

    If an optional property with a default value specified in the STIX 2 spec
    is set to that default value, it will be left out of the serialized output.

    An example of this type of property is the ``revoked`` common property.
    """

    def default(self, obj):
        """Convert a value the JSON encoder cannot handle by itself.

        Args:
            obj: The value being serialized.

        Returns:
            The ``format_datetime()`` rendering for dates and datetimes. For
            STIX objects, a plain ``dict`` of the object's properties without
            the ones named in ``_defaulted_optional_properties``. Any other
            value is delegated to the parent encoder.
        """
        if isinstance(obj, (dt.date, dt.datetime)):
            return format_datetime(obj)
        elif isinstance(obj, stix2.base._STIXBase):
            # dict() builds a new top-level mapping, so deleting keys from it
            # leaves ``obj`` untouched. Only top-level keys are removed here,
            # which makes a deep copy of the whole object unnecessary.
            tmp_obj = dict(obj)
            for prop_name in obj._defaulted_optional_properties:
                del tmp_obj[prop_name]
            return tmp_obj
        else:
            return super(STIXJSONEncoder, self).default(obj)
class STIXJSONIncludeOptionalDefaultsEncoder(json.JSONEncoder):
    """JSON encoder for ``stix2`` objects that keeps defaulted properties.

    Unlike ``STIXJSONEncoder``, an optional property whose value equals the
    default given in the STIX 2 spec is still written to the serialized
    output.
    """

    def default(self, obj):
        """Convert a value the JSON encoder cannot handle by itself.

        Args:
            obj: The value being serialized.

        Returns:
            The ``format_datetime()`` rendering for dates and datetimes, a
            plain ``dict`` of all properties for STIX objects, and otherwise
            whatever the parent encoder produces.
        """
        if isinstance(obj, (dt.date, dt.datetime)):
            return format_datetime(obj)

        if isinstance(obj, stix2.base._STIXBase):
            return dict(obj)

        parent = super(STIXJSONIncludeOptionalDefaultsEncoder, self)
        return parent.default(obj)
def serialize(obj, pretty=False, include_optional_defaults=False, **kwargs):
    """
    Serialize a STIX object to a JSON string.

    Args:
        obj: The STIX object to be serialized.
        pretty (bool): If True, output properties following the STIX specs
            formatting. This includes indentation. Refer to notes for more
            details. (Default: ``False``)
        include_optional_defaults (bool): Determines whether to include
            optional properties set to the default value defined in the spec.
        **kwargs: The arguments for a json.dumps() call.

    Returns:
        str: The serialized JSON object.

    Note:
        The argument ``pretty=True`` will output the STIX object following
        spec order. Using this argument greatly impacts object serialization
        performance. If your use case is centered across machine-to-machine
        operation it is recommended to set ``pretty=False``.

        When ``pretty=True`` the following key-value pairs will be added or
        overridden: indent=4, separators=(",", ": "), item_sort_key=sort_by.
    """
    if pretty:
        def sort_by(element):
            # ``element`` is unpacked into the (key, value) arguments of
            # find_property_index(), which yields the ordering index.
            return find_property_index(obj, *element)

        kwargs['indent'] = 4
        kwargs['separators'] = (',', ': ')
        kwargs['item_sort_key'] = sort_by

    # Pick the encoder once, then make a single dumps() call.
    if include_optional_defaults:
        encoder_cls = STIXJSONIncludeOptionalDefaultsEncoder
    else:
        encoder_cls = STIXJSONEncoder

    return json.dumps(obj, cls=encoder_cls, **kwargs)
def _find(seq, val):
"""
Search sequence 'seq' for val. This behaves like str.find(): if not found,
-1 is returned instead of throwing an exception.
Args:
seq: The sequence to search
val: The value to search for
Returns:
int: The index of the value if found, or -1 if not found
"""
try:
return seq.index(val)
except ValueError:
return -1
def _find_property_in_seq(seq, search_key, search_value):
    """
    Helper for find_property_index(): look for the property in each element
    of the given sequence, stopping at the first element that yields a
    non-negative index.

    Args:
        seq: The sequence whose elements are searched
        search_key: Property name to find
        search_value: Property value to find

    Returns:
        int: A property index, or -1 if the property was not found
    """
    result = -1
    for item in seq:
        result = find_property_index(item, search_key, search_value)
        if result >= 0:
            # First hit wins; later elements are not examined.
            return result
    return result
def find_property_index(obj, search_key, search_value):
    """
    Search (recursively) for the given key and value in the given object.
    Return an index for the key, relative to whatever object it's found in.

    Args:
        obj: The object to search (list, dict, or stix object)
        search_key: A search key
        search_value: A search value

    Returns:
        int: An index; -1 if the key and value aren't found
    """
    # Special-case keys which are numbers-as-strings, e.g. for cyber-observable
    # mappings.  Use the int value of the key as the index.
    if search_key.isdigit():
        try:
            return int(search_key)
        except ValueError:
            # str.isdigit() also accepts characters (superscript digits, for
            # instance) that int() cannot parse.  Such a key is not a
            # number-as-string, so fall through to the regular search.
            pass

    if isinstance(obj, stix2.base._STIXBase):
        if search_key in obj and obj[search_key] == search_value:
            # Position within the object's own property ordering.
            idx = _find(obj.object_properties(), search_key)
        else:
            idx = _find_property_in_seq(obj.values(), search_key, search_value)
    elif isinstance(obj, dict):
        if search_key in obj and obj[search_key] == search_value:
            # Plain dicts are ordered by their sorted keys.
            idx = _find(sorted(obj), search_key)
        else:
            idx = _find_property_in_seq(obj.values(), search_key, search_value)
    elif isinstance(obj, list):
        idx = _find_property_in_seq(obj, search_key, search_value)
    else:
        # Don't know how to search this type
        idx = -1

    return idx

View File

@ -633,6 +633,26 @@ def test_filesystem_object_with_custom_property_in_bundle(fs_store):
assert camp_r.x_empire == camp.x_empire assert camp_r.x_empire == camp.x_empire
def test_filesystem_custom_object_dict(fs_store):
    """A dict of an unregistered custom type can be added and read back.

    The sink's ``allow_custom`` flag is switched on for the duration of the
    test.  Cleanup runs in ``finally`` so a failing assertion cannot leave the
    created directory or the modified flag behind for later tests.
    """
    fs_store.sink.allow_custom = True
    newobj = {
        "type": "x-new-obj-2",
        "id": "x-new-obj-2--d08dc866-6149-47db-aae6-7b58a827e7f0",
        "created": "2020-07-20T03:45:02.879Z",
        "modified": "2020-07-20T03:45:02.879Z",
        "property1": "something",
    }
    try:
        fs_store.add(newobj)

        newobj_r = fs_store.get(newobj["id"])
        assert newobj_r["id"] == newobj["id"]
        assert newobj_r["property1"] == 'something'
    finally:
        # remove dir
        shutil.rmtree(os.path.join(FS_PATH, "x-new-obj-2"), True)
        fs_store.sink.allow_custom = False
def test_filesystem_custom_object(fs_store): def test_filesystem_custom_object(fs_store):
@stix2.v20.CustomObject( @stix2.v20.CustomObject(
'x-new-obj-2', [ 'x-new-obj-2', [

View File

@ -6,6 +6,7 @@ from io import StringIO
import pytest import pytest
import pytz import pytz
import stix2.serialization
import stix2.utils import stix2.utils
from .constants import IDENTITY_ID from .constants import IDENTITY_ID
@ -198,7 +199,7 @@ def test_deduplicate(stix_objs1):
], ],
) )
def test_find_property_index(object, tuple_to_find, expected_index): def test_find_property_index(object, tuple_to_find, expected_index):
assert stix2.utils.find_property_index( assert stix2.serialization.find_property_index(
object, object,
*tuple_to_find *tuple_to_find
) == expected_index ) == expected_index
@ -235,4 +236,4 @@ def test_find_property_index(object, tuple_to_find, expected_index):
], ],
) )
def test_iterate_over_values(dict_value, tuple_to_find, expected_index): def test_iterate_over_values(dict_value, tuple_to_find, expected_index):
assert stix2.utils._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index assert stix2.serialization._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index

View File

@ -654,6 +654,27 @@ def test_filesystem_object_with_custom_property_in_bundle(fs_store):
assert camp_r.x_empire == camp.x_empire assert camp_r.x_empire == camp.x_empire
def test_filesystem_custom_object_dict(fs_store):
    """A dict of an unregistered custom type can be added and read back.

    The dict carries ``spec_version`` "2.1".  The sink's ``allow_custom`` flag
    is switched on for the duration of the test.  Cleanup runs in ``finally``
    so a failing assertion cannot leave the created directory or the modified
    flag behind for later tests.
    """
    fs_store.sink.allow_custom = True
    newobj = {
        "type": "x-new-obj-2",
        "id": "x-new-obj-2--d08dc866-6149-47db-aae6-7b58a827e7f0",
        "spec_version": "2.1",
        "created": "2020-07-20T03:45:02.879Z",
        "modified": "2020-07-20T03:45:02.879Z",
        "property1": "something",
    }
    try:
        fs_store.add(newobj)

        newobj_r = fs_store.get(newobj["id"])
        assert newobj_r["id"] == newobj["id"]
        assert newobj_r["property1"] == 'something'
    finally:
        # remove dir
        shutil.rmtree(os.path.join(FS_PATH, "x-new-obj-2"), True)
        fs_store.sink.allow_custom = False
def test_filesystem_custom_object(fs_store): def test_filesystem_custom_object(fs_store):
@stix2.v21.CustomObject( @stix2.v21.CustomObject(
'x-new-obj-2', [ 'x-new-obj-2', [

View File

@ -6,6 +6,7 @@ from io import StringIO
import pytest import pytest
import pytz import pytz
import stix2.serialization
import stix2.utils import stix2.utils
from .constants import IDENTITY_ID from .constants import IDENTITY_ID
@ -201,7 +202,7 @@ def test_deduplicate(stix_objs1):
], ],
) )
def test_find_property_index(object, tuple_to_find, expected_index): def test_find_property_index(object, tuple_to_find, expected_index):
assert stix2.utils.find_property_index( assert stix2.serialization.find_property_index(
object, object,
*tuple_to_find *tuple_to_find
) == expected_index ) == expected_index
@ -238,4 +239,4 @@ def test_find_property_index(object, tuple_to_find, expected_index):
], ],
) )
def test_iterate_over_values(dict_value, tuple_to_find, expected_index): def test_iterate_over_values(dict_value, tuple_to_find, expected_index):
assert stix2.utils._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index assert stix2.serialization._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index

View File

@ -298,83 +298,6 @@ def _get_dict(data):
raise ValueError("Cannot convert '%s' to dictionary." % str(data)) raise ValueError("Cannot convert '%s' to dictionary." % str(data))
def _find(seq, val):
"""
Search sequence 'seq' for val. This behaves like str.find(): if not found,
-1 is returned instead of throwing an exception.
Args:
seq: The sequence to search
val: The value to search for
Returns:
int: The index of the value if found, or -1 if not found
"""
try:
return seq.index(val)
except ValueError:
return -1
def _find_property_in_seq(seq, search_key, search_value):
"""
Helper for find_property_index(): search for the property in all elements
of the given sequence.
Args:
seq: The sequence
search_key: Property name to find
search_value: Property value to find
Returns:
int: A property index, or -1 if the property was not found
"""
idx = -1
for elem in seq:
idx = find_property_index(elem, search_key, search_value)
if idx >= 0:
break
return idx
def find_property_index(obj, search_key, search_value):
"""
Search (recursively) for the given key and value in the given object.
Return an index for the key, relative to whatever object it's found in.
Args:
obj: The object to search (list, dict, or stix object)
search_key: A search key
search_value: A search value
Returns:
int: An index; -1 if the key and value aren't found
"""
# Special-case keys which are numbers-as-strings, e.g. for cyber-observable
# mappings. Use the int value of the key as the index.
if search_key.isdigit():
return int(search_key)
if isinstance(obj, stix2.base._STIXBase):
if search_key in obj and obj[search_key] == search_value:
idx = _find(obj.object_properties(), search_key)
else:
idx = _find_property_in_seq(obj.values(), search_key, search_value)
elif isinstance(obj, dict):
if search_key in obj and obj[search_key] == search_value:
idx = _find(sorted(obj), search_key)
else:
idx = _find_property_in_seq(obj.values(), search_key, search_value)
elif isinstance(obj, list):
idx = _find_property_in_seq(obj, search_key, search_value)
else:
# Don't know how to search this type
idx = -1
return idx
def get_class_hierarchy_names(obj): def get_class_hierarchy_names(obj):
"""Given an object, return the names of the class hierarchy.""" """Given an object, return the names of the class hierarchy."""
names = [] names = []