Fix bug when adding custom obj to FileSystemSink

... if the object type hasn't been registered.

Related: #439.
pull/1/head
Chris Lenk 2020-07-20 00:04:32 -04:00
parent c49703803c
commit 55ea84ece2
4 changed files with 96 additions and 21 deletions

View File

@ -30,7 +30,7 @@ except ImportError:
from collections import Mapping
__all__ = ['STIXJSONEncoder', '_STIXBase']
__all__ = ['STIXJSONEncoder', '_STIXBase', 'serialize']
DEFAULT_ERROR = "{type} must have {property}='{expected}'."
SCO_DET_ID_NAMESPACE = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7")
@ -74,6 +74,43 @@ class STIXJSONIncludeOptionalDefaultsEncoder(json.JSONEncoder):
return super(STIXJSONIncludeOptionalDefaultsEncoder, self).default(obj)
def serialize(obj, pretty=False, include_optional_defaults=False, **kwargs):
"""
Serialize a STIX object.
Args:
obj: The STIX object to be serialized.
pretty (bool): If True, output properties following the STIX specs
formatting. This includes indentation. Refer to notes for more
details. (Default: ``False``)
include_optional_defaults (bool): Determines whether to include
optional properties set to the default value defined in the spec.
**kwargs: The arguments for a json.dumps() call.
Returns:
str: The serialized JSON object.
Note:
The argument ``pretty=True`` will output the STIX object following
spec order. Using this argument greatly impacts object serialization
performance. If your use case is centered across machine-to-machine
operation it is recommended to set ``pretty=False``.
When ``pretty=True`` the following key-value pairs will be added or
overridden: indent=4, separators=(",", ": "), item_sort_key=sort_by.
"""
if pretty:
def sort_by(element):
return find_property_index(obj, *element)
kwargs.update({'indent': 4, 'separators': (',', ': '), 'item_sort_key': sort_by})
if include_optional_defaults:
return json.dumps(obj, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)
else:
return json.dumps(obj, cls=STIXJSONEncoder, **kwargs)
def get_required_properties(properties):
return (k for k, v in properties.items() if v.required)
@ -270,7 +307,7 @@ class _STIXBase(Mapping):
def revoke(self):
return _revoke(self)
def serialize(self, pretty=False, include_optional_defaults=False, **kwargs):
def serialize(self, *args, **kwargs):
"""
Serialize a STIX object.
@ -309,16 +346,7 @@ class _STIXBase(Mapping):
When ``pretty=True`` the following key-value pairs will be added or
overridden: indent=4, separators=(",", ": "), item_sort_key=sort_by.
"""
if pretty:
def sort_by(element):
return find_property_index(self, *element)
kwargs.update({'indent': 4, 'separators': (',', ': '), 'item_sort_key': sort_by})
if include_optional_defaults:
return json.dumps(self, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)
else:
return json.dumps(self, cls=STIXJSONEncoder, **kwargs)
return serialize(self, *args, **kwargs)
class _DomainObject(_STIXBase, _MarkingsMixin):

View File

@ -9,13 +9,13 @@ import stat
import six
from stix2 import v20, v21
from stix2.base import _STIXBase
from stix2.base import _STIXBase, serialize
from stix2.datastore import (
DataSink, DataSource, DataSourceError, DataStoreMixin,
)
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
from stix2.parsing import parse
from stix2.utils import format_datetime, get_type_from_id
from stix2.utils import format_datetime, get_type_from_id, parse_into_datetime
def _timestamp2filename(timestamp):
@ -24,10 +24,12 @@ def _timestamp2filename(timestamp):
"modified" property value. This should not include an extension.
Args:
timestamp: A timestamp, as a datetime.datetime object.
timestamp: A timestamp, as a datetime.datetime object or string.
"""
# The format_datetime will determine the correct level of precision.
if isinstance(timestamp, str):
timestamp = parse_into_datetime(timestamp)
ts = format_datetime(timestamp)
ts = re.sub(r"[-T:\.Z ]", "", ts)
return ts
@ -582,9 +584,9 @@ class FileSystemSink(DataSink):
if os.path.isfile(file_path):
raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path))
else:
with io.open(file_path, 'w', encoding=encoding) as f:
stix_obj = stix_obj.serialize(pretty=True, encoding=encoding, ensure_ascii=False)
stix_obj = serialize(stix_obj, pretty=True, encoding=encoding, ensure_ascii=False)
f.write(stix_obj)
def add(self, stix_data=None, version=None):
@ -614,8 +616,12 @@ class FileSystemSink(DataSink):
self._check_path_and_write(stix_data)
elif isinstance(stix_data, (str, dict)):
stix_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
self.add(stix_data, version=version)
parsed_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
if isinstance(parsed_data, _STIXBase):
self.add(parsed_data, version=version)
else:
# custom unregistered object type
self._check_path_and_write(parsed_data)
elif isinstance(stix_data, list):
# recursively add individual STIX objects

View File

@ -633,6 +633,26 @@ def test_filesystem_object_with_custom_property_in_bundle(fs_store):
assert camp_r.x_empire == camp.x_empire
def test_filesystem_custom_object_dict(fs_store):
fs_store.sink.allow_custom = True
newobj = {
"type": "x-new-obj-2",
"id": "x-new-obj-2--d08dc866-6149-47db-aae6-7b58a827e7f0",
"created": "2020-07-20T03:45:02.879Z",
"modified": "2020-07-20T03:45:02.879Z",
"property1": "something",
}
fs_store.add(newobj)
newobj_r = fs_store.get(newobj["id"])
assert newobj_r["id"] == newobj["id"]
assert newobj_r["property1"] == 'something'
# remove dir
shutil.rmtree(os.path.join(FS_PATH, "x-new-obj-2"), True)
fs_store.sink.allow_custom = False
def test_filesystem_custom_object(fs_store):
@stix2.v20.CustomObject(
'x-new-obj-2', [

View File

@ -654,6 +654,27 @@ def test_filesystem_object_with_custom_property_in_bundle(fs_store):
assert camp_r.x_empire == camp.x_empire
def test_filesystem_custom_object_dict(fs_store):
fs_store.sink.allow_custom = True
newobj = {
"type": "x-new-obj-2",
"id": "x-new-obj-2--d08dc866-6149-47db-aae6-7b58a827e7f0",
"spec_version": "2.1",
"created": "2020-07-20T03:45:02.879Z",
"modified": "2020-07-20T03:45:02.879Z",
"property1": "something",
}
fs_store.add(newobj)
newobj_r = fs_store.get(newobj["id"])
assert newobj_r["id"] == newobj["id"]
assert newobj_r["property1"] == 'something'
# remove dir
shutil.rmtree(os.path.join(FS_PATH, "x-new-obj-2"), True)
fs_store.sink.allow_custom = False
def test_filesystem_custom_object(fs_store):
@stix2.v21.CustomObject(
'x-new-obj-2', [