commit
2743b90fc0
|
@ -17,7 +17,8 @@ from .exceptions import (
|
||||||
from .markings import _MarkingsMixin
|
from .markings import _MarkingsMixin
|
||||||
from .markings.utils import validate
|
from .markings.utils import validate
|
||||||
from .serialization import (
|
from .serialization import (
|
||||||
STIXJSONEncoder, STIXJSONIncludeOptionalDefaultsEncoder, serialize,
|
STIXJSONEncoder, STIXJSONIncludeOptionalDefaultsEncoder, fp_serialize,
|
||||||
|
serialize,
|
||||||
)
|
)
|
||||||
from .utils import NOW, PREFIX_21_REGEX, get_timestamp
|
from .utils import NOW, PREFIX_21_REGEX, get_timestamp
|
||||||
from .versioning import new_version as _new_version
|
from .versioning import new_version as _new_version
|
||||||
|
@ -260,6 +261,35 @@ class _STIXBase(Mapping):
|
||||||
"""
|
"""
|
||||||
return serialize(self, *args, **kwargs)
|
return serialize(self, *args, **kwargs)
|
||||||
|
|
||||||
|
def fp_serialize(self, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Serialize a STIX object to ``fp`` (a text stream file-like supporting object).
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
>>> import stix2
|
||||||
|
>>> identity = stix2.Identity(name='Example Corp.', identity_class='organization')
|
||||||
|
>>> print(identity.serialize(sort_keys=True))
|
||||||
|
{"created": "2018-06-08T19:03:54.066Z", ... "name": "Example Corp.", "type": "identity"}
|
||||||
|
>>> print(identity.serialize(sort_keys=True, indent=4))
|
||||||
|
{
|
||||||
|
"created": "2018-06-08T19:03:54.066Z",
|
||||||
|
"id": "identity--d7f3e25a-ba1c-447a-ab71-6434b092b05e",
|
||||||
|
"identity_class": "organization",
|
||||||
|
"modified": "2018-06-08T19:03:54.066Z",
|
||||||
|
"name": "Example Corp.",
|
||||||
|
"type": "identity"
|
||||||
|
}
|
||||||
|
>>> with open("example.json", mode="w", encoding="utf-8") as f:
|
||||||
|
>>> identity.fp_serialize(f, pretty=True)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
None
|
||||||
|
|
||||||
|
See Also:
|
||||||
|
``stix2.serialization.fp_serialize`` for options.
|
||||||
|
"""
|
||||||
|
fp_serialize(self, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class _DomainObject(_STIXBase, _MarkingsMixin):
|
class _DomainObject(_STIXBase, _MarkingsMixin):
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -13,7 +13,7 @@ from stix2.datastore import (
|
||||||
)
|
)
|
||||||
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
|
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
|
||||||
from stix2.parsing import parse
|
from stix2.parsing import parse
|
||||||
from stix2.serialization import serialize
|
from stix2.serialization import fp_serialize
|
||||||
from stix2.utils import format_datetime, get_type_from_id, parse_into_datetime
|
from stix2.utils import format_datetime, get_type_from_id, parse_into_datetime
|
||||||
|
|
||||||
|
|
||||||
|
@ -584,9 +584,8 @@ class FileSystemSink(DataSink):
|
||||||
if os.path.isfile(file_path):
|
if os.path.isfile(file_path):
|
||||||
raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path))
|
raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path))
|
||||||
|
|
||||||
with io.open(file_path, 'w', encoding=encoding) as f:
|
with io.open(file_path, mode='w', encoding=encoding) as f:
|
||||||
stix_obj = serialize(stix_obj, pretty=True, encoding=encoding, ensure_ascii=False)
|
fp_serialize(stix_obj, f, pretty=True, encoding=encoding, ensure_ascii=False)
|
||||||
f.write(stix_obj)
|
|
||||||
|
|
||||||
def add(self, stix_data=None, version=None):
|
def add(self, stix_data=None, version=None):
|
||||||
"""Add STIX objects to file directory.
|
"""Add STIX objects to file directory.
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
import copy
|
import copy
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
|
import io
|
||||||
|
|
||||||
import simplejson as json
|
import simplejson as json
|
||||||
|
|
||||||
|
@ -64,6 +65,37 @@ def serialize(obj, pretty=False, include_optional_defaults=False, **kwargs):
|
||||||
Returns:
|
Returns:
|
||||||
str: The serialized JSON object.
|
str: The serialized JSON object.
|
||||||
|
|
||||||
|
Note:
|
||||||
|
The argument ``pretty=True`` will output the STIX object following
|
||||||
|
spec order. Using this argument greatly impacts object serialization
|
||||||
|
performance. If your use case is centered across machine-to-machine
|
||||||
|
operation it is recommended to set ``pretty=False``.
|
||||||
|
|
||||||
|
When ``pretty=True`` the following key-value pairs will be added or
|
||||||
|
overridden: indent=4, separators=(",", ": "), item_sort_key=sort_by.
|
||||||
|
"""
|
||||||
|
with io.StringIO() as fp:
|
||||||
|
fp_serialize(obj, fp, pretty, include_optional_defaults, **kwargs)
|
||||||
|
return fp.getvalue()
|
||||||
|
|
||||||
|
|
||||||
|
def fp_serialize(obj, fp, pretty=False, include_optional_defaults=False, **kwargs):
|
||||||
|
"""
|
||||||
|
Serialize a STIX object to ``fp`` (a text stream file-like supporting object).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
obj: The STIX object to be serialized.
|
||||||
|
fp: A text stream file-like object supporting ``.write()``.
|
||||||
|
pretty (bool): If True, output properties following the STIX specs
|
||||||
|
formatting. This includes indentation. Refer to notes for more
|
||||||
|
details. (Default: ``False``)
|
||||||
|
include_optional_defaults (bool): Determines whether to include
|
||||||
|
optional properties set to the default value defined in the spec.
|
||||||
|
**kwargs: The arguments for a json.dumps() call.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
None
|
||||||
|
|
||||||
Note:
|
Note:
|
||||||
The argument ``pretty=True`` will output the STIX object following
|
The argument ``pretty=True`` will output the STIX object following
|
||||||
spec order. Using this argument greatly impacts object serialization
|
spec order. Using this argument greatly impacts object serialization
|
||||||
|
@ -80,9 +112,9 @@ def serialize(obj, pretty=False, include_optional_defaults=False, **kwargs):
|
||||||
kwargs.update({'indent': 4, 'separators': (',', ': '), 'item_sort_key': sort_by})
|
kwargs.update({'indent': 4, 'separators': (',', ': '), 'item_sort_key': sort_by})
|
||||||
|
|
||||||
if include_optional_defaults:
|
if include_optional_defaults:
|
||||||
return json.dumps(obj, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)
|
json.dump(obj, fp, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)
|
||||||
else:
|
else:
|
||||||
return json.dumps(obj, cls=STIXJSONEncoder, **kwargs)
|
json.dump(obj, fp, cls=STIXJSONEncoder, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def _find(seq, val):
|
def _find(seq, val):
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import io
|
||||||
import json
|
import json
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
@ -113,6 +114,27 @@ def test_bundle_id_must_start_with_bundle():
|
||||||
assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'."
|
assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'."
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_bundle_fp_serialize_pretty(indicator, malware, relationship):
|
||||||
|
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])
|
||||||
|
buffer = io.StringIO()
|
||||||
|
|
||||||
|
bundle.fp_serialize(buffer, pretty=True)
|
||||||
|
|
||||||
|
assert str(bundle) == EXPECTED_BUNDLE
|
||||||
|
assert bundle.serialize(pretty=True) == EXPECTED_BUNDLE
|
||||||
|
assert buffer.getvalue() == EXPECTED_BUNDLE
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_bundle_fp_serialize_nonpretty(indicator, malware, relationship):
|
||||||
|
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])
|
||||||
|
buffer = io.StringIO()
|
||||||
|
|
||||||
|
bundle.fp_serialize(buffer, sort_keys=True)
|
||||||
|
|
||||||
|
assert bundle.serialize(sort_keys=True) == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
|
||||||
|
assert buffer.getvalue() == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
|
||||||
|
|
||||||
|
|
||||||
def test_create_bundle1(indicator, malware, relationship):
|
def test_create_bundle1(indicator, malware, relationship):
|
||||||
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])
|
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import io
|
||||||
import json
|
import json
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
@ -123,6 +124,27 @@ def test_bundle_id_must_start_with_bundle():
|
||||||
assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'."
|
assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'."
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_bundle_fp_serialize_pretty(indicator, malware, relationship):
|
||||||
|
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])
|
||||||
|
buffer = io.StringIO()
|
||||||
|
|
||||||
|
bundle.fp_serialize(buffer, pretty=True)
|
||||||
|
|
||||||
|
assert str(bundle) == EXPECTED_BUNDLE
|
||||||
|
assert bundle.serialize(pretty=True) == EXPECTED_BUNDLE
|
||||||
|
assert buffer.getvalue() == EXPECTED_BUNDLE
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_bundle_fp_serialize_nonpretty(indicator, malware, relationship):
|
||||||
|
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])
|
||||||
|
buffer = io.StringIO()
|
||||||
|
|
||||||
|
bundle.fp_serialize(buffer, sort_keys=True)
|
||||||
|
|
||||||
|
assert bundle.serialize(sort_keys=True) == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
|
||||||
|
assert buffer.getvalue() == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
|
||||||
|
|
||||||
|
|
||||||
def test_create_bundle1(indicator, malware, relationship):
|
def test_create_bundle1(indicator, malware, relationship):
|
||||||
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])
|
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue