Merge branch 'master' of github.com:oasis-open/cti-python-stix2 into main

pull/1/head
chrisr3d 2020-09-09 16:01:44 +02:00
commit 8e19ad0f82
20 changed files with 341 additions and 176 deletions

View File

@ -0,0 +1,5 @@
serialization
================
.. automodule:: stix2.serialization
:members:

View File

@ -52,7 +52,7 @@ setup(
'requests', 'requests',
'simplejson', 'simplejson',
'six>=1.13.0', 'six>=1.13.0',
'stix2-patterns', 'stix2-patterns>=1.2.0',
], ],
project_urls={ project_urls={
'Documentation': 'https://stix2.readthedocs.io/', 'Documentation': 'https://stix2.readthedocs.io/',
@ -60,7 +60,7 @@ setup(
'Bug Tracker': 'https://github.com/oasis-open/cti-python-stix2/issues/', 'Bug Tracker': 'https://github.com/oasis-open/cti-python-stix2/issues/',
}, },
extras_require={ extras_require={
'taxii': ['taxii2-client'], 'taxii': ['taxii2-client>=2.2.1'],
'semantic': ['haversine', 'rapidfuzz'], 'semantic': ['haversine', 'rapidfuzz'],
}, },
) )

View File

@ -12,6 +12,7 @@
pattern_visitor pattern_visitor
patterns patterns
properties properties
serialization
utils utils
v20 v20
v21 v21

View File

@ -1,7 +1,6 @@
"""Base classes for type definitions in the STIX2 library.""" """Base classes for type definitions in the STIX2 library."""
import copy import copy
import datetime as dt
import re import re
import uuid import uuid
@ -18,9 +17,10 @@ from .exceptions import (
) )
from .markings import _MarkingsMixin from .markings import _MarkingsMixin
from .markings.utils import validate from .markings.utils import validate
from .utils import ( from .serialization import (
NOW, PREFIX_21_REGEX, find_property_index, format_datetime, get_timestamp, STIXJSONEncoder, STIXJSONIncludeOptionalDefaultsEncoder, serialize,
) )
from .utils import NOW, PREFIX_21_REGEX, get_timestamp
from .versioning import new_version as _new_version from .versioning import new_version as _new_version
from .versioning import revoke as _revoke from .versioning import revoke as _revoke
@ -29,51 +29,14 @@ try:
except ImportError: except ImportError:
from collections import Mapping from collections import Mapping
# TODO: Remove STIXJSONEncoder, STIXJSONIncludeOptionalDefaultsEncoder, serialize from __all__ on next major release.
__all__ = ['STIXJSONEncoder', '_STIXBase'] # Kept for backwards compatibility.
__all__ = ['STIXJSONEncoder', 'STIXJSONIncludeOptionalDefaultsEncoder', '_STIXBase', 'serialize']
DEFAULT_ERROR = "{type} must have {property}='{expected}'." DEFAULT_ERROR = "{type} must have {property}='{expected}'."
SCO_DET_ID_NAMESPACE = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7") SCO_DET_ID_NAMESPACE = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7")
class STIXJSONEncoder(json.JSONEncoder):
"""Custom JSONEncoder subclass for serializing Python ``stix2`` objects.
If an optional property with a default value specified in the STIX 2 spec
is set to that default value, it will be left out of the serialized output.
An example of this type of property include the ``revoked`` common property.
"""
def default(self, obj):
if isinstance(obj, (dt.date, dt.datetime)):
return format_datetime(obj)
elif isinstance(obj, _STIXBase):
tmp_obj = dict(copy.deepcopy(obj))
for prop_name in obj._defaulted_optional_properties:
del tmp_obj[prop_name]
return tmp_obj
else:
return super(STIXJSONEncoder, self).default(obj)
class STIXJSONIncludeOptionalDefaultsEncoder(json.JSONEncoder):
"""Custom JSONEncoder subclass for serializing Python ``stix2`` objects.
Differs from ``STIXJSONEncoder`` in that if an optional property with a default
value specified in the STIX 2 spec is set to that default value, it will be
included in the serialized output.
"""
def default(self, obj):
if isinstance(obj, (dt.date, dt.datetime)):
return format_datetime(obj)
elif isinstance(obj, _STIXBase):
return dict(obj)
else:
return super(STIXJSONIncludeOptionalDefaultsEncoder, self).default(obj)
def get_required_properties(properties): def get_required_properties(properties):
return (k for k, v in properties.items() if v.required) return (k for k, v in properties.items() if v.required)
@ -272,18 +235,10 @@ class _STIXBase(Mapping):
def revoke(self): def revoke(self):
return _revoke(self) return _revoke(self)
def serialize(self, pretty=False, include_optional_defaults=False, **kwargs): def serialize(self, *args, **kwargs):
""" """
Serialize a STIX object. Serialize a STIX object.
Args:
pretty (bool): If True, output properties following the STIX specs
formatting. This includes indentation. Refer to notes for more
details. (Default: ``False``)
include_optional_defaults (bool): Determines whether to include
optional properties set to the default value defined in the spec.
**kwargs: The arguments for a json.dumps() call.
Examples: Examples:
>>> import stix2 >>> import stix2
>>> identity = stix2.Identity(name='Example Corp.', identity_class='organization') >>> identity = stix2.Identity(name='Example Corp.', identity_class='organization')
@ -302,25 +257,10 @@ class _STIXBase(Mapping):
Returns: Returns:
str: The serialized JSON object. str: The serialized JSON object.
Note: See Also:
The argument ``pretty=True`` will output the STIX object following ``stix2.serialization.serialize`` for options.
spec order. Using this argument greatly impacts object serialization
performance. If your use case is centered across machine-to-machine
operation it is recommended to set ``pretty=False``.
When ``pretty=True`` the following key-value pairs will be added or
overridden: indent=4, separators=(",", ": "), item_sort_key=sort_by.
""" """
if pretty: return serialize(self, *args, **kwargs)
def sort_by(element):
return find_property_index(self, *element)
kwargs.update({'indent': 4, 'separators': (',', ': '), 'item_sort_key': sort_by})
if include_optional_defaults:
return json.dumps(self, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)
else:
return json.dumps(self, cls=STIXJSONEncoder, **kwargs)
class _DomainObject(_STIXBase, _MarkingsMixin): class _DomainObject(_STIXBase, _MarkingsMixin):

View File

@ -35,6 +35,8 @@ def _custom_object_builder(cls, type, properties, version, base_class):
base_class.__init__(self, **kwargs) base_class.__init__(self, **kwargs)
_cls_init(cls, self, kwargs) _cls_init(cls, self, kwargs)
_CustomObject.__name__ = cls.__name__
_register_object(_CustomObject, version=version) _register_object(_CustomObject, version=version)
return _CustomObject return _CustomObject
@ -51,6 +53,8 @@ def _custom_marking_builder(cls, type, properties, version, base_class):
base_class.__init__(self, **kwargs) base_class.__init__(self, **kwargs)
_cls_init(cls, self, kwargs) _cls_init(cls, self, kwargs)
_CustomMarking.__name__ = cls.__name__
_register_marking(_CustomMarking, version=version) _register_marking(_CustomMarking, version=version)
return _CustomMarking return _CustomMarking
@ -72,6 +76,8 @@ def _custom_observable_builder(cls, type, properties, version, base_class, id_co
base_class.__init__(self, **kwargs) base_class.__init__(self, **kwargs)
_cls_init(cls, self, kwargs) _cls_init(cls, self, kwargs)
_CustomObservable.__name__ = cls.__name__
_register_observable(_CustomObservable, version=version) _register_observable(_CustomObservable, version=version)
return _CustomObservable return _CustomObservable
@ -88,5 +94,7 @@ def _custom_extension_builder(cls, observable, type, properties, version, base_c
base_class.__init__(self, **kwargs) base_class.__init__(self, **kwargs)
_cls_init(cls, self, kwargs) _cls_init(cls, self, kwargs)
_CustomExtension.__name__ = cls.__name__
_register_observable_extension(observable, _CustomExtension, version=version) _register_observable_extension(observable, _CustomExtension, version=version)
return _CustomExtension return _CustomExtension

View File

@ -15,7 +15,8 @@ from stix2.datastore import (
) )
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
from stix2.parsing import parse from stix2.parsing import parse
from stix2.utils import format_datetime, get_type_from_id from stix2.serialization import serialize
from stix2.utils import format_datetime, get_type_from_id, parse_into_datetime
def _timestamp2filename(timestamp): def _timestamp2filename(timestamp):
@ -24,10 +25,12 @@ def _timestamp2filename(timestamp):
"modified" property value. This should not include an extension. "modified" property value. This should not include an extension.
Args: Args:
timestamp: A timestamp, as a datetime.datetime object. timestamp: A timestamp, as a datetime.datetime object or string.
""" """
# The format_datetime will determine the correct level of precision. # The format_datetime will determine the correct level of precision.
if isinstance(timestamp, str):
timestamp = parse_into_datetime(timestamp)
ts = format_datetime(timestamp) ts = format_datetime(timestamp)
ts = re.sub(r"[-T:\.Z ]", "", ts) ts = re.sub(r"[-T:\.Z ]", "", ts)
return ts return ts
@ -582,10 +585,10 @@ class FileSystemSink(DataSink):
if os.path.isfile(file_path): if os.path.isfile(file_path):
raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path)) raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path))
else:
with io.open(file_path, 'w', encoding=encoding) as f: with io.open(file_path, 'w', encoding=encoding) as f:
stix_obj = stix_obj.serialize(pretty=True, encoding=encoding, ensure_ascii=False) stix_obj = serialize(stix_obj, pretty=True, encoding=encoding, ensure_ascii=False)
f.write(stix_obj) f.write(stix_obj)
def add(self, stix_data=None, version=None): def add(self, stix_data=None, version=None):
"""Add STIX objects to file directory. """Add STIX objects to file directory.
@ -614,8 +617,12 @@ class FileSystemSink(DataSink):
self._check_path_and_write(stix_data) self._check_path_and_write(stix_data)
elif isinstance(stix_data, (str, dict)): elif isinstance(stix_data, (str, dict)):
stix_data = parse(stix_data, allow_custom=self.allow_custom, version=version) parsed_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
self.add(stix_data, version=version) if isinstance(parsed_data, _STIXBase):
self.add(parsed_data, version=version)
else:
# custom unregistered object type
self._check_path_and_write(parsed_data)
elif isinstance(stix_data, list): elif isinstance(stix_data, list):
# recursively add individual STIX objects # recursively add individual STIX objects

View File

@ -2,6 +2,7 @@
import importlib import importlib
import inspect import inspect
from six import text_type
from stix2patterns.exceptions import ParseException from stix2patterns.exceptions import ParseException
from stix2patterns.grammars.STIXPatternParser import TerminalNode from stix2patterns.grammars.STIXPatternParser import TerminalNode
@ -49,6 +50,9 @@ def check_for_valid_timetamp_syntax(timestamp_string):
return _TIMESTAMP_RE.match(timestamp_string) return _TIMESTAMP_RE.match(timestamp_string)
def same_boolean_operator(current_op, op_token):
return current_op == op_token.getText()
class STIXPatternVisitorForSTIX2(): class STIXPatternVisitorForSTIX2():
classes = {} classes = {}
@ -131,7 +135,7 @@ class STIXPatternVisitorForSTIX2():
if len(children) == 1: if len(children) == 1:
return children[0] return children[0]
else: else:
if isinstance(children[0], _BooleanExpression): if isinstance(children[0], _BooleanExpression) and same_boolean_operator(children[0].operator, children[1]):
children[0].operands.append(children[2]) children[0].operands.append(children[2])
return children[0] return children[0]
else: else:
@ -256,6 +260,11 @@ class STIXPatternVisitorForSTIX2():
if isinstance(next, TerminalNode): if isinstance(next, TerminalNode):
property_path.append(self.instantiate("ListObjectPathComponent", current.property_name, next.getText())) property_path.append(self.instantiate("ListObjectPathComponent", current.property_name, next.getText()))
i += 2 i += 2
elif isinstance(next, IntegerConstant):
property_path.append(self.instantiate("ListObjectPathComponent",
current.property_name if isinstance(current, BasicObjectPathComponent) else text_type(current),
next.value))
i += 2
else: else:
property_path.append(current) property_path.append(current)
i += 1 i += 1
@ -269,7 +278,12 @@ class STIXPatternVisitorForSTIX2():
# Visit a parse tree produced by STIXPatternParser#firstPathComponent. # Visit a parse tree produced by STIXPatternParser#firstPathComponent.
def visitFirstPathComponent(self, ctx): def visitFirstPathComponent(self, ctx):
children = self.visitChildren(ctx) children = self.visitChildren(ctx)
step = children[0].getText() first_component = children[0]
# hack for when the first component isn't a TerminalNode (see issue #438)
if isinstance(first_component, TerminalNode):
step = first_component.getText()
else:
step = text_type(first_component)
# if step.endswith("_ref"): # if step.endswith("_ref"):
# return stix2.ReferenceObjectPathComponent(step) # return stix2.ReferenceObjectPathComponent(step)
# else: # else:
@ -288,8 +302,8 @@ class STIXPatternVisitorForSTIX2():
def visitKeyPathStep(self, ctx): def visitKeyPathStep(self, ctx):
children = self.visitChildren(ctx) children = self.visitChildren(ctx)
if isinstance(children[1], StringConstant): if isinstance(children[1], StringConstant):
# special case for hashes # special case for hashes and quoted steps
return children[1].value return children[1]
else: else:
return self.instantiate("BasicObjectPathComponent", children[1].getText(), True) return self.instantiate("BasicObjectPathComponent", children[1].getText(), True)

View File

@ -248,7 +248,10 @@ def make_constant(value):
class _ObjectPathComponent(object): class _ObjectPathComponent(object):
@staticmethod @staticmethod
def create_ObjectPathComponent(component_name): def create_ObjectPathComponent(component_name):
if component_name.endswith("_ref"): # first case is to handle if component_name was quoted
if isinstance(component_name, StringConstant):
return BasicObjectPathComponent(component_name.value, False)
elif component_name.endswith("_ref"):
return ReferenceObjectPathComponent(component_name) return ReferenceObjectPathComponent(component_name)
elif component_name.find("[") != -1: elif component_name.find("[") != -1:
parse1 = component_name.split("[") parse1 = component_name.split("[")

162
stix2/serialization.py Normal file
View File

@ -0,0 +1,162 @@
"""STIX2 core serialization methods."""
import copy
import datetime as dt
import simplejson as json
import stix2.base
from .utils import format_datetime
class STIXJSONEncoder(json.JSONEncoder):
"""Custom JSONEncoder subclass for serializing Python ``stix2`` objects.
If an optional property with a default value specified in the STIX 2 spec
is set to that default value, it will be left out of the serialized output.
An example of this type of property include the ``revoked`` common property.
"""
def default(self, obj):
if isinstance(obj, (dt.date, dt.datetime)):
return format_datetime(obj)
elif isinstance(obj, stix2.base._STIXBase):
tmp_obj = dict(copy.deepcopy(obj))
for prop_name in obj._defaulted_optional_properties:
del tmp_obj[prop_name]
return tmp_obj
else:
return super(STIXJSONEncoder, self).default(obj)
class STIXJSONIncludeOptionalDefaultsEncoder(json.JSONEncoder):
"""Custom JSONEncoder subclass for serializing Python ``stix2`` objects.
Differs from ``STIXJSONEncoder`` in that if an optional property with a default
value specified in the STIX 2 spec is set to that default value, it will be
included in the serialized output.
"""
def default(self, obj):
if isinstance(obj, (dt.date, dt.datetime)):
return format_datetime(obj)
elif isinstance(obj, stix2.base._STIXBase):
return dict(obj)
else:
return super(STIXJSONIncludeOptionalDefaultsEncoder, self).default(obj)
def serialize(obj, pretty=False, include_optional_defaults=False, **kwargs):
"""
Serialize a STIX object.
Args:
obj: The STIX object to be serialized.
pretty (bool): If True, output properties following the STIX specs
formatting. This includes indentation. Refer to notes for more
details. (Default: ``False``)
include_optional_defaults (bool): Determines whether to include
optional properties set to the default value defined in the spec.
**kwargs: The arguments for a json.dumps() call.
Returns:
str: The serialized JSON object.
Note:
The argument ``pretty=True`` will output the STIX object following
spec order. Using this argument greatly impacts object serialization
performance. If your use case is centered across machine-to-machine
operation it is recommended to set ``pretty=False``.
When ``pretty=True`` the following key-value pairs will be added or
overridden: indent=4, separators=(",", ": "), item_sort_key=sort_by.
"""
if pretty:
def sort_by(element):
return find_property_index(obj, *element)
kwargs.update({'indent': 4, 'separators': (',', ': '), 'item_sort_key': sort_by})
if include_optional_defaults:
return json.dumps(obj, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)
else:
return json.dumps(obj, cls=STIXJSONEncoder, **kwargs)
def _find(seq, val):
"""
Search sequence 'seq' for val. This behaves like str.find(): if not found,
-1 is returned instead of throwing an exception.
Args:
seq: The sequence to search
val: The value to search for
Returns:
int: The index of the value if found, or -1 if not found
"""
try:
return seq.index(val)
except ValueError:
return -1
def _find_property_in_seq(seq, search_key, search_value):
"""
Helper for find_property_index(): search for the property in all elements
of the given sequence.
Args:
seq: The sequence
search_key: Property name to find
search_value: Property value to find
Returns:
int: A property index, or -1 if the property was not found
"""
idx = -1
for elem in seq:
idx = find_property_index(elem, search_key, search_value)
if idx >= 0:
break
return idx
def find_property_index(obj, search_key, search_value):
"""
Search (recursively) for the given key and value in the given object.
Return an index for the key, relative to whatever object it's found in.
Args:
obj: The object to search (list, dict, or stix object)
search_key: A search key
search_value: A search value
Returns:
int: An index; -1 if the key and value aren't found
"""
# Special-case keys which are numbers-as-strings, e.g. for cyber-observable
# mappings. Use the int value of the key as the index.
if search_key.isdigit():
return int(search_key)
if isinstance(obj, stix2.base._STIXBase):
if search_key in obj and obj[search_key] == search_value:
idx = _find(obj.object_properties(), search_key)
else:
idx = _find_property_in_seq(obj.values(), search_key, search_value)
elif isinstance(obj, dict):
if search_key in obj and obj[search_key] == search_value:
idx = _find(sorted(obj), search_key)
else:
idx = _find_property_in_seq(obj.values(), search_key, search_value)
elif isinstance(obj, list):
idx = _find_property_in_seq(obj, search_key, search_value)
else:
# Don't know how to search this type
idx = -1
return idx

View File

@ -723,7 +723,7 @@ def test_custom_extension():
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo: with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
NewExtension(property2=42) NewExtension(property2=42)
assert excinfo.value.properties == ['property1'] assert excinfo.value.properties == ['property1']
assert str(excinfo.value) == "No values for required properties for _CustomExtension: (property1)." assert str(excinfo.value) == "No values for required properties for NewExtension: (property1)."
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
NewExtension(property1='something', property2=4) NewExtension(property1='something', property2=4)

View File

@ -633,6 +633,26 @@ def test_filesystem_object_with_custom_property_in_bundle(fs_store):
assert camp_r.x_empire == camp.x_empire assert camp_r.x_empire == camp.x_empire
def test_filesystem_custom_object_dict(fs_store):
fs_store.sink.allow_custom = True
newobj = {
"type": "x-new-obj-2",
"id": "x-new-obj-2--d08dc866-6149-47db-aae6-7b58a827e7f0",
"created": "2020-07-20T03:45:02.879Z",
"modified": "2020-07-20T03:45:02.879Z",
"property1": "something",
}
fs_store.add(newobj)
newobj_r = fs_store.get(newobj["id"])
assert newobj_r["id"] == newobj["id"]
assert newobj_r["property1"] == 'something'
# remove dir
shutil.rmtree(os.path.join(FS_PATH, "x-new-obj-2"), True)
fs_store.sink.allow_custom = False
def test_filesystem_custom_object(fs_store): def test_filesystem_custom_object(fs_store):
@stix2.v20.CustomObject( @stix2.v20.CustomObject(
'x-new-obj-2', [ 'x-new-obj-2', [

View File

@ -511,6 +511,32 @@ def test_parsing_start_stop_qualified_expression():
) == "[ipv4-addr:value = '1.2.3.4'] START '2016-06-01T00:00:00Z' STOP '2017-03-12T08:30:00Z'" ) == "[ipv4-addr:value = '1.2.3.4'] START '2016-06-01T00:00:00Z' STOP '2017-03-12T08:30:00Z'"
def test_parsing_mixed_boolean_expression_1():
patt_obj = create_pattern_object("[a:b = 1 AND a:b = 2 OR a:b = 3]")
assert str(patt_obj) == "[a:b = 1 AND a:b = 2 OR a:b = 3]"
def test_parsing_mixed_boolean_expression_2():
patt_obj = create_pattern_object("[a:b = 1 OR a:b = 2 AND a:b = 3]")
assert str(patt_obj) == "[a:b = 1 OR a:b = 2 AND a:b = 3]"
def test_parsing_integer_index():
patt_obj = create_pattern_object("[a:b[1]=2]")
assert str(patt_obj) == "[a:b[1] = 2]"
# This should never occur, because the first component will always be a property_name, and they should not be quoted.
def test_parsing_quoted_first_path_component():
patt_obj = create_pattern_object("[a:'b'[1]=2]")
assert str(patt_obj) == "[a:'b'[1] = 2]"
def test_parsing_quoted_second_path_component():
patt_obj = create_pattern_object("[a:b.'b'[1]=2]")
assert str(patt_obj) == "[a:b.'b'[1] = 2]"
def test_parsing_illegal_start_stop_qualified_expression(): def test_parsing_illegal_start_stop_qualified_expression():
with pytest.raises(ValueError): with pytest.raises(ValueError):
create_pattern_object("[ipv4-addr:value = '1.2.3.4'] START '2016-06-01' STOP '2017-03-12T08:30:00Z'", version="2.0") create_pattern_object("[ipv4-addr:value = '1.2.3.4'] START '2016-06-01' STOP '2017-03-12T08:30:00Z'", version="2.0")

View File

@ -6,6 +6,7 @@ from io import StringIO
import pytest import pytest
import pytz import pytz
import stix2.serialization
import stix2.utils import stix2.utils
from .constants import IDENTITY_ID from .constants import IDENTITY_ID
@ -198,7 +199,7 @@ def test_deduplicate(stix_objs1):
], ],
) )
def test_find_property_index(object, tuple_to_find, expected_index): def test_find_property_index(object, tuple_to_find, expected_index):
assert stix2.utils.find_property_index( assert stix2.serialization.find_property_index(
object, object,
*tuple_to_find *tuple_to_find
) == expected_index ) == expected_index
@ -235,4 +236,4 @@ def test_find_property_index(object, tuple_to_find, expected_index):
], ],
) )
def test_iterate_over_values(dict_value, tuple_to_find, expected_index): def test_iterate_over_values(dict_value, tuple_to_find, expected_index):
assert stix2.utils._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index assert stix2.serialization._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index

View File

@ -920,7 +920,7 @@ def test_custom_extension():
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo: with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
NewExtension(property2=42) NewExtension(property2=42)
assert excinfo.value.properties == ['property1'] assert excinfo.value.properties == ['property1']
assert str(excinfo.value) == "No values for required properties for _CustomExtension: (property1)." assert str(excinfo.value) == "No values for required properties for NewExtension: (property1)."
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
NewExtension(property1='something', property2=4) NewExtension(property1='something', property2=4)

View File

@ -654,6 +654,27 @@ def test_filesystem_object_with_custom_property_in_bundle(fs_store):
assert camp_r.x_empire == camp.x_empire assert camp_r.x_empire == camp.x_empire
def test_filesystem_custom_object_dict(fs_store):
fs_store.sink.allow_custom = True
newobj = {
"type": "x-new-obj-2",
"id": "x-new-obj-2--d08dc866-6149-47db-aae6-7b58a827e7f0",
"spec_version": "2.1",
"created": "2020-07-20T03:45:02.879Z",
"modified": "2020-07-20T03:45:02.879Z",
"property1": "something",
}
fs_store.add(newobj)
newobj_r = fs_store.get(newobj["id"])
assert newobj_r["id"] == newobj["id"]
assert newobj_r["property1"] == 'something'
# remove dir
shutil.rmtree(os.path.join(FS_PATH, "x-new-obj-2"), True)
fs_store.sink.allow_custom = False
def test_filesystem_custom_object(fs_store): def test_filesystem_custom_object(fs_store):
@stix2.v21.CustomObject( @stix2.v21.CustomObject(
'x-new-obj-2', [ 'x-new-obj-2', [

View File

@ -644,6 +644,31 @@ def test_parsing_boolean():
assert str(patt_obj) == "[network-traffic:is_active = true]" assert str(patt_obj) == "[network-traffic:is_active = true]"
def test_parsing_mixed_boolean_expression_1():
patt_obj = create_pattern_object("[a:b = 1 AND a:b = 2 OR a:b = 3]",)
assert str(patt_obj) == "[a:b = 1 AND a:b = 2 OR a:b = 3]"
def test_parsing_mixed_boolean_expression_2():
patt_obj = create_pattern_object("[a:b = 1 OR a:b = 2 AND a:b = 3]",)
assert str(patt_obj) == "[a:b = 1 OR a:b = 2 AND a:b = 3]"
def test_parsing_integer_index():
patt_obj = create_pattern_object("[a:b[1]=2]")
assert str(patt_obj) == "[a:b[1] = 2]"
# This should never occur, because the first component will always be a property_name, and they should not be quoted.
def test_parsing_quoted_first_path_component():
patt_obj = create_pattern_object("[a:'b'[1]=2]")
assert str(patt_obj) == "[a:'b'[1] = 2]"
def test_parsing_quoted_second_path_component():
patt_obj = create_pattern_object("[a:b.'b'[1]=2]")
assert str(patt_obj) == "[a:b.'b'[1] = 2]"
def test_parsing_multiple_slashes_quotes(): def test_parsing_multiple_slashes_quotes():
patt_obj = create_pattern_object("[ file:name = 'weird_name\\'' ]", version="2.1") patt_obj = create_pattern_object("[ file:name = 'weird_name\\'' ]", version="2.1")
assert str(patt_obj) == "[file:name = 'weird_name\\'']" assert str(patt_obj) == "[file:name = 'weird_name\\'']"

View File

@ -6,6 +6,7 @@ from io import StringIO
import pytest import pytest
import pytz import pytz
import stix2.serialization
import stix2.utils import stix2.utils
from .constants import IDENTITY_ID from .constants import IDENTITY_ID
@ -201,7 +202,7 @@ def test_deduplicate(stix_objs1):
], ],
) )
def test_find_property_index(object, tuple_to_find, expected_index): def test_find_property_index(object, tuple_to_find, expected_index):
assert stix2.utils.find_property_index( assert stix2.serialization.find_property_index(
object, object,
*tuple_to_find *tuple_to_find
) == expected_index ) == expected_index
@ -238,4 +239,4 @@ def test_find_property_index(object, tuple_to_find, expected_index):
], ],
) )
def test_iterate_over_values(dict_value, tuple_to_find, expected_index): def test_iterate_over_values(dict_value, tuple_to_find, expected_index):
assert stix2.utils._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index assert stix2.serialization._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index

View File

@ -298,83 +298,6 @@ def _get_dict(data):
raise ValueError("Cannot convert '%s' to dictionary." % str(data)) raise ValueError("Cannot convert '%s' to dictionary." % str(data))
def _find(seq, val):
"""
Search sequence 'seq' for val. This behaves like str.find(): if not found,
-1 is returned instead of throwing an exception.
Args:
seq: The sequence to search
val: The value to search for
Returns:
int: The index of the value if found, or -1 if not found
"""
try:
return seq.index(val)
except ValueError:
return -1
def _find_property_in_seq(seq, search_key, search_value):
"""
Helper for find_property_index(): search for the property in all elements
of the given sequence.
Args:
seq: The sequence
search_key: Property name to find
search_value: Property value to find
Returns:
int: A property index, or -1 if the property was not found
"""
idx = -1
for elem in seq:
idx = find_property_index(elem, search_key, search_value)
if idx >= 0:
break
return idx
def find_property_index(obj, search_key, search_value):
"""
Search (recursively) for the given key and value in the given object.
Return an index for the key, relative to whatever object it's found in.
Args:
obj: The object to search (list, dict, or stix object)
search_key: A search key
search_value: A search value
Returns:
int: An index; -1 if the key and value aren't found
"""
# Special-case keys which are numbers-as-strings, e.g. for cyber-observable
# mappings. Use the int value of the key as the index.
if search_key.isdigit():
return int(search_key)
if isinstance(obj, stix2.base._STIXBase):
if search_key in obj and obj[search_key] == search_value:
idx = _find(obj.object_properties(), search_key)
else:
idx = _find_property_in_seq(obj.values(), search_key, search_value)
elif isinstance(obj, dict):
if search_key in obj and obj[search_key] == search_value:
idx = _find(sorted(obj), search_key)
else:
idx = _find_property_in_seq(obj.values(), search_key, search_value)
elif isinstance(obj, list):
idx = _find_property_in_seq(obj, search_key, search_value)
else:
# Don't know how to search this type
idx = -1
return idx
def get_class_hierarchy_names(obj): def get_class_hierarchy_names(obj):
"""Given an object, return the names of the class hierarchy.""" """Given an object, return the names of the class hierarchy."""
names = [] names = []

View File

@ -26,10 +26,14 @@ class Bundle(_STIXBase20):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
# Add any positional arguments to the 'objects' kwarg. # Add any positional arguments to the 'objects' kwarg.
if args: if args:
if isinstance(args[0], list): obj_list = []
kwargs['objects'] = args[0] + list(args[1:]) + kwargs.get('objects', []) for arg in args:
else: if isinstance(arg, list):
kwargs['objects'] = list(args) + kwargs.get('objects', []) obj_list = obj_list + arg
else:
obj_list.append(arg)
kwargs['objects'] = obj_list + kwargs.get('objects', [])
allow_custom = kwargs.get('allow_custom', False) allow_custom = kwargs.get('allow_custom', False)
self._allow_custom = allow_custom self._allow_custom = allow_custom

View File

@ -23,10 +23,14 @@ class Bundle(_STIXBase21):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
# Add any positional arguments to the 'objects' kwarg. # Add any positional arguments to the 'objects' kwarg.
if args: if args:
if isinstance(args[0], list): obj_list = []
kwargs['objects'] = args[0] + list(args[1:]) + kwargs.get('objects', []) for arg in args:
else: if isinstance(arg, list):
kwargs['objects'] = list(args) + kwargs.get('objects', []) obj_list = obj_list + arg
else:
obj_list.append(arg)
kwargs['objects'] = obj_list + kwargs.get('objects', [])
allow_custom = kwargs.get('allow_custom', False) allow_custom = kwargs.get('allow_custom', False)
self._allow_custom = allow_custom self._allow_custom = allow_custom