Merge pull request #599 from PedroHenriqueFernandes/add-pretty-parameter

feat: add 'pretty' parameter to optimize JSON serialization performance
main
Alexandre Dulaunoy 2024-10-15 11:43:40 +02:00 committed by GitHub
commit 68ba448a9e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 45 additions and 8 deletions

View File

@ -554,7 +554,7 @@ class FileSystemSink(DataSink):
def stix_dir(self):
return self._stix_dir
def _check_path_and_write(self, stix_obj, encoding='utf-8'):
def _check_path_and_write(self, stix_obj, encoding='utf-8', pretty=True):
"""Write the given STIX object to a file in the STIX file directory.
"""
type_dir = os.path.join(self._stix_dir, stix_obj["type"])
@ -585,9 +585,9 @@ class FileSystemSink(DataSink):
raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path))
with io.open(file_path, mode='w', encoding=encoding) as f:
fp_serialize(stix_obj, f, pretty=True, encoding=encoding, ensure_ascii=False)
fp_serialize(stix_obj, f, pretty=pretty, encoding=encoding, ensure_ascii=False)
def add(self, stix_data=None, version=None):
def add(self, stix_data=None, version=None, pretty=True):
"""Add STIX objects to file directory.
Args:
@ -597,6 +597,7 @@ class FileSystemSink(DataSink):
version (str): If present, it forces the parser to use the version
provided. Otherwise, the library will make the best effort based
on checking the "spec_version" property.
pretty (bool): If True, the resulting JSON will be "pretty printed"
Note:
``stix_data`` can be a Bundle object, but each object in it will be
@ -607,24 +608,24 @@ class FileSystemSink(DataSink):
if isinstance(stix_data, (v20.Bundle, v21.Bundle)):
# recursively add individual STIX objects
for stix_obj in stix_data.get("objects", []):
self.add(stix_obj, version=version)
self.add(stix_obj, version=version, pretty=pretty)
elif isinstance(stix_data, _STIXBase):
# adding python STIX object
self._check_path_and_write(stix_data)
self._check_path_and_write(stix_data, pretty=pretty)
elif isinstance(stix_data, (str, dict)):
parsed_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
if isinstance(parsed_data, _STIXBase):
self.add(parsed_data, version=version)
self.add(parsed_data, version=version, pretty=pretty)
else:
# custom unregistered object type
self._check_path_and_write(parsed_data)
self._check_path_and_write(parsed_data, pretty=pretty)
elif isinstance(stix_data, list):
# recursively add individual STIX objects
for stix_obj in stix_data:
self.add(stix_obj)
self.add(stix_obj, version=version, pretty=pretty)
else:
raise TypeError(

View File

@ -151,6 +151,42 @@ def test_filesystem_source_bad_stix_file(fs_source, bad_stix_files):
except STIXError as e:
assert "Can't parse object with no 'type' property" in str(e)
def test_filesystem_sink_add_pretty_true(fs_sink, fs_source):
"""Test adding a STIX object with pretty=True."""
camp1 = stix2.v21.Campaign(
name="Hannibal",
objective="Targeting Italian and Spanish Diplomat internet accounts",
aliases=["War Elephant"],
)
fs_sink.add(camp1, pretty=True)
filepath = os.path.join(
FS_PATH, "campaign", camp1.id, _timestamp2filename(camp1.modified) + ".json",
)
assert os.path.exists(filepath)
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
assert '\n' in content # Check for pretty-printed output
os.remove(filepath)
def test_filesystem_sink_add_pretty_false(fs_sink, fs_source):
"""Test adding a STIX object with pretty=False."""
camp1 = stix2.v21.Campaign(
name="Hannibal",
objective="Targeting Italian and Spanish Diplomat internet accounts",
aliases=["War Elephant"],
)
fs_sink.add(camp1, pretty=False)
filepath = os.path.join(
FS_PATH, "campaign", camp1.id, _timestamp2filename(camp1.modified) + ".json",
)
assert os.path.exists(filepath)
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
assert '\n' not in content # Check for non-pretty-printed output
os.remove(filepath)
def test_filesystem_source_get_object(fs_source):
# get (latest) object