provide ability to stream STIX output to fp ``.write()`` file-like object

pull/1/head
Emmanuelle Vargas-Gonzalez 2021-03-17 15:01:49 -04:00
parent f155e3e571
commit 827f622c04
4 changed files with 113 additions and 1 deletions

View File

@ -17,7 +17,8 @@ from .exceptions import (
from .markings import _MarkingsMixin
from .markings.utils import validate
from .serialization import (
STIXJSONEncoder, STIXJSONIncludeOptionalDefaultsEncoder, serialize,
STIXJSONEncoder, STIXJSONIncludeOptionalDefaultsEncoder, fp_serialize,
serialize,
)
from .utils import NOW, PREFIX_21_REGEX, get_timestamp
from .versioning import new_version as _new_version
@ -260,6 +261,35 @@ class _STIXBase(Mapping):
"""
return serialize(self, *args, **kwargs)
def fp_serialize(self, *args, **kwargs):
"""
Serialize a STIX object to a file-like supporting object.
Examples:
>>> import stix2
>>> identity = stix2.Identity(name='Example Corp.', identity_class='organization')
>>> print(identity.serialize(sort_keys=True))
{"created": "2018-06-08T19:03:54.066Z", ... "name": "Example Corp.", "type": "identity"}
>>> print(identity.serialize(sort_keys=True, indent=4))
{
"created": "2018-06-08T19:03:54.066Z",
"id": "identity--d7f3e25a-ba1c-447a-ab71-6434b092b05e",
"identity_class": "organization",
"modified": "2018-06-08T19:03:54.066Z",
"name": "Example Corp.",
"type": "identity"
}
>>> with open("example.json", mode="w", encoding="utf-8") as f:
>>> identity.fp_serialize(f, pretty=True)
Returns:
None
See Also:
``stix2.serialization.fp_serialize`` for options.
"""
fp_serialize(self, *args, **kwargs)
class _DomainObject(_STIXBase, _MarkingsMixin):
pass

View File

@ -85,6 +85,44 @@ def serialize(obj, pretty=False, include_optional_defaults=False, **kwargs):
return json.dumps(obj, cls=STIXJSONEncoder, **kwargs)
def fp_serialize(obj, fp, pretty=False, include_optional_defaults=False, **kwargs):
"""
Serialize a STIX object as a stream to file-like supporting object.
Args:
obj: The STIX object to be serialized.
fp: A ``.write()``-supporting file-like object.
pretty (bool): If True, output properties following the STIX specs
formatting. This includes indentation. Refer to notes for more
details. (Default: ``False``)
include_optional_defaults (bool): Determines whether to include
optional properties set to the default value defined in the spec.
**kwargs: The arguments for a json.dumps() call.
Returns:
None
Note:
The argument ``pretty=True`` will output the STIX object following
spec order. Using this argument greatly impacts object serialization
performance. If your use case is centered across machine-to-machine
operation it is recommended to set ``pretty=False``.
When ``pretty=True`` the following key-value pairs will be added or
overridden: indent=4, separators=(",", ": "), item_sort_key=sort_by.
"""
if pretty:
def sort_by(element):
return find_property_index(obj, *element)
kwargs.update({'indent': 4, 'separators': (',', ': '), 'item_sort_key': sort_by})
if include_optional_defaults:
json.dump(obj, fp, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)
else:
json.dump(obj, fp, cls=STIXJSONEncoder, **kwargs)
def _find(seq, val):
"""
Search sequence 'seq' for val. This behaves like str.find(): if not found,

View File

@ -1,3 +1,4 @@
import io
import json
import pytest
@ -113,6 +114,27 @@ def test_bundle_id_must_start_with_bundle():
assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'."
def test_create_bundle_fp_serialize_true(indicator, malware, relationship):
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])
buffer = io.StringIO()
bundle.fp_serialize(buffer, pretty=True)
assert str(bundle) == EXPECTED_BUNDLE
assert bundle.serialize(pretty=True) == EXPECTED_BUNDLE
assert buffer.getvalue() == EXPECTED_BUNDLE
def test_create_bundle_fp_serialize_false(indicator, malware, relationship):
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])
buffer = io.StringIO()
bundle.fp_serialize(buffer, sort_keys=True)
assert bundle.serialize(sort_keys=True) == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
assert buffer.getvalue() == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
def test_create_bundle1(indicator, malware, relationship):
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])

View File

@ -1,3 +1,4 @@
import io
import json
import pytest
@ -123,6 +124,27 @@ def test_bundle_id_must_start_with_bundle():
assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'."
def test_create_bundle_fp_serialize_true(indicator, malware, relationship):
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])
buffer = io.StringIO()
bundle.fp_serialize(buffer, pretty=True)
assert str(bundle) == EXPECTED_BUNDLE
assert bundle.serialize(pretty=True) == EXPECTED_BUNDLE
assert buffer.getvalue() == EXPECTED_BUNDLE
def test_create_bundle_fp_serialize_false(indicator, malware, relationship):
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])
buffer = io.StringIO()
bundle.fp_serialize(buffer, sort_keys=True)
assert bundle.serialize(sort_keys=True) == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
assert buffer.getvalue() == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
def test_create_bundle1(indicator, malware, relationship):
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])