feat: add 'pretty' parameter to optimize JSON serialization performance

main
Pedro Henrique Fernandes 2024-07-11 10:58:10 -03:00
parent 50fd81fd6b
commit 280d34531c
2 changed files with 45 additions and 8 deletions

View File

@ -554,7 +554,7 @@ class FileSystemSink(DataSink):
def stix_dir(self): def stix_dir(self):
return self._stix_dir return self._stix_dir
def _check_path_and_write(self, stix_obj, encoding='utf-8'): def _check_path_and_write(self, stix_obj, encoding='utf-8', pretty=True):
"""Write the given STIX object to a file in the STIX file directory. """Write the given STIX object to a file in the STIX file directory.
""" """
type_dir = os.path.join(self._stix_dir, stix_obj["type"]) type_dir = os.path.join(self._stix_dir, stix_obj["type"])
@ -585,9 +585,9 @@ class FileSystemSink(DataSink):
raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path)) raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path))
with io.open(file_path, mode='w', encoding=encoding) as f: with io.open(file_path, mode='w', encoding=encoding) as f:
fp_serialize(stix_obj, f, pretty=True, encoding=encoding, ensure_ascii=False) fp_serialize(stix_obj, f, pretty=pretty, encoding=encoding, ensure_ascii=False)
def add(self, stix_data=None, version=None): def add(self, stix_data=None, version=None, pretty=True):
"""Add STIX objects to file directory. """Add STIX objects to file directory.
Args: Args:
@ -597,6 +597,7 @@ class FileSystemSink(DataSink):
version (str): If present, it forces the parser to use the version version (str): If present, it forces the parser to use the version
provided. Otherwise, the library will make the best effort based provided. Otherwise, the library will make the best effort based
on checking the "spec_version" property. on checking the "spec_version" property.
pretty (bool): If True, the resulting JSON will be "pretty printed"
Note: Note:
``stix_data`` can be a Bundle object, but each object in it will be ``stix_data`` can be a Bundle object, but each object in it will be
@ -607,24 +608,24 @@ class FileSystemSink(DataSink):
if isinstance(stix_data, (v20.Bundle, v21.Bundle)): if isinstance(stix_data, (v20.Bundle, v21.Bundle)):
# recursively add individual STIX objects # recursively add individual STIX objects
for stix_obj in stix_data.get("objects", []): for stix_obj in stix_data.get("objects", []):
self.add(stix_obj, version=version) self.add(stix_obj, version=version, pretty=pretty)
elif isinstance(stix_data, _STIXBase): elif isinstance(stix_data, _STIXBase):
# adding python STIX object # adding python STIX object
self._check_path_and_write(stix_data) self._check_path_and_write(stix_data, pretty=pretty)
elif isinstance(stix_data, (str, dict)): elif isinstance(stix_data, (str, dict)):
parsed_data = parse(stix_data, allow_custom=self.allow_custom, version=version) parsed_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
if isinstance(parsed_data, _STIXBase): if isinstance(parsed_data, _STIXBase):
self.add(parsed_data, version=version) self.add(parsed_data, version=version, pretty=pretty)
else: else:
# custom unregistered object type # custom unregistered object type
self._check_path_and_write(parsed_data) self._check_path_and_write(parsed_data, pretty=pretty)
elif isinstance(stix_data, list): elif isinstance(stix_data, list):
# recursively add individual STIX objects # recursively add individual STIX objects
for stix_obj in stix_data: for stix_obj in stix_data:
self.add(stix_obj) self.add(stix_obj, version=version, pretty=pretty)
else: else:
raise TypeError( raise TypeError(

View File

@ -151,6 +151,42 @@ def test_filesystem_source_bad_stix_file(fs_source, bad_stix_files):
except STIXError as e: except STIXError as e:
assert "Can't parse object with no 'type' property" in str(e) assert "Can't parse object with no 'type' property" in str(e)
def test_filesystem_sink_add_pretty_true(fs_sink, fs_source):
"""Test adding a STIX object with pretty=True."""
camp1 = stix2.v21.Campaign(
name="Hannibal",
objective="Targeting Italian and Spanish Diplomat internet accounts",
aliases=["War Elephant"],
)
fs_sink.add(camp1, pretty=True)
filepath = os.path.join(
FS_PATH, "campaign", camp1.id, _timestamp2filename(camp1.modified) + ".json",
)
assert os.path.exists(filepath)
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
assert '\n' in content # Check for pretty-printed output
os.remove(filepath)
def test_filesystem_sink_add_pretty_false(fs_sink, fs_source):
"""Test adding a STIX object with pretty=False."""
camp1 = stix2.v21.Campaign(
name="Hannibal",
objective="Targeting Italian and Spanish Diplomat internet accounts",
aliases=["War Elephant"],
)
fs_sink.add(camp1, pretty=False)
filepath = os.path.join(
FS_PATH, "campaign", camp1.id, _timestamp2filename(camp1.modified) + ".json",
)
assert os.path.exists(filepath)
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
assert '\n' not in content # Check for non-pretty-printed output
os.remove(filepath)
def test_filesystem_source_get_object(fs_source): def test_filesystem_source_get_object(fs_source):
# get (latest) object # get (latest) object