Merge branch 'master' of github.com:oasis-open/cti-python-stix2 into main

pull/1/head
chrisr3d 2021-03-24 12:36:49 +01:00
commit 8093e07d1b
48 changed files with 3571 additions and 891 deletions

.gitignore

@ -55,8 +55,7 @@ coverage.xml
# Sphinx documentation
docs/_build/
.ipynb_checkpoints
graph_default_sem_eq_weights.rst
object_default_sem_eq_weights.rst
similarity_weights.rst
# PyBuilder
target/


@ -10,7 +10,6 @@ known_third_party =
pytz,
requests,
simplejson,
six,
sphinx,
stix2patterns,
taxii2client,


@ -23,3 +23,4 @@ repos:
args: ["-c", "--diff"]
- id: isort
name: Sort python imports (fixes files)
exclude: ^stix2/canonicalization/


@ -21,6 +21,8 @@ Install with `pip <https://pip.pypa.io/en/stable/>`__:
$ pip install stix2
Note: The library requires Python 3.6+.
Usage
-----


@ -4,11 +4,9 @@ import os
import re
import sys
from six import class_types
from sphinx.ext.autodoc import ClassDocumenter
from stix2.base import _STIXBase
from stix2.equivalence.graph import GRAPH_WEIGHTS
from stix2.equivalence.object import WEIGHTS
from stix2.version import __version__
@ -67,16 +65,9 @@ object_default_sem_eq_weights = json.dumps(WEIGHTS, indent=4, default=lambda o:
object_default_sem_eq_weights = object_default_sem_eq_weights.replace('\n', '\n ')
object_default_sem_eq_weights = object_default_sem_eq_weights.replace(' "', ' ')
object_default_sem_eq_weights = object_default_sem_eq_weights.replace('"\n', '\n')
with open('object_default_sem_eq_weights.rst', 'w') as f:
with open('similarity_weights.rst', 'w') as f:
f.write(".. code-block:: python\n\n {}\n\n".format(object_default_sem_eq_weights))
graph_default_sem_eq_weights = json.dumps(GRAPH_WEIGHTS, indent=4, default=lambda o: o.__name__)
graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('\n', '\n ')
graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace(' "', ' ')
graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('"\n', '\n')
with open('graph_default_sem_eq_weights.rst', 'w') as f:
f.write(".. code-block:: python\n\n {}\n\n".format(graph_default_sem_eq_weights))
def get_property_type(prop):
"""Convert property classname into pretty string name of property.
@ -107,7 +98,7 @@ class STIXPropertyDocumenter(ClassDocumenter):
@classmethod
def can_document_member(cls, member, membername, isattr, parent):
return isinstance(member, class_types) and \
return isinstance(member, type) and \
issubclass(member, _STIXBase) and \
hasattr(member, '_properties')

File diff suppressed because it is too large


@ -47,11 +47,11 @@ setup(
],
keywords='stix stix2 json cti cyber threat intelligence',
packages=find_packages(exclude=['*.test', '*.test.*']),
python_requires='>=3.6',
install_requires=[
'pytz',
'requests',
'simplejson',
'six>=1.13.0',
'stix2-patterns>=1.2.0',
],
project_urls={
@ -60,7 +60,7 @@ setup(
'Bug Tracker': 'https://github.com/oasis-open/cti-python-stix2/issues/',
},
extras_require={
'taxii': ['taxii2-client>=2.2.1'],
'taxii': ['taxii2-client>=2.3.0'],
'semantic': ['haversine', 'rapidfuzz'],
},
)
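The `extras_require` table above wires the optional dependencies into pip's extras mechanism, so the TAXII and semantic-similarity features stay opt-in. A quick sketch of the resulting install commands (standard pip extras syntax, not part of this diff):

$ pip install stix2              # core library, Python 3.6+ only
$ pip install stix2[taxii]       # pulls in taxii2-client>=2.3.0
$ pip install stix2[semantic]    # pulls in haversine and rapidfuzz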


@ -5,7 +5,6 @@ import re
import uuid
import simplejson as json
import six
import stix2
from stix2.canonicalization.Canonicalize import canonicalize
@ -18,7 +17,8 @@ from .exceptions import (
from .markings import _MarkingsMixin
from .markings.utils import validate
from .serialization import (
STIXJSONEncoder, STIXJSONIncludeOptionalDefaultsEncoder, serialize,
STIXJSONEncoder, STIXJSONIncludeOptionalDefaultsEncoder, fp_serialize,
serialize,
)
from .utils import NOW, PREFIX_21_REGEX, get_timestamp
from .versioning import new_version as _new_version
@ -70,12 +70,9 @@ class _STIXBase(Mapping):
# InvalidValueError... so let those propagate.
raise
except Exception as exc:
six.raise_from(
InvalidValueError(
self.__class__, prop_name, reason=str(exc),
),
exc,
)
raise InvalidValueError(
self.__class__, prop_name, reason=str(exc),
) from exc
# interproperty constraint methods
@ -266,6 +263,35 @@ class _STIXBase(Mapping):
"""
return serialize(self, *args, **kwargs)
def fp_serialize(self, *args, **kwargs):
"""
Serialize a STIX object to ``fp`` (a text-mode file-like object).
Examples:
>>> import stix2
>>> identity = stix2.Identity(name='Example Corp.', identity_class='organization')
>>> print(identity.serialize(sort_keys=True))
{"created": "2018-06-08T19:03:54.066Z", ... "name": "Example Corp.", "type": "identity"}
>>> print(identity.serialize(sort_keys=True, indent=4))
{
"created": "2018-06-08T19:03:54.066Z",
"id": "identity--d7f3e25a-ba1c-447a-ab71-6434b092b05e",
"identity_class": "organization",
"modified": "2018-06-08T19:03:54.066Z",
"name": "Example Corp.",
"type": "identity"
}
>>> with open("example.json", mode="w", encoding="utf-8") as f:
...     identity.fp_serialize(f, pretty=True)
Returns:
None
See Also:
``stix2.serialization.fp_serialize`` for options.
"""
fp_serialize(self, *args, **kwargs)
class _DomainObject(_STIXBase, _MarkingsMixin):
def __init__(self, *args, **kwargs):
@ -386,19 +412,8 @@ class _Observable(_STIXBase):
if json_serializable_object:
data = canonicalize(json_serializable_object, utf8=False)
# The situation is complicated w.r.t. python 2/3 behavior, so
# I'd rather not rely on particular exceptions being raised to
# determine what to do. Better to just check the python version
# directly.
if six.PY3:
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, data)
else:
uuid_ = uuid.uuid5(
SCO_DET_ID_NAMESPACE, data.encode("utf-8"),
)
id_ = "{}--{}".format(self._type, six.text_type(uuid_))
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, data)
id_ = "{}--{}".format(self._type, str(uuid_))
return id_
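With the Python 2 branch gone, deterministic SCO identifiers reduce to a single uuid.uuid5 call over the canonicalized JSON. A minimal sketch of the same derivation (the namespace below is the UUID fixed by the STIX 2.1 specification for deterministic SCO IDs; the property payload is illustrative):

import uuid

# Namespace fixed by the STIX 2.1 specification for SCO deterministic IDs.
SCO_DET_ID_NAMESPACE = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7")

# Canonicalized JSON of the ID-contributing properties (illustrative payload).
data = '{"name":"example.txt"}'

# On Python 3, uuid5 accepts the str directly; no .encode() step is needed.
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, data)
id_ = "{}--{}".format("file", str(uuid_))
print(id_)  # the same input always yields the same id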
@ -464,7 +479,7 @@ def _make_json_serializable(value):
for v in value
]
elif not isinstance(value, (int, float, six.string_types, bool)):
elif not isinstance(value, (int, float, str, bool)):
# If a "simple" value which is not already JSON-serializable,
# JSON-serialize to a string and use that as our JSON-serializable
# value. This applies to our datetime objects currently (timestamp


@ -20,12 +20,8 @@
# JCS compatible JSON serializer for Python 3.x #
#################################################
# This file has been modified to be compatible with Python 2.x as well
import re
import six
from stix2.canonicalization.NumberToJson import convert2Es6Format
try:
@ -55,10 +51,10 @@ ESCAPE_DCT = {
}
for i in range(0x20):
ESCAPE_DCT.setdefault(chr(i), '\\u{0:04x}'.format(i))
#ESCAPE_DCT.setdefault(chr(i), '\\u%04x' % (i,))
INFINITY = float('inf')
def py_encode_basestring(s):
"""Return a JSON representation of a Python string
@ -70,7 +66,6 @@ def py_encode_basestring(s):
encode_basestring = (c_encode_basestring or py_encode_basestring)
def py_encode_basestring_ascii(s):
"""Return an ASCII-only JSON representation of a Python string
@ -83,6 +78,7 @@ def py_encode_basestring_ascii(s):
n = ord(s)
if n < 0x10000:
return '\\u{0:04x}'.format(n)
#return '\\u%04x' % (n,)
else:
# surrogate pair
n -= 0x10000
@ -96,7 +92,6 @@ encode_basestring_ascii = (
c_encode_basestring_ascii or py_encode_basestring_ascii
)
class JSONEncoder(object):
"""Extensible JSON <http://json.org> encoder for Python data structures.
@ -128,11 +123,10 @@ class JSONEncoder(object):
"""
item_separator = ', '
key_separator = ': '
def __init__(
self, skipkeys=False, ensure_ascii=False,
self, *, skipkeys=False, ensure_ascii=False,
check_circular=True, allow_nan=True, sort_keys=True,
indent=None, separators=(',', ':'), default=None,
indent=None, separators=(',', ':'), default=None
):
"""Constructor for JSONEncoder, with sensible defaults.
@ -277,6 +271,7 @@ class JSONEncoder(object):
return text
if (
_one_shot and c_make_encoder is not None
and self.indent is None
@ -294,11 +289,10 @@ class JSONEncoder(object):
)
return _iterencode(o, 0)
def _make_iterencode(
markers, _default, _encoder, _indent, _floatstr,
_key_separator, _item_separator, _sort_keys, _skipkeys, _one_shot,
# HACK: hand-optimized bytecode; turn globals into locals
## HACK: hand-optimized bytecode; turn globals into locals
ValueError=ValueError,
dict=dict,
float=float,
@ -362,10 +356,7 @@ def _make_iterencode(
chunks = _iterencode_dict(value, _current_indent_level)
else:
chunks = _iterencode(value, _current_indent_level)
# Below line commented-out for python2 compatibility
# yield from chunks
for chunk in chunks:
yield chunk
yield from chunks
if newline_indent is not None:
_current_indent_level -= 1
yield '\n' + _indent * _current_indent_level
@ -397,8 +388,7 @@ def _make_iterencode(
else:
items = dct.items()
for key, value in items:
# Replaced isinstance(key, str) with below to enable simultaneous python 2 & 3 compatibility
if isinstance(key, six.string_types) or isinstance(key, six.binary_type):
if isinstance(key, str):
pass
# JavaScript is weakly typed for these, so it makes sense to
# also allow them. Many encoders seem to do something like this.
@ -445,10 +435,7 @@ def _make_iterencode(
chunks = _iterencode_dict(value, _current_indent_level)
else:
chunks = _iterencode(value, _current_indent_level)
# Below line commented-out for python2 compatibility
# yield from chunks
for chunk in chunks:
yield chunk
yield from chunks
if newline_indent is not None:
_current_indent_level -= 1
yield '\n' + _indent * _current_indent_level
@ -457,8 +444,7 @@ def _make_iterencode(
del markers[markerid]
def _iterencode(o, _current_indent_level):
# Replaced isinstance(o, str) with below to enable simultaneous python 2 & 3 compatibility
if isinstance(o, six.string_types) or isinstance(o, six.binary_type):
if isinstance(o, str):
yield _encoder(o)
elif o is None:
yield 'null'
@ -473,15 +459,9 @@ def _make_iterencode(
# see comment for int/float in _make_iterencode
yield convert2Es6Format(o)
elif isinstance(o, (list, tuple)):
# Below line commented-out for python2 compatibility
# yield from _iterencode_list(o, _current_indent_level)
for thing in _iterencode_list(o, _current_indent_level):
yield thing
yield from _iterencode_list(o, _current_indent_level)
elif isinstance(o, dict):
# Below line commented-out for python2 compatibility
# yield from _iterencode_dict(o, _current_indent_level)
for thing in _iterencode_dict(o, _current_indent_level):
yield thing
yield from _iterencode_dict(o, _current_indent_level)
else:
if markers is not None:
markerid = id(o)
@ -489,23 +469,18 @@ def _make_iterencode(
raise ValueError("Circular reference detected")
markers[markerid] = o
o = _default(o)
# Below line commented-out for python2 compatibility
# yield from _iterencode(o, _current_indent_level)
for thing in _iterencode(o, _current_indent_level):
yield thing
yield from _iterencode(o, _current_indent_level)
if markers is not None:
del markers[markerid]
return _iterencode
def canonicalize(obj, utf8=True):
def canonicalize(obj,utf8=True):
textVal = JSONEncoder(sort_keys=True).encode(obj)
if utf8:
return textVal.encode()
return textVal
def serialize(obj, utf8=True):
def serialize(obj,utf8=True):
textVal = JSONEncoder(sort_keys=False).encode(obj)
if utf8:
return textVal.encode()
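Both wrappers delegate to the same JCS encoder, so their behavior is easy to spot-check; a small sketch, assuming the import path shown earlier in this diff (stix2.canonicalization.Canonicalize):

from stix2.canonicalization.Canonicalize import canonicalize, serialize

# canonicalize() sorts keys and strips whitespace, per JCS (RFC 8785).
print(canonicalize({"b": 1, "a": [2.5, True]}))
# b'{"a":[2.5,true],"b":1}'

# utf8=False returns the text instead of UTF-8 bytes.
print(canonicalize({"b": 1, "a": [2.5, True]}, utf8=False))
# {"a":[2.5,true],"b":1}

# serialize() is identical except keys keep their insertion order
# (guaranteed for dicts on Python 3.7+).
print(serialize({"b": 1, "a": 2}, utf8=False))
# {"b":1,"a":2}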


@ -21,40 +21,50 @@
# Convert a Python double/float into an ES6/V8 compatible string #
##################################################################
def convert2Es6Format(value):
# Convert double/float to str using the native Python formatter
# Convert double/float to str using the native Python formatter
fvalue = float(value)
# Zero is a special case. The following line takes "-0" case as well
#
# Zero is a special case. The following line takes "-0" case as well
#
if fvalue == 0:
return '0'
# The rest of the algorithm works on the textual representation only
#
# The rest of the algorithm works on the textual representation only
#
pyDouble = str(fvalue)
# The following line catches the "inf" and "nan" values returned by str(fvalue)
#
# The following line catches the "inf" and "nan" values returned by str(fvalue)
#
if pyDouble.find('n') >= 0:
raise ValueError("Invalid JSON number: " + pyDouble)
# Save sign separately, it doesn't have any role in the algorithm
#
# Save sign separately, it doesn't have any role in the algorithm
#
pySign = ''
if pyDouble.find('-') == 0:
pySign = '-'
pyDouble = pyDouble[1:]
# Now we should only have valid non-zero values
#
# Now we should only have valid non-zero values
#
pyExpStr = ''
pyExpVal = 0
q = pyDouble.find('e')
if q > 0:
# Grab the exponent and remove it from the number
#
# Grab the exponent and remove it from the number
#
pyExpStr = pyDouble[q:]
if pyExpStr[2:3] == '0':
# Suppress leading zero on exponents
#
# Suppress leading zero on exponents
#
pyExpStr = pyExpStr[:2] + pyExpStr[3:]
pyDouble = pyDouble[0:q]
pyExpVal = int(pyExpStr[1:])
# Split number in pyFirst + pyDot + pyLast
#
# Split number in pyFirst + pyDot + pyLast
#
pyFirst = pyDouble
pyDot = ''
pyLast = ''
@ -63,33 +73,40 @@ def convert2Es6Format(value):
pyDot = '.'
pyFirst = pyDouble[:q]
pyLast = pyDouble[q + 1:]
# Now the string is split into: pySign + pyFirst + pyDot + pyLast + pyExpStr
#
# Now the string is split into: pySign + pyFirst + pyDot + pyLast + pyExpStr
#
if pyLast == '0':
# Always remove trailing .0
#
# Always remove trailing .0
#
pyDot = ''
pyLast = ''
if pyExpVal > 0 and pyExpVal < 21:
# Integers are shown as is with up to 21 digits
#
# Integers are shown as is with up to 21 digits
#
pyFirst += pyLast
pyLast = ''
pyDot = ''
pyExpStr = ''
q = pyExpVal - len(pyFirst)
while q >= 0:
q -= 1
q -= 1;
pyFirst += '0'
elif pyExpVal < 0 and pyExpVal > -7:
# Small numbers are shown as 0.etc with e-6 as lower limit
#
# Small numbers are shown as 0.etc with e-6 as lower limit
#
pyLast = pyFirst + pyLast
pyFirst = '0'
pyDot = '.'
pyExpStr = ''
q = pyExpVal
while q < -1:
q += 1
q += 1;
pyLast = '0' + pyLast
# The resulting sub-strings are concatenated
#
# The resulting sub-strings are concatenated
#
return pySign + pyFirst + pyDot + pyLast + pyExpStr
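The function's contract is easiest to see through a few spot checks; a sketch assuming the module path used by the canonicalizer above (stix2.canonicalization.NumberToJson):

from stix2.canonicalization.NumberToJson import convert2Es6Format

print(convert2Es6Format(1.0))       # '1'         -- trailing .0 is removed
print(convert2Es6Format(-0.0))      # '0'         -- negative zero collapses to 0
print(convert2Es6Format(0.000001))  # '0.000001'  -- expanded form down to the e-6 limit
print(convert2Es6Format(1e+23))     # '1e+23'     -- large magnitudes keep exponent form
# Non-finite values are rejected:
# convert2Es6Format(float('nan')) raises ValueError("Invalid JSON number: nan")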


@ -1,7 +1,5 @@
from collections import OrderedDict
import six
from .base import _cls_init
from .registration import (
_register_marking, _register_object, _register_observable,
@ -13,14 +11,11 @@ def _get_properties_dict(properties):
try:
return OrderedDict(properties)
except TypeError as e:
six.raise_from(
ValueError(
"properties must be dict-like, e.g. a list "
"containing tuples. For example, "
"[('property1', IntegerProperty())]",
),
e,
)
raise ValueError(
"properties must be dict-like, e.g. a list "
"containing tuples. For example, "
"[('property1', IntegerProperty())]",
) from e
def _custom_object_builder(cls, type, properties, version, base_class):


@ -15,8 +15,6 @@ Python STIX2 DataStore API.
from abc import ABCMeta, abstractmethod
import uuid
from six import with_metaclass
from stix2.datastore.filters import Filter, FilterSet
from stix2.utils import deduplicate
@ -219,7 +217,7 @@ class DataStoreMixin(object):
raise AttributeError(msg % self.__class__.__name__)
class DataSink(with_metaclass(ABCMeta)):
class DataSink(metaclass=ABCMeta):
"""An implementer will create a concrete subclass from
this class for the specific DataSink.
@ -245,7 +243,7 @@ class DataSink(with_metaclass(ABCMeta)):
"""
class DataSource(with_metaclass(ABCMeta)):
class DataSource(metaclass=ABCMeta):
"""An implementer will create a concrete subclass from
this class for the specific DataSource.


@ -6,8 +6,6 @@ import os
import re
import stat
import six
from stix2 import v20, v21
from stix2.base import _STIXBase
from stix2.datastore import (
@ -15,7 +13,7 @@ from stix2.datastore import (
)
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
from stix2.parsing import parse
from stix2.serialization import serialize
from stix2.serialization import fp_serialize
from stix2.utils import format_datetime, get_type_from_id, parse_into_datetime
@ -116,7 +114,7 @@ def _update_allow(allow_set, value):
"""
adding_seq = hasattr(value, "__iter__") and \
not isinstance(value, six.string_types)
not isinstance(value, str)
if allow_set is None:
allow_set = set()
@ -586,9 +584,8 @@ class FileSystemSink(DataSink):
if os.path.isfile(file_path):
raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path))
with io.open(file_path, 'w', encoding=encoding) as f:
stix_obj = serialize(stix_obj, pretty=True, encoding=encoding, ensure_ascii=False)
f.write(stix_obj)
with io.open(file_path, mode='w', encoding=encoding) as f:
fp_serialize(stix_obj, f, pretty=True, encoding=encoding, ensure_ascii=False)
def add(self, stix_data=None, version=None):
"""Add STIX objects to file directory.


@ -3,8 +3,6 @@
import collections
from datetime import datetime
import six
import stix2.utils
"""Supported filter operations"""
@ -12,8 +10,7 @@ FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=', 'contains']
"""Supported filter value types"""
FILTER_VALUE_TYPES = (
bool, dict, float, int, list, tuple, six.string_types,
datetime,
bool, dict, float, int, list, tuple, str, datetime,
)
@ -84,7 +81,7 @@ class Filter(collections.namedtuple('Filter', ['property', 'op', 'value'])):
# If filtering on a timestamp property and the filter value is a string,
# try to convert the filter value to a datetime instance.
if isinstance(stix_obj_property, datetime) and \
isinstance(self.value, six.string_types):
isinstance(self.value, str):
filter_value = stix2.utils.parse_into_datetime(self.value)
else:
filter_value = self.value
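Because Filter is a namedtuple of (property, op, value) validated against FILTER_OPS and FILTER_VALUE_TYPES, and the branch above coerces string filter values when the object property is a datetime, timestamp filters can be written as plain strings. A short sketch:

from stix2.datastore.filters import Filter

f1 = Filter("type", "=", "indicator")
# A str value is parsed into a datetime when matched against timestamp properties.
f2 = Filter("created", ">", "2018-01-01T00:00:00.000Z")

print(f1.property, f1.op, f1.value)  # type = indicator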


@ -12,6 +12,8 @@ from stix2.parsing import parse
from stix2.utils import deduplicate
try:
from taxii2client import v20 as tcv20
from taxii2client import v21 as tcv21
from taxii2client.exceptions import ValidationError
_taxii2_client = True
except ImportError:
@ -33,9 +35,12 @@ class TAXIICollectionStore(DataStoreMixin):
side(retrieving data) and False for TAXIICollectionSink
side(pushing data). However, when parameter is supplied, it will
be applied to both TAXIICollectionSource/Sink.
items_per_page (int): How many STIX objects to request per call
to the TAXII server. The value can be tuned, but servers may override
if their internal limit is surpassed. Used by TAXIICollectionSource.
"""
def __init__(self, collection, allow_custom=None):
def __init__(self, collection, allow_custom=None, items_per_page=5000):
if allow_custom is None:
allow_custom_source = True
allow_custom_sink = False
@ -43,7 +48,7 @@ class TAXIICollectionStore(DataStoreMixin):
allow_custom_sink = allow_custom_source = allow_custom
super(TAXIICollectionStore, self).__init__(
source=TAXIICollectionSource(collection, allow_custom=allow_custom_source),
source=TAXIICollectionSource(collection, allow_custom=allow_custom_source, items_per_page=items_per_page),
sink=TAXIICollectionSink(collection, allow_custom=allow_custom_sink),
)
@ -144,9 +149,12 @@ class TAXIICollectionSource(DataSource):
collection (taxii2.Collection): TAXII Collection instance
allow_custom (bool): Whether to allow custom STIX content to be
added to the FileSystemSink. Default: True
items_per_page (int): How many STIX objects to request per call
to the TAXII server. The value can be tuned, but servers may override
if their internal limit is surpassed.
"""
def __init__(self, collection, allow_custom=True):
def __init__(self, collection, allow_custom=True, items_per_page=5000):
super(TAXIICollectionSource, self).__init__()
if not _taxii2_client:
raise ImportError("taxii2client library is required for usage of TAXIICollectionSource")
@ -167,6 +175,7 @@ class TAXIICollectionSource(DataSource):
)
self.allow_custom = allow_custom
self.items_per_page = items_per_page
def get(self, stix_id, version=None, _composite_filters=None):
"""Retrieve STIX object from local/remote STIX Collection
@ -286,8 +295,12 @@ class TAXIICollectionSource(DataSource):
taxii_filters_dict = dict((f.property, f.value) for f in taxii_filters)
# query TAXII collection
all_data = []
try:
all_data = self.collection.get_objects(**taxii_filters_dict).get('objects', [])
paged_request = tcv21.as_pages if isinstance(self.collection, tcv21.Collection) else tcv20.as_pages
for resource in paged_request(self.collection.get_objects, per_request=self.items_per_page, **taxii_filters_dict):
all_data.extend(resource.get("objects", []))
# deduplicate data (before filtering as reduces wasted filtering)
all_data = deduplicate(all_data)
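The try block above switches retrieval from a single get_objects() call to TAXII paging: as_pages from taxii2client (v20 or v21, matched to the collection type) is invoked with per_request=items_per_page, and each returned envelope's objects are accumulated. A usage sketch (server URL and collection ID are placeholders):

from taxii2client.v21 import Collection
from stix2 import TAXIICollectionSource, Filter

collection = Collection("https://taxii.example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/")

# Request 500 objects per page instead of the 5000 default.
source = TAXIICollectionSource(collection, items_per_page=500)
indicators = source.query([Filter("type", "=", "indicator")])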


@ -2,18 +2,10 @@
import copy
from .datastore import CompositeDataSource, DataStoreMixin
from .equivalence.graph import graphically_equivalent
from .equivalence.object import ( # noqa: F401
WEIGHTS, check_property_present, custom_pattern_based, exact_match,
list_reference_check, partial_external_reference_based, partial_list_based,
partial_location_distance, partial_string_based, partial_timestamp_based,
reference_check, semantically_equivalent,
)
from .equivalence.graph import graph_equivalence, graph_similarity
from .equivalence.object import object_equivalence, object_similarity
from .parsing import parse as _parse
# TODO: Remove all unused imports that now belong to the equivalence module in the next major release.
# Kept for backwards compatibility.
class ObjectFactory(object):
"""Easily create STIX objects with default values for certain properties.
@ -197,23 +189,37 @@ class Environment(DataStoreMixin):
return None
@staticmethod
def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
"""This method verifies if two objects of the same type are
semantically equivalent.
def object_similarity(
obj1, obj2, prop_scores={}, ds1=None, ds2=None,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a measure of how similar the two objects are.
Args:
obj1: A stix2 object instance
obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
ds1 (optional): A DataStore object instance from which to pull related objects
ds2 (optional): A DataStore object instance from which to pull related objects
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
float: A number between 0.0 and 100.0 as a measurement of similarity.
Warning:
Object types need to have property weights defined for the equivalence process.
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.object` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
@ -222,21 +228,85 @@ class Environment(DataStoreMixin):
Note:
Default weight_dict:
.. include:: ../object_default_sem_eq_weights.rst
.. include:: ../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
See `the Committee Note <link here>`__.
"""
return semantically_equivalent(obj1, obj2, prop_scores, **weight_dict)
return object_similarity(
obj1, obj2, prop_scores, ds1, ds2, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
@staticmethod
def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
"""This method verifies if two graphs are semantically equivalent.
def object_equivalence(
obj1, obj2, prop_scores={}, threshold=70, ds1=None, ds2=None,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a true/false value if two objects are semantically equivalent.
Internally, it calls the object_similarity function and compares it against the given
threshold value.
Args:
obj1: A stix2 object instance
obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both objects equivalent. This
value can be tuned.
ds1 (optional): A DataStore object instance from which to pull related objects
ds2 (optional): A DataStore object instance from which to pull related objects
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
bool: True if the result of the object similarity is greater than or equal to
the threshold value. False otherwise.
Warning:
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.object` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weight_dict:
.. include:: ../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
See `the Committee Note <link here>`__.
"""
return object_equivalence(
obj1, obj2, prop_scores, threshold, ds1, ds2,
ignore_spec_version, versioning_checks, max_depth, **weight_dict
)
@staticmethod
def graph_similarity(
ds1, ds2, prop_scores={}, ignore_spec_version=False,
versioning_checks=False, max_depth=1, **weight_dict
):
"""This method returns a similarity score for two given graphs.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted by the number of objects we managed to compare.
This approach builds on top of the object-based semantic equivalence process
This approach builds on top of the object-based similarity process
and each comparison can return a value between 0 and 100.
Args:
@ -244,14 +314,23 @@ class Environment(DataStoreMixin):
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
float: A number between 0.0 and 100.0 as a measurement of similarity.
Warning:
Object types need to have property weights defined for the equivalence process.
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.graph` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
@ -260,11 +339,70 @@ class Environment(DataStoreMixin):
Note:
Default weight_dict:
.. include:: ../graph_default_sem_eq_weights.rst
.. include:: ../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
See `the Committee Note <link here>`__.
"""
return graphically_equivalent(ds1, ds2, prop_scores, **weight_dict)
return graph_similarity(
ds1, ds2, prop_scores, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
@staticmethod
def graph_equivalence(
ds1, ds2, prop_scores={}, threshold=70,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a true/false value if two graphs are semantically equivalent.
Internally, it calls the graph_similarity function and compares it against the given
threshold value.
Args:
ds1: A DataStore object instance representing your graph
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both graphs equivalent. This
value can be tuned.
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
bool: True if the result of the graph similarity is greater than or equal to
the threshold value. False otherwise.
Warning:
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.graph` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weight_dict:
.. include:: ../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
See `the Committee Note <link here>`__.
"""
return graph_equivalence(
ds1, ds2, prop_scores, threshold, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
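Taken together, the four static methods expose one engine at two granularities: the *_similarity variants return the raw 0-100 score, while the *_equivalence variants compare it against threshold. A minimal sketch of the object-level pair (the Campaign objects are illustrative):

import stix2
from stix2 import Environment

obj1 = stix2.v21.Campaign(name="Green Group Attacks Against Finance")
obj2 = stix2.v21.Campaign(name="Green Group Attacks")

prop_scores = {}
score = Environment.object_similarity(obj1, obj2, prop_scores)
print(score)                                   # float between 0.0 and 100.0
print(prop_scores["matching_score"], prop_scores["sum_weights"])

# Same comparison, reduced to a boolean with the default threshold of 70.
print(Environment.object_equivalence(obj1, obj2))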


@ -1,4 +1,4 @@
"""Python APIs for STIX 2 Semantic Equivalence.
"""Python APIs for STIX 2 Semantic Equivalence and Similarity.
.. autosummary::
:toctree: equivalence


@ -1,19 +1,80 @@
"""Python APIs for STIX 2 Graph-based Semantic Equivalence."""
"""Python APIs for STIX 2 Graph-based Semantic Equivalence and Similarity."""
import logging
from ..object import (
WEIGHTS, exact_match, list_reference_check, partial_string_based,
partial_timestamp_based, reference_check, semantically_equivalent,
WEIGHTS, _bucket_per_type, _object_pairs, object_similarity,
)
logger = logging.getLogger(__name__)
def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
"""This method verifies if two graphs are semantically equivalent.
def graph_equivalence(
ds1, ds2, prop_scores={}, threshold=70,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a true/false value if two graphs are semantically equivalent.
Internally, it calls the graph_similarity function and compares it against the given
threshold value.
Args:
ds1: A DataStore object instance representing your graph
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both graphs equivalent. This
value can be tuned.
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
bool: True if the result of the graph similarity is greater than or equal to
the threshold value. False otherwise.
Warning:
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.graph` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weight_dict:
.. include:: ../../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
See `the Committee Note <link here>`__.
"""
similarity_result = graph_similarity(
ds1, ds2, prop_scores, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
if similarity_result >= threshold:
return True
return False
def graph_similarity(
ds1, ds2, prop_scores={}, ignore_spec_version=False,
versioning_checks=False, max_depth=1, **weight_dict
):
"""This method returns a similarity score for two given graphs.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted by the number of objects we managed to compare.
This approach builds on top of the object-based semantic equivalence process
This approach builds on top of the object-based similarity process
and each comparison can return a value between 0 and 100.
Args:
@ -21,117 +82,97 @@ def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
float: A number between 0.0 and 100.0 as a measurement of similarity.
Warning:
Object types need to have property weights defined for the equivalence process.
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.graph` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weights_dict:
Default weight_dict:
.. include:: ../../graph_default_sem_eq_weights.rst
.. include:: ../../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
See `the Committee Note <link here>`__.
"""
weights = GRAPH_WEIGHTS.copy()
results = {}
similarity_score = 0
weights = WEIGHTS.copy()
if weight_dict:
weights.update(weight_dict)
results = {}
depth = weights["_internal"]["max_depth"]
weights["_internal"] = {
"ignore_spec_version": ignore_spec_version,
"versioning_checks": versioning_checks,
"ds1": ds1,
"ds2": ds2,
"max_depth": max_depth,
}
graph1 = ds1.query([])
graph2 = ds2.query([])
if max_depth <= 0:
raise ValueError("'max_depth' must be greater than 0")
graph1.sort(key=lambda x: x["type"])
graph2.sort(key=lambda x: x["type"])
pairs = _object_pairs(
_bucket_per_type(ds1.query([])),
_bucket_per_type(ds2.query([])),
weights,
)
if len(graph1) < len(graph2):
weights["_internal"]["ds1"] = ds1
weights["_internal"]["ds2"] = ds2
g1 = graph1
g2 = graph2
else:
weights["_internal"]["ds1"] = ds2
weights["_internal"]["ds2"] = ds1
g1 = graph2
g2 = graph1
logger.debug("Starting graph similarity process between DataStores: '%s' and '%s'", ds1.id, ds2.id)
for object1, object2 in pairs:
iprop_score = {}
object1_id = object1["id"]
object2_id = object2["id"]
for object1 in g1:
for object2 in g2:
if object1["type"] == object2["type"] and object1["type"] in weights:
iprop_score = {}
result = semantically_equivalent(object1, object2, iprop_score, **weights)
objects1_id = object1["id"]
weights["_internal"]["max_depth"] = depth
result = object_similarity(
object1, object2, iprop_score, ds1, ds2,
ignore_spec_version, versioning_checks,
max_depth, **weights
)
if objects1_id not in results:
results[objects1_id] = {"matched": object2["id"], "prop_score": iprop_score, "value": result}
elif result > results[objects1_id]["value"]:
results[objects1_id] = {"matched": object2["id"], "prop_score": iprop_score, "value": result}
if object1_id not in results:
results[object1_id] = {"lhs": object1_id, "rhs": object2_id, "prop_score": iprop_score, "value": result}
elif result > results[object1_id]["value"]:
results[object1_id] = {"lhs": object1_id, "rhs": object2_id, "prop_score": iprop_score, "value": result}
if object2_id not in results:
results[object2_id] = {"lhs": object2_id, "rhs": object1_id, "prop_score": iprop_score, "value": result}
elif result > results[object2_id]["value"]:
results[object2_id] = {"lhs": object2_id, "rhs": object1_id, "prop_score": iprop_score, "value": result}
equivalence_score = 0
matching_score = sum(x["value"] for x in results.values())
sum_weights = len(results) * 100.0
if sum_weights > 0:
equivalence_score = (matching_score / sum_weights) * 100
len_pairs = len(results)
if len_pairs > 0:
similarity_score = matching_score / len_pairs
prop_scores["matching_score"] = matching_score
prop_scores["sum_weights"] = sum_weights
prop_scores["len_pairs"] = len_pairs
prop_scores["summary"] = results
logger.debug(
"DONE\t\tSUM_WEIGHT: %.2f\tMATCHING_SCORE: %.2f\t SCORE: %.2f",
sum_weights,
"DONE\t\tLEN_PAIRS: %.2f\tMATCHING_SCORE: %.2f\t SIMILARITY_SCORE: %.2f",
len_pairs,
matching_score,
equivalence_score,
similarity_score,
)
return equivalence_score
# default weights used for the graph semantic equivalence process
GRAPH_WEIGHTS = WEIGHTS.copy()
GRAPH_WEIGHTS.update({
"grouping": {
"name": (20, partial_string_based),
"context": (20, partial_string_based),
"object_refs": (60, list_reference_check),
},
"relationship": {
"relationship_type": (20, exact_match),
"source_ref": (40, reference_check),
"target_ref": (40, reference_check),
},
"report": {
"name": (30, partial_string_based),
"published": (10, partial_timestamp_based),
"object_refs": (60, list_reference_check),
"tdelta": 1, # One day interval
},
"sighting": {
"first_seen": (5, partial_timestamp_based),
"last_seen": (5, partial_timestamp_based),
"sighting_of_ref": (40, reference_check),
"observed_data_refs": (20, list_reference_check),
"where_sighted_refs": (20, list_reference_check),
"summary": (10, exact_match),
},
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"ds1": None,
"ds2": None,
"max_depth": 1,
},
}) # :autodoc-skip:
return similarity_score
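The normalization change is the key behavioral difference: each best-match result is already on a 0-100 scale, so the graph score is now a plain mean over matched pairs rather than a ratio against sum_weights. A sketch with two small in-memory graphs (object content is illustrative):

import stix2
from stix2 import MemoryStore
from stix2.equivalence.graph import graph_similarity

ds1 = MemoryStore([
    stix2.v21.Malware(name="Cryptolocker", is_family=False),
    stix2.v21.Tool(name="VNC"),
])
ds2 = MemoryStore([
    stix2.v21.Malware(name="CryptoLocker", is_family=False),
    stix2.v21.Tool(name="VNC"),
])

prop_scores = {}
score = graph_similarity(ds1, ds2, prop_scores)
print(score)                     # mean of the per-object best-match scores
print(prop_scores["len_pairs"])  # number of objects that found a best match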


@ -1,40 +1,118 @@
"""Python APIs for STIX 2 Object-based Semantic Equivalence."""
"""Python APIs for STIX 2 Object-based Semantic Equivalence and Similarity."""
import collections
import itertools
import logging
import time
from ...datastore import Filter
from ...datastore import DataSource, DataStoreMixin, Filter
from ...utils import STIXdatetime, parse_into_datetime
from ..pattern import equivalent_patterns
logger = logging.getLogger(__name__)
def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
"""This method verifies if two objects of the same type are
semantically equivalent.
def object_equivalence(
obj1, obj2, prop_scores={}, threshold=70, ds1=None,
ds2=None, ignore_spec_version=False,
versioning_checks=False, max_depth=1, **weight_dict
):
"""This method returns a true/false value if two objects are semantically equivalent.
Internally, it calls the object_similarity function and compares it against the given
threshold value.
Args:
obj1: A stix2 object instance
obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both objects equivalent. This
value can be tuned.
ds1 (optional): A DataStore object instance from which to pull related objects
ds2 (optional): A DataStore object instance from which to pull related objects
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
bool: True if the result of the object similarity is greater than or equal to
the threshold value. False otherwise.
Warning:
Object types need to have property weights defined for the equivalence process.
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.object` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weights_dict:
Default weight_dict:
.. include:: ../../object_default_sem_eq_weights.rst
.. include:: ../../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
See `the Committee Note <link here>`__.
"""
similarity_result = object_similarity(
obj1, obj2, prop_scores, ds1, ds2, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
if similarity_result >= threshold:
return True
return False
def object_similarity(
obj1, obj2, prop_scores={}, ds1=None, ds2=None,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a measure of similarity depending on how
similar the two objects are.
Args:
obj1: A stix2 object instance
obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
ds1 (optional): A DataStore object instance from which to pull related objects
ds2 (optional): A DataStore object instance from which to pull related objects
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity.
Warning:
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.object` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weight_dict:
.. include:: ../../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
@ -46,8 +124,15 @@ def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
if weight_dict:
weights.update(weight_dict)
weights["_internal"] = {
"ignore_spec_version": ignore_spec_version,
"versioning_checks": versioning_checks,
"ds1": ds1,
"ds2": ds2,
"max_depth": max_depth,
}
type1, type2 = obj1["type"], obj2["type"]
ignore_spec_version = weights["_internal"]["ignore_spec_version"]
if type1 != type2:
raise ValueError('The objects to compare must be of the same type!')
@ -58,13 +143,13 @@ def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
try:
weights[type1]
except KeyError:
logger.warning("'%s' type has no 'weights' dict specified & thus no semantic equivalence method to call!", type1)
logger.warning("'%s' type has no 'weights' dict specified & thus no object similarity method to call!", type1)
sum_weights = matching_score = 0
else:
try:
method = weights[type1]["method"]
except KeyError:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
logger.debug("Starting object similarity process between: '%s' and '%s'", obj1["id"], obj2["id"])
matching_score = 0.0
sum_weights = 0.0
@ -72,6 +157,7 @@ def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
if check_property_present(prop, obj1, obj2):
w = weights[type1][prop][0]
comp_funct = weights[type1][prop][1]
prop_scores[prop] = {}
if comp_funct == partial_timestamp_based:
contributing_score = w * comp_funct(obj1[prop], obj2[prop], weights[type1]["tdelta"])
@ -79,30 +165,36 @@ def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
threshold = weights[type1]["threshold"]
contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold)
elif comp_funct == reference_check or comp_funct == list_reference_check:
max_depth = weights["_internal"]["max_depth"]
if max_depth < 0:
continue # prevent excessive recursion
if max_depth > 0:
weights["_internal"]["max_depth"] = max_depth - 1
ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"]
if _datastore_check(ds1, ds2):
contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
elif comp_funct == reference_check:
comp_funct = exact_match
contributing_score = w * comp_funct(obj1[prop], obj2[prop])
elif comp_funct == list_reference_check:
comp_funct = partial_list_based
contributing_score = w * comp_funct(obj1[prop], obj2[prop])
prop_scores[prop]["check_type"] = comp_funct.__name__
else:
weights["_internal"]["max_depth"] -= 1
ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"]
contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
continue # prevent excessive recursion
weights["_internal"]["max_depth"] = max_depth
else:
contributing_score = w * comp_funct(obj1[prop], obj2[prop])
sum_weights += w
matching_score += contributing_score
prop_scores[prop] = {
"weight": w,
"contributing_score": contributing_score,
}
prop_scores[prop]["weight"] = w
prop_scores[prop]["contributing_score"] = contributing_score
logger.debug("'%s' check -- weight: %s, contributing score: %s", prop, w, contributing_score)
prop_scores["matching_score"] = matching_score
prop_scores["sum_weights"] = sum_weights
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
else:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
logger.debug("Starting object similarity process between: '%s' and '%s'", obj1["id"], obj2["id"])
try:
matching_score, sum_weights = method(obj1, obj2, prop_scores, **weights[type1])
except TypeError:
@ -119,7 +211,7 @@ def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
def check_property_present(prop, obj1, obj2):
"""Helper method checks if a property is present on both objects."""
if prop == "longitude_latitude":
if all(x in obj1 and x in obj2 for x in ['latitude', 'longitude']):
if all(x in obj1 and x in obj2 for x in ('latitude', 'longitude')):
return True
elif prop in obj1 and prop in obj2:
return True
@ -150,7 +242,9 @@ def partial_timestamp_based(t1, t2, tdelta):
def partial_list_based(l1, l2):
"""Performs a partial list matching via finding the intersection between common values.
"""Performs a partial list matching via finding the intersection between
common values. Repeated values are counted only once. This method can be
used for *_refs equality checks when de-reference is not possible.
Args:
l1: A list of values.
@ -167,7 +261,8 @@ def partial_list_based(l1, l2):
def exact_match(val1, val2):
"""Performs an exact value match based on two values
"""Performs an exact value match based on two values. This method can be
used for a *_ref equality check when de-referencing is not possible.
Args:
val1: A value suitable for an equality test.
@ -215,12 +310,12 @@ def custom_pattern_based(pattern1, pattern2):
return equivalent_patterns(pattern1, pattern2)
def partial_external_reference_based(refs1, refs2):
def partial_external_reference_based(ext_refs1, ext_refs2):
"""Performs a matching on External References.
Args:
refs1: A list of external references.
refs2: A list of external references.
ext_refs1: A list of external references.
ext_refs2: A list of external references.
Returns:
float: Number between 0.0 and 1.0 depending on matches.
@ -229,51 +324,47 @@ def partial_external_reference_based(refs1, refs2):
allowed = {"veris", "cve", "capec", "mitre-attack"}
matches = 0
if len(refs1) >= len(refs2):
l1 = refs1
l2 = refs2
else:
l1 = refs2
l2 = refs1
ref_pairs = itertools.chain(
itertools.product(ext_refs1, ext_refs2),
)
for ext_ref1 in l1:
for ext_ref2 in l2:
sn_match = False
ei_match = False
url_match = False
source_name = None
for ext_ref1, ext_ref2 in ref_pairs:
sn_match = False
ei_match = False
url_match = False
source_name = None
if check_property_present("source_name", ext_ref1, ext_ref2):
if ext_ref1["source_name"] == ext_ref2["source_name"]:
source_name = ext_ref1["source_name"]
sn_match = True
if check_property_present("external_id", ext_ref1, ext_ref2):
if ext_ref1["external_id"] == ext_ref2["external_id"]:
ei_match = True
if check_property_present("url", ext_ref1, ext_ref2):
if ext_ref1["url"] == ext_ref2["url"]:
url_match = True
if check_property_present("source_name", ext_ref1, ext_ref2):
if ext_ref1["source_name"] == ext_ref2["source_name"]:
source_name = ext_ref1["source_name"]
sn_match = True
if check_property_present("external_id", ext_ref1, ext_ref2):
if ext_ref1["external_id"] == ext_ref2["external_id"]:
ei_match = True
if check_property_present("url", ext_ref1, ext_ref2):
if ext_ref1["url"] == ext_ref2["url"]:
url_match = True
# Special case: if source_name is a STIX defined name and either
# external_id or url matches then it's a perfect match and other entries
# can be ignored.
if sn_match and (ei_match or url_match) and source_name in allowed:
result = 1.0
logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
refs1, refs2, result,
)
return result
# Special case: if source_name is a STIX defined name and either
# external_id or url matches then it's a perfect match and other entries
# can be ignored.
if sn_match and (ei_match or url_match) and source_name in allowed:
result = 1.0
logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
ext_refs1, ext_refs2, result,
)
return result
# Regular check. If the source_name (not STIX-defined) or external_id or
# url matches then we consider the entry a match.
if (sn_match or ei_match or url_match) and source_name not in allowed:
matches += 1
# Regular check. If the source_name (not STIX-defined) or external_id or
# url matches then we consider the entry a match.
if (sn_match or ei_match or url_match) and source_name not in allowed:
matches += 1
result = matches / max(len(refs1), len(refs2))
result = matches / max(len(ext_refs1), len(ext_refs2))
logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
refs1, refs2, result,
ext_refs1, ext_refs2, result,
)
return result
@ -304,19 +395,30 @@ def partial_location_distance(lat1, long1, lat2, long2, threshold):
def _versioned_checks(ref1, ref2, ds1, ds2, **weights):
"""Checks multiple object versions if present in graph.
Maximizes for the semantic equivalence score of a particular version."""
Maximizes for the similarity score of a particular version."""
results = {}
objects1 = ds1.query([Filter("id", "=", ref1)])
objects2 = ds2.query([Filter("id", "=", ref2)])
if len(objects1) > 0 and len(objects2) > 0:
for o1 in objects1:
for o2 in objects2:
result = semantically_equivalent(o1, o2, **weights)
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": result}
elif result > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": result}
pairs = _object_pairs(
_bucket_per_type(ds1.query([Filter("id", "=", ref1)])),
_bucket_per_type(ds2.query([Filter("id", "=", ref2)])),
weights,
)
ignore_spec_version = weights["_internal"]["ignore_spec_version"]
versioning_checks = weights["_internal"]["versioning_checks"]
max_depth = weights["_internal"]["max_depth"]
for object1, object2 in pairs:
result = object_similarity(
object1, object2, ds1=ds1, ds2=ds2,
ignore_spec_version=ignore_spec_version,
versioning_checks=versioning_checks,
max_depth=max_depth, **weights,
)
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": result}
elif result > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": result}
result = results.get(ref1, {}).get("value", 0.0)
logger.debug(
"--\t\t_versioned_checks '%s' '%s'\tresult: '%s'",
@ -326,18 +428,26 @@ def _versioned_checks(ref1, ref2, ds1, ds2, **weights):
def reference_check(ref1, ref2, ds1, ds2, **weights):
"""For two references, de-reference the object and perform object-based
semantic equivalence. The score influences the result of an edge check."""
"""For two references, de-reference the object and perform object_similarity.
The score influences the result of an edge check."""
type1, type2 = ref1.split("--")[0], ref2.split("--")[0]
result = 0.0
if type1 == type2:
if weights["_internal"]["versioning_checks"]:
if type1 == type2 and type1 in weights:
ignore_spec_version = weights["_internal"]["ignore_spec_version"]
versioning_checks = weights["_internal"]["versioning_checks"]
max_depth = weights["_internal"]["max_depth"]
if versioning_checks:
result = _versioned_checks(ref1, ref2, ds1, ds2, **weights) / 100.0
else:
o1, o2 = ds1.get(ref1), ds2.get(ref2)
if o1 and o2:
result = semantically_equivalent(o1, o2, **weights) / 100.0
result = object_similarity(
o1, o2, ds1=ds1, ds2=ds2,
ignore_spec_version=ignore_spec_version,
versioning_checks=versioning_checks,
max_depth=max_depth, **weights,
) / 100.0
logger.debug(
"--\t\treference_check '%s' '%s'\tresult: '%s'",
@ -348,38 +458,35 @@ def reference_check(ref1, ref2, ds1, ds2, **weights):
def list_reference_check(refs1, refs2, ds1, ds2, **weights):
"""For objects that contain multiple references (i.e., object_refs) perform
the same de-reference procedure and perform object_similarity.
The score influences the objects containing these references. The result is
weighted on the number of unique objects that could be de-referenced and
compared."""
results = {}
pairs = _object_pairs(
_bucket_per_type(refs1, "id-split"),
_bucket_per_type(refs2, "id-split"),
weights,
)
for ref1, ref2 in pairs:
type1, type2 = ref1.split("--")[0], ref2.split("--")[0]
if type1 == type2:
score = reference_check(ref1, ref2, ds1, ds2, **weights)
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": score}
elif score > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": score}
if ref2 not in results:
results[ref2] = {"matched": ref1, "value": score}
elif score > results[ref2]["value"]:
results[ref2] = {"matched": ref1, "value": score}
result = 0.0
total_sum = sum(x["value"] for x in results.values())
max_score = len(results)
if max_score > 0:
result = total_sum / max_score
@ -391,7 +498,43 @@ def list_reference_check(refs1, refs2, ds1, ds2, **weights):
return result
def _datastore_check(ds1, ds2):
if (
issubclass(ds1.__class__, (DataStoreMixin, DataSource)) or
issubclass(ds2.__class__, (DataStoreMixin, DataSource))
):
return True
return False
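When neither side passes this check, reference-valued properties cannot be de-referenced and object_similarity falls back to a plain list comparison, as the test added later in this changeset (test_no_datastore_fallsback_list_based_check_for_refs_check) exercises. A sketch with illustrative report values:

import stix2

report_kwargs = dict(
    name="Acme Report",
    published="2021-03-01T00:00:00Z",
    object_refs=["indicator--a740531e-63e2-4e79-8019-7db6a45de121"],
)
r1 = stix2.v21.Report(**report_kwargs)
r2 = stix2.v21.Report(**report_kwargs)
prop_scores = {}
# No ds1/ds2 supplied, so object_refs is scored with partial_list_based.
stix2.Environment().object_similarity(r1, r2, prop_scores)
print(prop_scores["object_refs"]["check_type"])  # partial_list_based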
def _bucket_per_type(graph, mode="type"):
"""Given a list of objects or references, bucket them by type.
Depending on the mode, the type is read from the 'type' property
("type") or parsed from the 'id' prefix ("id-split").
"""
buckets = collections.defaultdict(list)
if mode == "type":
[buckets[obj["type"]].append(obj) for obj in graph]
elif mode == "id-split":
[buckets[obj.split("--")[0]].append(obj) for obj in graph]
return buckets
def _object_pairs(graph1, graph2, weights):
"""Returns a generator over the product of comparable objects for the
graph similarity process. Only object types present in both graphs and
in the weights mapping are paired.
"""
types_in_common = set(graph1.keys()).intersection(graph2.keys())
testable_types = types_in_common.intersection(weights.keys())
return itertools.chain.from_iterable(
itertools.product(graph1[stix_type], graph2[stix_type])
for stix_type in testable_types
)
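A small sketch of how the two helpers combine (the UUIDs are arbitrary; only types present in both graphs and in the weights mapping yield pairs):

from stix2.equivalence.object import (
    _bucket_per_type, _object_pairs, exact_match,
)

refs1 = [
    "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
    "tool--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
]
refs2 = ["malware--96b08451-b27a-4ff6-893f-790e26393a8e"]
weights = {"malware": {"name": (100, exact_match)}}
pairs = list(_object_pairs(
    _bucket_per_type(refs1, "id-split"),
    _bucket_per_type(refs2, "id-split"),
    weights,
))
# One (malware, malware) pair; the tool ref has no counterpart in refs2.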
# default weights used for the similarity process
WEIGHTS = {
"attack-pattern": {
"name": (30, partial_string_based),
@ -405,11 +548,20 @@ WEIGHTS = {
"name": (60, partial_string_based),
"external_references": (40, partial_external_reference_based),
},
"grouping": {
"name": (20, partial_string_based),
"context": (20, partial_string_based),
"object_refs": (60, list_reference_check),
},
"identity": {
"name": (60, partial_string_based),
"identity_class": (20, exact_match),
"sectors": (20, partial_list_based),
},
"incident": {
"name": (30, partial_string_based),
"external_references": (70, partial_external_reference_based),
},
"indicator": {
"indicator_types": (15, partial_list_based),
"pattern": (80, custom_pattern_based),
@ -436,6 +588,25 @@ WEIGHTS = {
"definition": (60, exact_match),
"definition_type": (20, exact_match),
},
"relationship": {
"relationship_type": (20, exact_match),
"source_ref": (40, reference_check),
"target_ref": (40, reference_check),
},
"report": {
"name": (30, partial_string_based),
"published": (10, partial_timestamp_based),
"object_refs": (60, list_reference_check),
"tdelta": 1, # One day interval
},
"sighting": {
"first_seen": (5, partial_timestamp_based),
"last_seen": (5, partial_timestamp_based),
"sighting_of_ref": (40, reference_check),
"observed_data_refs": (20, list_reference_check),
"where_sighted_refs": (20, list_reference_check),
"summary": (10, exact_match),
},
"threat-actor": {
"name": (60, partial_string_based),
"threat_actor_types": (20, partial_list_based),
@ -449,7 +620,4 @@ WEIGHTS = {
"name": (30, partial_string_based),
"external_references": (70, partial_external_reference_based),
},
"_internal": {
"ignore_spec_version": False,
},
} # :autodoc-skip:
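Callers can override entries in this table per object type; a sketch (the campaign names and the 80/20 split are illustrative):

import stix2
from stix2.equivalence.object import exact_match, partial_string_based

camp1 = stix2.v21.Campaign(name="Green Group Attacks Against Finance")
camp2 = stix2.v21.Campaign(name="Green Group Attacks Against the Finance Sector")
custom_weights = {
    "campaign": {
        "name": (80, partial_string_based),
        "objective": (20, exact_match),
    },
}
score = stix2.Environment().object_similarity(camp1, camp2, **custom_weights)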

View File

@ -14,17 +14,17 @@ from ...version import DEFAULT_VERSION
from .compare.observation import observation_expression_cmp
from .transform import ChainTransformer, SettleTransformer
from .transform.observation import (
AbsorptionTransformer, DNFTransformer, FlattenTransformer,
NormalizeComparisonExpressionsTransformer, OrderDedupeTransformer,
)
# Lazy-initialize
_pattern_normalizer = None
def _get_pattern_normalizer():
"""
Get a normalization transformer for STIX patterns.
Returns:
The transformer
@ -33,11 +33,11 @@ def _get_pattern_canonicalizer():
# The transformers are either stateless or contain no state which changes
# with each use. So we can setup the transformers once and keep reusing
# them.
global _pattern_normalizer
if not _pattern_normalizer:
normalize_comp_expr = \
NormalizeComparisonExpressionsTransformer()
obs_expr_flatten = FlattenTransformer()
obs_expr_order = OrderDedupeTransformer()
@ -49,12 +49,12 @@ def _get_pattern_canonicalizer():
obs_dnf = DNFTransformer()
_pattern_normalizer = ChainTransformer(
normalize_comp_expr,
obs_settle_simplify, obs_dnf, obs_settle_simplify,
)
return _pattern_normalizer
def equivalent_patterns(pattern1, pattern2, stix_version=DEFAULT_VERSION):
@ -77,11 +77,11 @@ def equivalent_patterns(pattern1, pattern2, stix_version=DEFAULT_VERSION):
pattern2, version=stix_version,
)
pattern_normalizer = _get_pattern_normalizer()
norm_patt1, _ = pattern_normalizer.transform(patt_ast1)
norm_patt2, _ = pattern_normalizer.transform(patt_ast2)
result = observation_expression_cmp(norm_patt1, norm_patt2)
return result == 0
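For example (a sketch; the patterns are illustrative), reordering OR operands does not change the result because both sides normalize to the same form:

from stix2.equivalence.pattern import equivalent_patterns

print(equivalent_patterns(
    "[ipv4-addr:value = '1.2.3.4'] OR [domain-name:value = 'example.com']",
    "[domain-name:value = 'example.com'] OR [ipv4-addr:value = '1.2.3.4']",
))  # True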
@ -92,7 +92,7 @@ def find_equivalent_patterns(
"""
Find patterns from a sequence which are equivalent to a given pattern.
This is more efficient than using equivalent_patterns() in a loop, because
it doesn't re-normalize the search pattern over and over. This works
on an input iterable and is implemented as a generator of matches. So you
can "stream" patterns in and matching patterns will be streamed out.
@ -109,8 +109,8 @@ def find_equivalent_patterns(
search_pattern, version=stix_version,
)
pattern_normalizer = _get_pattern_normalizer()
norm_search_pattern_ast, _ = pattern_normalizer.transform(
search_pattern_ast,
)
@ -118,10 +118,10 @@ def find_equivalent_patterns(
pattern_ast = pattern_visitor.create_pattern_object(
pattern, version=stix_version,
)
norm_pattern_ast, _ = pattern_normalizer.transform(pattern_ast)
result = observation_expression_cmp(
norm_search_pattern_ast, norm_pattern_ast,
)
if result == 0:
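A usage sketch (the candidate patterns are illustrative):

from stix2.equivalence.pattern import find_equivalent_patterns

candidates = [
    "[file:name = 'foo.exe']",
    "[file:name = 'foo.exe'] OR [file:name = 'foo.exe']",  # dedupes to the first
    "[file:name = 'bar.exe']",
]
for match in find_equivalent_patterns("[file:name = 'foo.exe']", candidates):
    print(match)  # the first two candidates stream out as matches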

View File

@ -346,7 +346,7 @@ def comparison_expression_cmp(expr1, expr2):
"""
Compare two comparison expressions. This is sensitive to the order of the
expressions' sub-components. To achieve an order-insensitive comparison,
the sub-component ASTs must be ordered first.
Args:
expr1: The first comparison expression

View File

@ -62,7 +62,7 @@ def observation_expression_cmp(expr1, expr2):
"""
Compare two observation expression ASTs. This is sensitive to the order of
the expressions' sub-components. To achieve an order-insensitive
comparison, the sub-component ASTs must be ordered first.
Args:
expr1: The first observation expression

View File

@ -46,7 +46,7 @@ def _dupe_ast(ast):
elif isinstance(ast, _ComparisonExpression):
# Change this to create a dupe, if we ever need to change simple
# comparison expressions as part of normalization.
result = ast
else:
@ -147,9 +147,8 @@ class OrderDedupeTransformer(
ComparisonExpressionTransformer,
):
"""
Order the children of all nodes in the AST. Because the deduping algorithm
is based on sorted data, this transformation also does deduping.
E.g.:
A and A => A

View File

@ -234,7 +234,7 @@ class OrderDedupeTransformer(
ObservationExpressionTransformer,
):
"""
Order AND/OR expressions, and dedupe ORs. E.g.:
A or A => A
B or A => A or B
@ -282,6 +282,7 @@ class AbsorptionTransformer(
A or (A and B) = A
A or (A followedby B) = A
A or (B followedby A) = A
Other variants do not hold for observation expressions.
"""
@ -435,28 +436,35 @@ class DNFTransformer(ObservationExpressionTransformer):
A and (B or C) => (A and B) or (A and C)
A followedby (B or C) => (A followedby B) or (A followedby C)
(A or B) followedby C => (A followedby C) or (B followedby C)
"""
def __transform(self, ast):
# If no OR children, nothing to do
if any(
isinstance(child, OrObservationExpression)
for child in ast.operands
):
# When we distribute FOLLOWEDBY over OR, it is important to
# preserve the original FOLLOWEDBY order! We don't need to do that
# for AND, but we do it anyway because it doesn't hurt, and we can
# use the same code for both.
iterables = []
for child in ast.operands:
if isinstance(child, OrObservationExpression):
iterables.append(child.operands)
else:
iterables.append((child,))
root_type = type(ast) # will be AST class for AND or FOLLOWEDBY
distributed_children = [
root_type([
_dupe_ast(sub_ast) for sub_ast in itertools.chain(
prod_seq,
)
])
for prod_seq in itertools.product(*iterables)
]
# Need to recursively continue to distribute AND/FOLLOWEDBY over OR
@ -470,6 +478,7 @@ class DNFTransformer(ObservationExpressionTransformer):
else:
result = ast
changed = False
return result, changed
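The per-child iterables feed itertools.product, which performs the actual distribution while preserving operand order; a standalone sketch of the idea:

import itertools

# (A or B) FOLLOWEDBY (C or D) distributes into four ordered terms:
iterables = [("A", "B"), ("C", "D")]
print([" FOLLOWEDBY ".join(seq) for seq in itertools.product(*iterables)])
# ['A FOLLOWEDBY C', 'A FOLLOWEDBY D', 'B FOLLOWEDBY C', 'B FOLLOWEDBY D']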
@ -480,11 +489,11 @@ class DNFTransformer(ObservationExpressionTransformer):
return self.__transform(ast)
class NormalizeComparisonExpressionsTransformer(
ObservationExpressionTransformer,
):
"""
Normalize all comparison expressions.
"""
def __init__(self):
comp_flatten = CFlattenTransformer()
@ -495,13 +504,13 @@ class CanonicalizeComparisonExpressionsTransformer(
comp_special = SpecialValueCanonicalization()
comp_dnf = CDNFTransformer()
self.__comp_normalize = ChainTransformer(
comp_special, settle_simplify, comp_dnf, settle_simplify,
)
def transform_observation(self, ast):
comp_expr = ast.operand
norm_comp_expr, changed = self.__comp_normalize.transform(comp_expr)
ast.operand = norm_comp_expr
return ast, changed

View File

@ -1,5 +1,5 @@
"""
Some simple comparison expression normalization functions.
"""
import socket

View File

@ -175,7 +175,14 @@ class ImmutableError(STIXError):
return msg.format(self)
class VersioningError(STIXError):
"""
Base class for object versioning errors.
"""
pass
class UnmodifiablePropertyError(VersioningError):
"""Attempted to modify an unmodifiable property of object when creating a new version."""
def __init__(self, unchangable_properties):
@ -187,6 +194,40 @@ class UnmodifiablePropertyError(STIXError):
return msg.format(", ".join(self.unchangable_properties))
class TypeNotVersionableError(VersioningError):
"""
An object couldn't be versioned because it lacked the versioning properties
and its type does not support them.
"""
def __init__(self, obj):
if isinstance(obj, dict):
type_name = obj.get("type")
else:
# try standard attribute of _STIXBase subclasses/instances
type_name = getattr(obj, "_type", None)
self.object = obj
msg = "Object type{}is not versionable. Try a dictionary or " \
"instance of an SDO or SRO class.".format(
" '{}' ".format(type_name) if type_name else " ",
)
super().__init__(msg)
class ObjectNotVersionableError(VersioningError):
"""
An object's type supports versioning, but the object couldn't be versioned
because it lacked sufficient versioning properties.
"""
def __init__(self, obj):
self.object = obj
msg = "Creating a new object version requires at least the 'created'" \
" property: " + str(obj)
super().__init__(msg)
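The two errors are distinct, as the tests later in this changeset show: a plain string has no versionable type at all, while a dict of a potentially versionable type that lacks 'created' fails the second check. A sketch:

import stix2

try:
    stix2.versioning.new_version("This is a campaign.")
except stix2.exceptions.TypeNotVersionableError as e:
    print(e.object)  # the offending object rides along on the error

try:
    stix2.versioning.new_version({
        "type": "not-registered",
        "id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
        "modified": "1995-04-07T15:37:48.178Z",
    })
except stix2.exceptions.ObjectNotVersionableError as e:
    print(e.object)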
class RevokeError(STIXError):
"""Attempted an operation on a revoked object."""

View File

@ -2,8 +2,6 @@
import collections
from stix2 import exceptions, utils
@ -129,7 +127,7 @@ def compress_markings(granular_markings):
{'marking_ref': item, 'selectors': sorted(selectors)}
if utils.is_marking(item) else
{'lang': item, 'selectors': sorted(selectors)}
for item, selectors in map_.items()
]
return compressed
@ -230,7 +228,7 @@ def iterpath(obj, path=None):
if path is None:
path = []
for varname, varobj in iter(sorted(obj.items())):
path.append(varname)
yield (path, varobj)

View File

@ -3,7 +3,6 @@
import importlib
import inspect
from stix2patterns.exceptions import ParseException
from stix2patterns.grammars.STIXPatternParser import TerminalNode
from stix2patterns.v20.grammars.STIXPatternParser import \
@ -263,7 +262,7 @@ class STIXPatternVisitorForSTIX2():
property_path.append(
self.instantiate(
"ListObjectPathComponent",
current.property_name if isinstance(current, BasicObjectPathComponent) else str(current),
next.value,
),
)
@ -286,7 +285,7 @@ class STIXPatternVisitorForSTIX2():
if isinstance(first_component, TerminalNode):
step = first_component.getText()
else:
step = str(first_component)
# if step.endswith("_ref"):
# return stix2.ReferenceObjectPathComponent(step)
# else:

View File

@ -5,8 +5,6 @@ import binascii
import datetime
import re
from .utils import parse_into_datetime
@ -15,7 +13,7 @@ def escape_quotes_and_backslashes(s):
def quote_if_needed(x):
if isinstance(x, str):
if x.find("-") != -1:
if not x.startswith("'"):
return "'" + x + "'"

View File

@ -7,8 +7,6 @@ import inspect
import re
import uuid
from .base import _STIXBase
from .exceptions import (
CustomContentError, DictionaryKeyError, MissingPropertiesError,
@ -236,7 +234,7 @@ class ListProperty(Property):
except TypeError:
raise ValueError("must be an iterable.")
if isinstance(value, (_STIXBase, str)):
value = [value]
if isinstance(self.contained, Property):
@ -277,8 +275,8 @@ class StringProperty(Property):
super(StringProperty, self).__init__(**kwargs)
def clean(self, value):
if not isinstance(value, str):
return str(value)
return value

View File

@ -2,6 +2,7 @@
import copy
import datetime as dt
import io
import simplejson as json
@ -64,6 +65,37 @@ def serialize(obj, pretty=False, include_optional_defaults=False, **kwargs):
Returns:
str: The serialized JSON object.
Note:
The argument ``pretty=True`` will output the STIX object following
spec order. Using this argument greatly impacts object serialization
performance. If your use case is centered on machine-to-machine
operation, it is recommended to set ``pretty=False``.
When ``pretty=True`` the following key-value pairs will be added or
overridden: indent=4, separators=(",", ": "), item_sort_key=sort_by.
"""
with io.StringIO() as fp:
fp_serialize(obj, fp, pretty, include_optional_defaults, **kwargs)
return fp.getvalue()
def fp_serialize(obj, fp, pretty=False, include_optional_defaults=False, **kwargs):
"""
Serialize a STIX object to ``fp`` (a text-mode, file-like object supporting ``.write()``).
Args:
obj: The STIX object to be serialized.
fp: A text stream file-like object supporting ``.write()``.
pretty (bool): If True, output properties following the STIX specs
formatting. This includes indentation. Refer to notes for more
details. (Default: ``False``)
include_optional_defaults (bool): Determines whether to include
optional properties set to the default value defined in the spec.
**kwargs: The arguments for a json.dumps() call.
Returns:
None
Note:
The argument ``pretty=True`` will output the STIX object following
spec order. Using this argument greatly impacts object serialization
@ -80,9 +112,9 @@ def serialize(obj, pretty=False, include_optional_defaults=False, **kwargs):
kwargs.update({'indent': 4, 'separators': (',', ': '), 'item_sort_key': sort_by})
if include_optional_defaults:
json.dump(obj, fp, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)
else:
json.dump(obj, fp, cls=STIXJSONEncoder, **kwargs)
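A usage sketch mirroring the new tests (the indicator values are illustrative):

import io

import stix2
from stix2.serialization import fp_serialize

indicator = stix2.v21.Indicator(
    pattern="[ipv4-addr:value = '192.168.1.1']",
    pattern_type="stix",
    valid_from="2019-01-01T12:34:56Z",
)
buf = io.StringIO()
fp_serialize(indicator, buf, pretty=True)
print(buf.getvalue())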
def _find(seq, val):

View File

@ -223,6 +223,10 @@ def test_obs_absorb_not_equivalent(patt1, patt2):
"([a:b=1] OR [a:b=2]) FOLLOWEDBY ([a:b=3] OR [a:b=4])",
"([a:b=1] FOLLOWEDBY [a:b=3]) OR ([a:b=1] FOLLOWEDBY [a:b=4]) OR ([a:b=2] FOLLOWEDBY [a:b=3]) OR ([a:b=2] FOLLOWEDBY [a:b=4])",
),
(
"([a:b=1] OR [a:b=2]) FOLLOWEDBY ([a:b=5] AND [a:b=6])",
"([a:b=1] FOLLOWEDBY ([a:b=5] AND [a:b=6])) OR ([a:b=2] FOLLOWEDBY ([a:b=5] AND [a:b=6]))",
),
],
)
def test_obs_dnf_equivalent(patt1, patt2):
@ -243,6 +247,10 @@ def test_obs_dnf_equivalent(patt1, patt2):
"[a:b=1] WITHIN 2 SECONDS",
"[a:b=1] REPEATS 2 TIMES",
),
(
"[a:b=1] FOLLOWEDBY ([a:b=2] OR [a:b=3])",
"([a:b=2] FOLLOWEDBY [a:b=1]) OR ([a:b=1] FOLLOWEDBY [a:b=3])",
),
],
)
def test_obs_not_equivalent(patt1, patt2):

View File

@ -1,3 +1,4 @@
import io
import json
import pytest
@ -113,6 +114,27 @@ def test_bundle_id_must_start_with_bundle():
assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'."
def test_create_bundle_fp_serialize_pretty(indicator, malware, relationship):
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])
buffer = io.StringIO()
bundle.fp_serialize(buffer, pretty=True)
assert str(bundle) == EXPECTED_BUNDLE
assert bundle.serialize(pretty=True) == EXPECTED_BUNDLE
assert buffer.getvalue() == EXPECTED_BUNDLE
def test_create_bundle_fp_serialize_nonpretty(indicator, malware, relationship):
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])
buffer = io.StringIO()
bundle.fp_serialize(buffer, sort_keys=True)
assert bundle.serialize(sort_keys=True) == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
assert buffer.getvalue() == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
def test_create_bundle1(indicator, malware, relationship):
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])

View File

@ -128,18 +128,17 @@ def test_filter_value_type_check():
with pytest.raises(TypeError) as excinfo:
Filter('created', '=', object())
assert "'<class 'object'>'" in str(excinfo.value)
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
with pytest.raises(TypeError) as excinfo:
Filter("type", "=", complex(2, -1))
assert "'<class 'complex'>'" in str(excinfo.value)
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
with pytest.raises(TypeError) as excinfo:
Filter("type", "=", set([16, 23]))
assert "'<class 'set'>'" in str(excinfo.value)
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)

View File

@ -3,9 +3,8 @@ import json
from medallion.filters.basic_filter import BasicFilter
import pytest
from requests.models import Response
from taxii2client.common import _filter_kwargs_to_query_params
from taxii2client.v20 import MEDIA_TYPE_STIX_V20, Collection
import stix2
from stix2.datastore import DataSourceError
@ -27,7 +26,7 @@ class MockTAXIICollectionEndpoint(Collection):
def add_objects(self, bundle):
self._verify_can_write()
if isinstance(bundle, str):
bundle = json.loads(bundle)
for object in bundle.get("objects", []):
self.objects.append(object)
@ -35,12 +34,12 @@ class MockTAXIICollectionEndpoint(Collection):
{
"date_added": get_timestamp(),
"id": object["id"],
"media_type": "application/stix+json;version=2.0",
"version": object.get("modified", object.get("created", get_timestamp())),
},
)
def get_objects(self, accept=MEDIA_TYPE_STIX_V20, start=0, per_request=0, **filter_kwargs):
self._verify_can_read()
query_params = _filter_kwargs_to_query_params(filter_kwargs)
assert isinstance(query_params, dict)
@ -52,7 +51,12 @@ class MockTAXIICollectionEndpoint(Collection):
100,
)[0]
if objs:
resp = Response()
resp.status_code = 200
resp.headers["Content-Range"] = f"items 0-{len(objs)}/{len(objs)}"
resp.encoding = "utf-8"
resp._content = bytes(stix2.v20.Bundle(objects=objs).serialize(ensure_ascii=False), resp.encoding)
return resp
else:
resp = Response()
resp.status_code = 404

View File

@ -1,3 +1,4 @@
import json
import os
import pytest
@ -67,6 +68,11 @@ def ds2():
yield stix2.MemoryStore(stix_objs)
@pytest.fixture
def fs():
yield stix2.FileSystemSource(FS_PATH)
def test_object_factory_created_by_ref_str():
factory = stix2.ObjectFactory(created_by_ref=IDENTITY_ID)
ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
@ -418,7 +424,7 @@ def test_related_to_by_target(ds):
def test_versioned_checks(ds, ds2):
weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": True,
@ -431,7 +437,7 @@ def test_versioned_checks(ds, ds2):
def test_semantic_check_with_versioning(ds, ds2):
weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
@ -461,13 +467,11 @@ def test_semantic_check_with_versioning(ds, ds2):
def test_list_semantic_check(ds, ds2):
weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
})
@ -497,62 +501,149 @@ def test_list_semantic_check(ds, ds2):
assert round(score) == 1
def test_graph_similarity_raises_value_error(ds):
with pytest.raises(ValueError):
prop_scores1 = {}
stix2.Environment().graph_similarity(ds, ds2, prop_scores1, max_depth=-1)
def test_graph_similarity_with_filesystem_source(ds, fs):
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, ignore_spec_version=True)
# Switching parameters
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, ignore_spec_version=True)
assert round(env1) == 25
assert round(prop_scores1["matching_score"]) == 451
assert round(prop_scores1["len_pairs"]) == 18
assert round(env2) == 25
assert round(prop_scores2["matching_score"]) == 451
assert round(prop_scores2["len_pairs"]) == 18
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_similarity_with_duplicate_graph(ds):
prop_scores = {}
env = stix2.Environment().graph_similarity(ds, ds, prop_scores)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_similarity_with_versioning_check_on(ds2, ds):
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, versioning_checks=True)
# Switching parameters
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, versioning_checks=True)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_similarity_with_versioning_check_off(ds2, ds):
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1)
# Switching parameters
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_filesystem_source(ds, fs):
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, ignore_spec_version=True)
# Switching parameters
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, ignore_spec_version=True)
assert env1 is False
assert round(prop_scores1["matching_score"]) == 451
assert round(prop_scores1["len_pairs"]) == 18
assert env2 is False
assert round(prop_scores2["matching_score"]) == 451
assert round(prop_scores2["len_pairs"]) == 18
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_duplicate_graph(ds):
prop_scores = {}
env = stix2.Environment().graph_equivalence(ds, ds, prop_scores)
assert env is True
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, versioning_checks=True)
# Switching parameters
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, versioning_checks=True)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert env2 is True
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1)
# Switching parameters
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert env2 is True
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)

View File

@ -170,6 +170,60 @@ def test_versioning_error_dict_bad_modified_value():
"but have the same id and modified timestamp do not have defined consumer behavior."
def test_versioning_dict_unregistered_no_modified():
d = {
"type": "not-registered",
"id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
"created": "1995-04-07T15:37:48.178Z",
}
new_d = stix2.versioning.new_version(d)
assert "modified" in new_d
assert new_d["modified"] > stix2.utils.parse_into_datetime(d["created"])
new_d = stix2.versioning.new_version(d, modified="1996-11-20T01:19:29.134Z")
assert new_d["modified"] == "1996-11-20T01:19:29.134Z"
def test_versioning_dict_unregistered_unversionable():
d = {
"type": "not-registered",
"id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
"modified": "1995-04-07T15:37:48.178Z",
}
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
stix2.versioning.new_version(d)
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
# should fail even if we provide a "created" kwarg.
stix2.versioning.new_version(d, created="1985-06-29T06:09:51.157Z")
def test_versioning_custom_object():
@stix2.v20.CustomObject(
"x-versionable-all-optional-20", [
("created", stix2.properties.TimestampProperty()),
("modified", stix2.properties.TimestampProperty()),
("revoked", stix2.properties.BooleanProperty()),
],
)
class CustomSDO:
pass
obj = CustomSDO(created="1990-12-18T17:56:11.346234Z")
new_obj = stix2.versioning.new_version(obj)
assert "modified" in new_obj
assert new_obj["modified"] > new_obj["created"]
obj = CustomSDO()
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
# fails due to insufficient properties on the object, even though its
# type supports versioning.
stix2.versioning.new_version(obj)
def test_versioning_error_dict_no_modified_value():
campaign_v1 = {
'type': 'campaign',
@ -184,10 +238,10 @@ def test_versioning_error_dict_no_modified_value():
def test_making_new_version_invalid_cls():
campaign_v1 = "This is a campaign."
with pytest.raises(stix2.exceptions.TypeNotVersionableError) as excinfo:
stix2.versioning.new_version(campaign_v1, name="fred")
assert excinfo.value.object is campaign_v1
def test_revoke_dict():
@ -206,7 +260,7 @@ def test_revoke_dict():
def test_revoke_unversionable():
sco = stix2.v20.File(name="data.txt")
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
sco.revoke()
@ -277,7 +331,7 @@ def test_version_unversionable_dict():
"name": "data.txt",
}
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(f)
@ -295,10 +349,10 @@ def test_version_sco_with_modified():
"modified": "1991-05-13T19:24:57Z",
}
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(file_sco, name="newname.txt")
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.revoke(file_sco)
file_sco_obj = stix2.v20.File(
@ -307,10 +361,10 @@ def test_version_sco_with_modified():
modified="1991-05-13T19:24:57Z",
)
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(file_sco_obj, name="newname.txt")
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.revoke(file_sco_obj)
@ -337,6 +391,45 @@ def test_version_sco_with_custom():
revoked_obj = stix2.versioning.revoke(new_file_sco_obj)
assert revoked_obj.revoked
# Same thing with a dict
d = {
"type": "file",
"name": "data.txt",
"created": "1973-11-23T02:31:37Z",
"modified": "1991-05-13T19:24:57Z",
"revoked": False,
}
new_d = stix2.versioning.new_version(d, size=1234)
assert new_d["size"] == 1234
revoked_d = stix2.versioning.revoke(new_d)
assert revoked_d["revoked"]
def test_version_marking():
m = stix2.v20.MarkingDefinition(
created="1982-11-29T12:20:13.723Z",
definition_type="statement",
definition={"statement": "Copyright (c) 2000-2020 Acme Corp"},
)
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(m)
m = {
"type": "marking-definition",
"id": "marking-definition--2a9f3f6e-5cbd-423b-a40d-02aefd29e612",
"created": "1982-11-29T12:20:13.723Z",
"definition_type": "statement",
"definition": {
"statement": "Copyright (c) 2000-2020 Acme Corp",
},
}
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(m)
def test_version_disable_custom():
m = stix2.v20.Malware(

View File

@ -1,3 +1,4 @@
import io
import json
import pytest
@ -123,6 +124,27 @@ def test_bundle_id_must_start_with_bundle():
assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'."
def test_create_bundle_fp_serialize_pretty(indicator, malware, relationship):
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])
buffer = io.StringIO()
bundle.fp_serialize(buffer, pretty=True)
assert str(bundle) == EXPECTED_BUNDLE
assert bundle.serialize(pretty=True) == EXPECTED_BUNDLE
assert buffer.getvalue() == EXPECTED_BUNDLE
def test_create_bundle_fp_serialize_nonpretty(indicator, malware, relationship):
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])
buffer = io.StringIO()
bundle.fp_serialize(buffer, sort_keys=True)
assert bundle.serialize(sort_keys=True) == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
assert buffer.getvalue() == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
def test_create_bundle1(indicator, malware, relationship):
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])

View File

@ -146,18 +146,17 @@ def test_filter_value_type_check():
with pytest.raises(TypeError) as excinfo:
Filter('created', '=', object())
assert "'<class 'object'>'" in str(excinfo.value)
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
with pytest.raises(TypeError) as excinfo:
Filter("type", "=", complex(2, -1))
assert "'<class 'complex'>'" in str(excinfo.value)
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
with pytest.raises(TypeError) as excinfo:
Filter("type", "=", set([16, 23]))
assert "'<class 'set'>'" in str(excinfo.value)
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)

View File

@ -3,7 +3,6 @@ import json
from medallion.filters.basic_filter import BasicFilter
import pytest
from requests.models import Response
from taxii2client.common import _filter_kwargs_to_query_params
from taxii2client.v21 import Collection
@ -27,16 +26,16 @@ class MockTAXIICollectionEndpoint(Collection):
def add_objects(self, bundle):
self._verify_can_write()
if isinstance(bundle, str):
bundle = json.loads(bundle)
for obj in bundle.get("objects", []):
self.objects.append(obj)
self.manifests.append(
{
"date_added": get_timestamp(),
"id": obj["id"],
"media_type": "application/stix+json;version=2.1",
"version": obj.get("modified", obj.get("created", get_timestamp())),
},
)
@ -52,7 +51,10 @@ class MockTAXIICollectionEndpoint(Collection):
100,
)[0]
if objs:
return {
"objects": objs,
"more": False,
}
else:
resp = Response()
resp.status_code = 404
@ -76,7 +78,10 @@ class MockTAXIICollectionEndpoint(Collection):
else:
filtered_objects = []
if filtered_objects:
return {
"objects": filtered_objects,
"more": False,
}
else:
resp = Response()
resp.status_code = 404

View File

@ -3,7 +3,6 @@ import datetime
import uuid
import pytest
import stix2.base
import stix2.canonicalization.Canonicalize
@ -31,12 +30,7 @@ def _make_uuid5(name):
"""
Make a STIX 2.1+ compliant UUIDv5 from a "name".
"""
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, name)
return uuid_

View File

@ -1,3 +1,4 @@
import json
import os
import pytest
@ -37,7 +38,7 @@ def ds():
@pytest.fixture
def ds2_objects():
cam = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
idy = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
ind = stix2.v21.Indicator(id=INDICATOR_ID, created_by_ref=idy.id, **INDICATOR_KWARGS)
@ -68,7 +69,17 @@ def ds2():
published="2021-04-09T08:22:22Z", object_refs=stix_objs,
)
stix_objs.append(reprt)
yield stix_objs
@pytest.fixture
def ds2(ds2_objects):
yield stix2.MemoryStore(ds2_objects)
@pytest.fixture
def fs():
yield stix2.FileSystemSource(FS_PATH)
def test_object_factory_created_by_ref_str():
@ -426,14 +437,14 @@ def test_related_to_by_target(ds):
assert any(x['id'] == INDICATOR_ID for x in resp)
def test_object_similarity_on_same_attack_pattern1():
ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
env = stix2.Environment().object_similarity(ap1, ap2)
assert round(env) == 100
def test_object_similarity_on_same_attack_pattern2():
ATTACK_KWARGS = dict(
name="Phishing",
external_references=[
@ -445,18 +456,18 @@ def test_semantic_equivalence_on_same_attack_pattern2():
)
ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS)
ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS)
env = stix2.Environment().object_similarity(ap1, ap2)
assert round(env) == 100
def test_object_similarity_on_same_campaign1():
camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
env = stix2.Environment().object_similarity(camp1, camp2)
assert round(env) == 100
def test_object_similarity_on_same_campaign2():
CAMP_KWARGS = dict(
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector.",
@ -464,18 +475,18 @@ def test_semantic_equivalence_on_same_campaign2():
)
camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS)
camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS)
env = stix2.Environment().object_similarity(camp1, camp2)
assert round(env) == 100
def test_object_similarity_on_same_identity1():
iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
env = stix2.Environment().object_similarity(iden1, iden2)
assert round(env) == 100
def test_object_similarity_on_same_identity2():
IDEN_KWARGS = dict(
name="John Smith",
identity_class="individual",
@ -483,26 +494,26 @@ def test_semantic_equivalence_on_same_identity2():
)
iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS)
iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS)
env = stix2.Environment().object_similarity(iden1, iden2)
assert round(env) == 100
def test_object_similarity_on_same_indicator():
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
env = stix2.Environment().object_similarity(ind1, ind2)
assert round(env) == 100
def test_object_similarity_on_same_location1():
location_kwargs = dict(latitude=45, longitude=179)
loc1 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
loc2 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
env = stix2.Environment().object_similarity(loc1, loc2)
assert round(env) == 100
def test_object_similarity_on_same_location2():
location_kwargs = dict(
latitude=38.889,
longitude=-77.023,
@ -511,33 +522,33 @@ def test_semantic_equivalence_on_same_location2():
)
loc1 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
loc2 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
env = stix2.Environment().object_similarity(loc1, loc2)
assert round(env) == 100
def test_object_similarity_location_with_no_latlong():
loc_kwargs = dict(country="US", administrative_area="US-DC")
loc1 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS)
loc2 = stix2.v21.Location(id=LOCATION_ID, **loc_kwargs)
env = stix2.Environment().object_similarity(loc1, loc2)
assert round(env) != 100
def test_object_similarity_on_same_malware():
malw1 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
malw2 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
env = stix2.Environment().object_similarity(malw1, malw2)
assert round(env) == 100
def test_object_similarity_on_same_threat_actor1():
ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
env = stix2.Environment().object_similarity(ta1, ta2)
assert round(env) == 100
def test_object_similarity_on_same_threat_actor2():
THREAT_KWARGS = dict(
threat_actor_types=["crime-syndicate"],
aliases=["super-evil"],
@ -545,25 +556,38 @@ def test_semantic_equivalence_on_same_threat_actor2():
)
ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_KWARGS)
ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_KWARGS)
env = stix2.Environment().object_similarity(ta1, ta2)
assert round(env) == 100
def test_object_similarity_on_same_tool():
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
env = stix2.Environment().object_similarity(tool1, tool2)
assert round(env) == 100
def test_object_similarity_on_same_vulnerability1():
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
prop_scores = {}
env = stix2.Environment().object_similarity(vul1, vul2, prop_scores)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 30
assert round(prop_scores["sum_weights"]) == 30
def test_object_equivalence_on_same_vulnerability1():
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
prop_scores = {}
env = stix2.Environment().object_equivalence(vul1, vul2, prop_scores)
assert env is True
assert round(prop_scores["matching_score"]) == 30
assert round(prop_scores["sum_weights"]) == 30
def test_object_similarity_on_same_vulnerability2():
VULN_KWARGS1 = dict(
name="Heartbleed",
external_references=[
@ -584,11 +608,42 @@ def test_semantic_equivalence_on_same_vulnerability2():
)
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS1)
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS2)
prop_scores = {}
env = stix2.Environment().object_similarity(vul1, vul2, prop_scores)
assert round(env) == 0.0
assert round(prop_scores["matching_score"]) == 0
assert round(prop_scores["sum_weights"]) == 100
def test_object_equivalence_on_same_vulnerability2():
VULN_KWARGS1 = dict(
name="Heartbleed",
external_references=[
{
"url": "https://example",
"source_name": "some-source",
},
],
)
VULN_KWARGS2 = dict(
name="Foo",
external_references=[
{
"url": "https://example2",
"source_name": "some-source2",
},
],
)
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS1)
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS2)
prop_scores = {}
env = stix2.Environment().object_equivalence(vul1, vul2, prop_scores)
assert env is False
assert round(prop_scores["matching_score"]) == 0
assert round(prop_scores["sum_weights"]) == 100
def test_object_similarity_on_unknown_object():
CUSTOM_KWARGS1 = dict(
type="x-foobar",
id="x-foobar--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061",
@ -615,17 +670,17 @@ def test_semantic_equivalence_on_unknown_object():
def _x_foobar_checks(obj1, obj2, **weights):
matching_score = 0.0
sum_weights = 0.0
if stix2.equivalence.object.check_property_present("external_references", obj1, obj2):
w = weights["external_references"]
sum_weights += w
matching_score += w * stix2.equivalence.object.partial_external_reference_based(
obj1["external_references"],
obj2["external_references"],
)
if stix2.equivalence.object.check_property_present("name", obj1, obj2):
w = weights["name"]
sum_weights += w
matching_score += w * stix2.equivalence.object.partial_string_based(obj1["name"], obj2["name"])
return matching_score, sum_weights
weights = {
@ -640,20 +695,20 @@ def test_semantic_equivalence_on_unknown_object():
}
cust1 = stix2.parse(CUSTOM_KWARGS1, allow_custom=True)
cust2 = stix2.parse(CUSTOM_KWARGS2, allow_custom=True)
env = stix2.Environment().object_similarity(cust1, cust2, **weights)
assert round(env) == 0
def test_object_similarity_different_type_raises():
with pytest.raises(ValueError) as excinfo:
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
stix2.Environment().object_similarity(vul1, ind1)
assert str(excinfo.value) == "The objects to compare must be of the same type!"
def test_object_similarity_different_spec_version_raises():
with pytest.raises(ValueError) as excinfo:
V20_KWARGS = dict(
labels=['malicious-activity'],
@ -661,23 +716,24 @@ def test_semantic_equivalence_different_spec_version_raises():
)
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **V20_KWARGS)
stix2.Environment().object_similarity(ind1, ind2)
assert str(excinfo.value) == "The objects to compare must be of the same spec version!"
def test_object_similarity_zero_match():
IND_KWARGS = dict(
indicator_types=["malicious-activity", "bar"],
pattern="[ipv4-addr:value = '192.168.1.1']",
pattern_type="stix",
valid_from="2019-01-01T12:34:56Z",
labels=["APTX", "foo"],
)
weights = {
"indicator": {
"indicator_types": (15, stix2.equivalence.object.partial_list_based),
"pattern": (80, stix2.equivalence.object.custom_pattern_based),
"valid_from": (5, stix2.equivalence.object.partial_timestamp_based),
"tdelta": 1, # One day interval
},
"_internal": {
@ -686,29 +742,31 @@ def test_semantic_equivalence_zero_match():
}
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **IND_KWARGS)
env = stix2.Environment().object_similarity(ind1, ind2, **weights)
assert round(env) == 8
env = stix2.Environment().object_similarity(ind2, ind1, **weights)
assert round(env) == 8
def test_object_similarity_different_spec_version():
IND_KWARGS = dict(
labels=["APTX"],
pattern="[ipv4-addr:value = '192.168.1.1']",
)
weights = {
"indicator": {
"indicator_types": (15, stix2.equivalence.object.partial_list_based),
"pattern": (80, stix2.equivalence.object.custom_pattern_based),
"valid_from": (5, stix2.equivalence.object.partial_timestamp_based),
"tdelta": 1, # One day interval
},
}
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS)
env = stix2.Environment().object_similarity(ind1, ind2, ignore_spec_version=True, **weights)
assert round(env) == 0
env = stix2.Environment().object_similarity(ind2, ind1, ignore_spec_version=True, **weights)
assert round(env) == 0
@ -780,34 +838,36 @@ def test_semantic_equivalence_different_spec_version():
),
],
)
def test_object_similarity_external_references(refs1, refs2, ret_val):
value = stix2.equivalence.object.partial_external_reference_based(refs1, refs2)
assert value == ret_val
def test_object_similarity_timestamp():
t1 = "2018-10-17T00:14:20.652Z"
t2 = "2018-10-17T12:14:20.652Z"
assert stix2.equivalence.object.partial_timestamp_based(t1, t2, 1) == 0.5
def test_object_similarity_exact_match():
t1 = "2018-10-17T00:14:20.652Z"
t2 = "2018-10-17T12:14:20.652Z"
assert stix2.equivalence.object.exact_match(t1, t2) == 0.0
def test_no_datastore_fallsback_list_based_check_for_refs_check():
r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
prop_scores = {}
assert stix2.Environment().object_similarity(r1, r2, prop_scores) == 100.0
assert prop_scores["object_refs"]["check_type"] == "partial_list_based"
def custom_semantic_equivalence_method(obj1, obj2, **weights):
return 96.0, 100.0
def test_object_similarity_method_provided():
# Because `method` is provided, `partial_list_based` will be ignored
TOOL2_KWARGS = dict(
name="Random Software",
@ -816,19 +876,19 @@ def test_semantic_equivalence_method_provided():
weights = {
"tool": {
"tool_types": (20, stix2.equivalence.object.partial_list_based),
"name": (80, stix2.equivalence.object.partial_string_based),
"method": custom_semantic_equivalence_method,
},
}
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
env = stix2.Environment().object_similarity(tool1, tool2, **weights)
assert round(env) == 96
def test_object_similarity_prop_scores():
TOOL2_KWARGS = dict(
name="Random Software",
tool_types=["information-gathering"],
@ -838,7 +898,7 @@ def test_semantic_equivalence_prop_scores():
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
stix2.Environment().object_similarity(tool1, tool2, prop_scores)
assert len(prop_scores) == 4
assert round(prop_scores["matching_score"], 1) == 8.9
assert round(prop_scores["sum_weights"], 1) == 100.0
@ -850,7 +910,7 @@ def custom_semantic_equivalence_method_prop_scores(obj1, obj2, prop_scores, **we
return 96.0, 100.0
def test_semantic_equivalence_prop_scores_method_provided():
def test_object_similarity_prop_scores_method_provided():
TOOL2_KWARGS = dict(
name="Random Software",
tool_types=["information-gathering"],
@ -868,7 +928,7 @@ def test_semantic_equivalence_prop_scores_method_provided():
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
env = stix2.Environment().semantically_equivalent(tool1, tool2, prop_scores, **weights)
env = stix2.Environment().object_similarity(tool1, tool2, prop_scores, **weights)
assert round(env) == 96
assert len(prop_scores) == 2
assert prop_scores["matching_score"] == 96.0
@ -876,7 +936,8 @@ def test_semantic_equivalence_prop_scores_method_provided():
def test_versioned_checks(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
# Testing internal method
weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": True,
@ -889,7 +950,7 @@ def test_versioned_checks(ds, ds2):
def test_semantic_check_with_versioning(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
@ -920,7 +981,7 @@ def test_semantic_check_with_versioning(ds, ds2):
def test_list_semantic_check(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
@ -955,63 +1016,272 @@ def test_list_semantic_check(ds, ds2):
)
assert round(score) == 1
score = stix2.equivalence.object.list_reference_check(
object_refs2,
object_refs1,
ds2,
ds,
**weights,
)
assert round(score) == 1
def test_graph_equivalence_with_filesystem_source(ds):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
def test_graph_similarity_raises_value_error(ds, ds2):
def test_graph_similarity_raises_value_error(ds, ds2):
with pytest.raises(ValueError):
prop_scores1 = {}
stix2.Environment().graph_similarity(ds, ds2, prop_scores1, max_depth=-1)
def test_graph_similarity_with_filesystem_source(ds, fs):
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(
fs, ds, prop_scores1,
ignore_spec_version=True,
versioning_checks=False,
max_depth=1,
)
# Switching parameters
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(
ds, fs, prop_scores2,
ignore_spec_version=True,
versioning_checks=False,
max_depth=1,
)
assert round(env1) == 23
assert round(prop_scores1["matching_score"]) == 411
assert round(prop_scores1["len_pairs"]) == 18
assert round(env2) == 23
assert round(prop_scores2["matching_score"]) == 411
assert round(prop_scores2["len_pairs"]) == 18
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
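
The figures above are internally consistent with an average-over-pairs reading of the graph score: similarity = matching_score / len_pairs = 411 / 18 = 22.83..., which rounds to the asserted 23. (This reading is inferred from the assertions in this commit, not stated by the implementation.)
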
def test_depth_limiting():
g1 = [
{
"type": "foo",
"id": "foo--07f9dd2a-1cce-45bb-8cbe-dba3f007aafd",
"spec_version": "2.1",
"created": "1986-02-08T00:20:17Z",
"modified": "1989-12-11T06:54:29Z",
"some1_ref": "foo--700a8a3c-9936-412f-b4eb-ede466476180",
"some2_ref": "foo--f4a999a3-df94-499d-9cac-6c02e21775ee",
},
{
"type": "foo",
"id": "foo--700a8a3c-9936-412f-b4eb-ede466476180",
"spec_version": "2.1",
"created": "1989-01-06T10:31:54Z",
"modified": "1995-06-18T10:25:01Z",
"some1_ref": "foo--705afd45-eb56-43fc-a214-313d63d199a3",
},
{
"type": "foo",
"id": "foo--705afd45-eb56-43fc-a214-313d63d199a3",
"spec_version": "2.1",
"created": "1977-11-06T21:19:29Z",
"modified": "1997-12-02T20:33:34Z",
},
{
"type": "foo",
"id": "foo--f4a999a3-df94-499d-9cac-6c02e21775ee",
"spec_version": "2.1",
"created": "1991-09-17T00:40:52Z",
"modified": "1992-12-06T11:02:47Z",
"name": "alice",
},
]
g2 = [
{
"type": "foo",
"id": "foo--71570479-3e6e-48d2-81fb-897454dec55d",
"spec_version": "2.1",
"created": "1975-12-22T05:20:38Z",
"modified": "1980-11-11T01:09:03Z",
"some1_ref": "foo--4aeda39b-31fa-4ffb-a847-d8edc175a579",
"some2_ref": "foo--941e48d6-3100-4419-9e8c-cf1eb59e71b2",
},
{
"type": "foo",
"id": "foo--4aeda39b-31fa-4ffb-a847-d8edc175a579",
"spec_version": "2.1",
"created": "1976-01-05T08:32:03Z",
"modified": "1980-11-09T05:41:02Z",
"some1_ref": "foo--689252c3-5d20-43ff-bbf7-c8e45d713768",
},
{
"type": "foo",
"id": "foo--689252c3-5d20-43ff-bbf7-c8e45d713768",
"spec_version": "2.1",
"created": "1974-09-11T18:56:30Z",
"modified": "1976-10-31T11:59:43Z",
},
{
"type": "foo",
"id": "foo--941e48d6-3100-4419-9e8c-cf1eb59e71b2",
"spec_version": "2.1",
"created": "1985-01-03T01:07:03Z",
"modified": "1992-07-20T21:32:31Z",
"name": "alice",
},
]
mem_store1 = stix2.MemorySource(g1)
mem_store2 = stix2.MemorySource(g2)
custom_weights = {
"foo": {
"some1_ref": (33, stix2.equivalence.object.reference_check),
"some2_ref": (33, stix2.equivalence.object.reference_check),
"name": (34, stix2.equivalence.object.partial_string_based),
},
}
prop_scores1 = {}
env1 = stix2.equivalence.graph.graph_similarity(
mem_store1, mem_store2, prop_scores1, **custom_weights
)
assert round(env1) == 38
assert round(prop_scores1["matching_score"]) == 300
assert round(prop_scores1["len_pairs"]) == 8
# weight credited from the 'alice' name match found during de-referencing
assert prop_scores1['summary']['foo--71570479-3e6e-48d2-81fb-897454dec55d']['prop_score']['some2_ref']['weight'] == 33
assert prop_scores1['summary']['foo--07f9dd2a-1cce-45bb-8cbe-dba3f007aafd']['prop_score']['some2_ref']['weight'] == 33
# Switching parameters
prop_scores2 = {}
env2 = stix2.equivalence.graph.graph_similarity(
mem_store2, mem_store1, prop_scores2, **custom_weights
)
assert round(env2) == 38
assert round(prop_scores2["matching_score"]) == 300
assert round(prop_scores2["len_pairs"]) == 8
# weight credited from the 'alice' name match found during de-referencing
assert prop_scores2['summary']['foo--71570479-3e6e-48d2-81fb-897454dec55d']['prop_score']['some2_ref']['weight'] == 33
assert prop_scores2['summary']['foo--07f9dd2a-1cce-45bb-8cbe-dba3f007aafd']['prop_score']['some2_ref']['weight'] == 33
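
Same averaging as above: 300 / 8 = 37.5, rounding to the asserted 38. The "summary" entries additionally show that when reference_check de-references some2_ref down to the leaf objects, the 'alice' name match is credited at that property's full weight of 33 in both directions.
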
def test_graph_similarity_with_duplicate_graph(ds):
prop_scores = {}
fs = stix2.FileSystemSource(FS_PATH)
env = stix2.Environment().graphically_equivalent(fs, ds, prop_scores, **weights)
assert round(env) == 24
assert round(prop_scores["matching_score"]) == 122
assert round(prop_scores["sum_weights"]) == 500
env = stix2.Environment().graph_similarity(ds, ds, prop_scores)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_similarity_with_versioning_check_on(ds2, ds):
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, versioning_checks=True)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
# Switching parameters
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, versioning_checks=True)
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_similarity_with_versioning_check_off(ds2, ds):
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
# Switching parameters
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2)
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_filesystem_source(ds, fs):
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, ignore_spec_version=True)
# Switching parameters
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, ignore_spec_version=True)
assert env1 is False
assert round(prop_scores1["matching_score"]) == 411
assert round(prop_scores1["len_pairs"]) == 18
assert env2 is False
assert round(prop_scores2["matching_score"]) == 411
assert round(prop_scores2["len_pairs"]) == 18
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds, prop_scores, **weights)
assert round(env) == 100
env = stix2.Environment().graph_equivalence(ds, ds, prop_scores)
assert env is True
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["sum_weights"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, versioning_checks=True)
# Switching parameters
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, versioning_checks=True)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert env2 is True
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1)
# Switching parameters
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert env2 is True
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)


@ -4,6 +4,7 @@ import pytest
import stix2
import stix2.exceptions
import stix2.properties
import stix2.utils
import stix2.v21
import stix2.versioning
@ -179,6 +180,62 @@ def test_versioning_error_dict_bad_modified_value():
"but have the same id and modified timestamp do not have defined consumer behavior."
def test_versioning_dict_unregistered_no_modified():
d = {
"type": "not-registered",
"id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
"spec_version": "2.1",
"created": "1995-04-07T15:37:48.178Z",
}
new_d = stix2.versioning.new_version(d)
assert "modified" in new_d
assert new_d["modified"] > stix2.utils.parse_into_datetime(d["created"])
new_d = stix2.versioning.new_version(d, modified="1996-11-20T01:19:29.134Z")
assert new_d["modified"] == "1996-11-20T01:19:29.134Z"
def test_versioning_dict_unregistered_unversionable():
d = {
"type": "not-registered",
"id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
"spec_version": "2.1",
"modified": "1995-04-07T15:37:48.178Z",
}
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
stix2.versioning.new_version(d)
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
# should fail even if we provide a "created" kwarg.
stix2.versioning.new_version(d, created="1985-06-29T06:09:51.157Z")
def test_versioning_custom_object():
@stix2.v21.CustomObject(
"x-versionable-all-optional-21", [
("created", stix2.properties.TimestampProperty()),
("modified", stix2.properties.TimestampProperty()),
("revoked", stix2.properties.BooleanProperty()),
],
)
class CustomSDO:
pass
obj = CustomSDO(created="1990-12-18T17:56:11.346234Z")
new_obj = stix2.versioning.new_version(obj)
assert "modified" in new_obj
assert new_obj["modified"] > new_obj["created"]
obj = CustomSDO()
with pytest.raises(stix2.exceptions.ObjectNotVersionableError):
# fails due to insufficient properties on the object, even though its
# type supports versioning.
stix2.versioning.new_version(obj)
def test_versioning_error_dict_no_modified_value():
campaign_v1 = {
'type': 'campaign',
@ -193,10 +250,10 @@ def test_versioning_error_dict_no_modified_value():
def test_making_new_version_invalid_cls():
campaign_v1 = "This is a campaign."
with pytest.raises(ValueError) as excinfo:
with pytest.raises(stix2.exceptions.TypeNotVersionableError) as excinfo:
stix2.versioning.new_version(campaign_v1, name="fred")
assert 'cannot create new version of object of this type' in str(excinfo.value)
assert excinfo.value.object is campaign_v1
def test_revoke_dict():
@ -216,7 +273,7 @@ def test_revoke_dict():
def test_revoke_unversionable():
sco = stix2.v21.File(name="data.txt")
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
sco.revoke()
@ -318,7 +375,7 @@ def test_version_unversionable_dict():
"name": "data.txt",
}
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(f)
@ -345,6 +402,23 @@ def test_version_sco_with_custom():
revoked_obj = stix2.versioning.revoke(new_file_sco_obj)
assert revoked_obj.revoked
# Same thing with a dict
d = {
"type": "file",
"id": "file--d287f10a-98b4-4a47-8fa0-64b12695ea58",
"spec_version": "2.1",
"name": "data.txt",
"created": "1973-11-23T02:31:37Z",
"modified": "1991-05-13T19:24:57Z",
"revoked": False,
}
new_d = stix2.versioning.new_version(d, size=1234)
assert new_d["size"] == 1234
revoked_d = stix2.versioning.revoke(new_d)
assert revoked_d["revoked"]
def test_version_sco_id_contributing_properties():
file_sco_obj = stix2.v21.File(
@ -378,6 +452,33 @@ def test_version_sco_id_contributing_properties_dict():
assert e.value.unchangable_properties == {"name"}
def test_version_marking():
m = stix2.v21.MarkingDefinition(
name="a name",
created="1982-11-29T12:20:13.723Z",
definition_type="statement",
definition={"statement": "Copyright (c) 2000-2020 Acme Corp"},
)
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(m)
m = {
"type": "marking-definition",
"id": "marking-definition--2a9f3f6e-5cbd-423b-a40d-02aefd29e612",
"spec_version": "2.1",
"name": "a name",
"created": "1982-11-29T12:20:13.723Z",
"definition_type": "statement",
"definition": {
"statement": "Copyright (c) 2000-2020 Acme Corp",
},
}
with pytest.raises(stix2.exceptions.TypeNotVersionableError):
stix2.versioning.new_version(m)
def test_version_disable_custom():
m = stix2.v21.Malware(
name="foo", description="Steals your identity!", is_family=False,


@ -7,7 +7,6 @@ import json
import re
import pytz
import six
import stix2.registry as mappings
import stix2.version
@ -70,7 +69,7 @@ def _to_enum(value, enum_type, enum_default=None):
if not isinstance(value, enum_type):
if value is None and enum_default is not None:
value = enum_default
elif isinstance(value, six.string_types):
elif isinstance(value, str):
value = enum_type[value.upper()]
else:
raise TypeError(


@ -3,8 +3,6 @@
from collections import OrderedDict
import copy
import six
from ..custom import _custom_marking_builder
from ..markings import _MarkingsMixin
from ..markings.utils import check_tlp_marking
@ -21,7 +19,7 @@ def _should_set_millisecond(cr, marking_type):
if marking_type == TLPMarking:
return True
# otherwise, precision is kept from how it was given
if isinstance(cr, six.string_types):
if isinstance(cr, str):
if '.' in cr:
return True
else:


@ -2,9 +2,9 @@
from collections import OrderedDict
import itertools
from urllib.parse import quote_plus
import warnings
from six.moves.urllib.parse import quote_plus
from stix2patterns.validator import run_validator
from ..custom import _custom_object_builder


@ -9,13 +9,13 @@ import uuid
import stix2.base
import stix2.registry
from stix2.utils import (
detect_spec_version, get_timestamp, is_sco, is_sdo, is_sro,
parse_into_datetime,
detect_spec_version, get_timestamp, is_sco, parse_into_datetime,
)
import stix2.v20
from .exceptions import (
InvalidValueError, RevokeError, UnmodifiablePropertyError,
InvalidValueError, ObjectNotVersionableError, RevokeError,
TypeNotVersionableError, UnmodifiablePropertyError,
)
# STIX object properties that cannot be modified
@ -56,27 +56,16 @@ def _fudge_modified(old_modified, new_modified, use_stix21):
return new_modified
def _is_versionable(data):
def _get_stix_version(data):
"""
Determine whether the given object is versionable. This check is done on
the basis of support for three properties for the object type: "created",
"modified", and "revoked". If all three are supported, the object is
versionable; otherwise it is not. Dicts must have a "type" property whose
value is for a registered object type. This is used to determine a
complete set of supported properties for the type.
Factored-out helper for getting/detecting the STIX version of the given
value.
Also, detect whether it represents a STIX 2.1 or greater spec version.
:param data: The object to check. Must be either a stix object, or a dict
with a "type" property.
:return: A 2-tuple of bools: the first is True if the object is versionable
and False if not; the second is True if the object is STIX 2.1+ and
False if not.
:param data: An object, e.g. _STIXBase instance or dict
:return: The STIX version as a string in "X.Y" notation, or None if the
version could not be determined.
"""
is_versionable = False
stix_version = None
if isinstance(data, Mapping):
# First, determine spec version. It's easy for our stix2 objects; more
@ -88,36 +77,112 @@ def _is_versionable(data):
elif isinstance(data, dict):
stix_version = detect_spec_version(data)
return stix_version
def _is_versionable_type(data):
"""
Determine whether type of the given object is versionable. This check is
done on the basis of support for three properties for the object type:
"created", "modified", and "revoked". If all three are supported, the
object type is versionable; otherwise it is not. Dicts must have a "type"
property. This is used in STIX version detection and to determine a
complete set of supported properties for the type.
If a dict is passed whose "type" is unregistered, then this library has no
knowledge of the type. It can't determine what properties are "supported".
This function will be lax and treat the type as versionable.
Note that this support check is not sufficient for creating a new object
version. Support for the versioning properties does not mean that
sufficient properties are actually present on the object.
Also, detect whether it represents a STIX 2.1 or greater spec version.
:param data: The object to check. Must be either a stix object, or a dict
with a "type" property.
:return: A 2-tuple: the first element is True if the object is versionable
and False if not; the second is the STIX version as a string in "X.Y"
notation.
"""
is_versionable = False
stix_version = None
if isinstance(data, Mapping):
# First, determine spec version
stix_version = _get_stix_version(data)
# Then, determine versionability.
if isinstance(data, stix2.base._STIXBase):
is_versionable = _VERSIONING_PROPERTIES.issubset(
data._properties,
)
# This should be sufficient for STIX objects; maybe we get lucky with
# dicts here but probably not.
if data.keys() >= _VERSIONING_PROPERTIES:
is_versionable = True
# Tougher to handle dicts. We need to consider STIX version, map to a
# registered class, and from that get a more complete picture of its
# properties.
elif isinstance(data, dict):
obj_type = data["type"]
# Tougher to handle dicts. We need to consider STIX version,
# map to a registered class, and from that get a more complete
# picture of its properties.
if is_sdo(obj_type, stix_version) or is_sro(obj_type, stix_version):
# Should we bother checking properties for SDOs/SROs?
# They were designed to be versionable.
is_versionable = True
elif is_sco(obj_type, stix_version):
# but do check SCOs
cls = stix2.registry.class_for_type(
obj_type, stix_version, "observables",
)
cls = stix2.registry.class_for_type(data.get("type"), stix_version)
if cls:
is_versionable = _VERSIONING_PROPERTIES.issubset(
cls._properties,
)
else:
# The type is not registered, so we have no knowledge of
# what properties are supported. Let's be lax and let them
# version it.
is_versionable = True
return is_versionable, stix_version
def _check_versionable_object(data):
"""
Determine whether there are or may be sufficient properties present on
an object to allow versioning. Raises an exception if the object can't be
versioned.
Also detect STIX spec version.
:param data: The object to check, e.g. dict with a "type" property, or
_STIXBase instance
:return: True if the object is STIX 2.1+, or False if not
:raises TypeNotVersionableError: If the object didn't have the versioning
properties and the type was found to not support them
:raises ObjectNotVersionableError: If the type was found to support
versioning but there were insufficient properties on the object
"""
if isinstance(data, Mapping):
if data.keys() >= _VERSIONING_PROPERTIES:
# If the properties all already exist in the object, assume they
# are either supported by the type, or are custom properties, and
# allow versioning.
stix_version = _get_stix_version(data)
else:
is_versionable_type, stix_version = _is_versionable_type(data)
if is_versionable_type:
# The type supports the versioning properties (or we don't
# recognize it and just assume it does). The question shifts
# to whether the object has sufficient properties to create a
# new version. Just require "created" for now. We need at
# least that as a starting point for new version timestamps.
is_versionable = "created" in data
if not is_versionable:
raise ObjectNotVersionableError(data)
else:
raise TypeNotVersionableError(data)
else:
raise TypeNotVersionableError(data)
return stix_version
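
Taken together with the tests earlier in this commit, the caller-facing contract looks roughly like this (a sketch; the File object and dict values are illustrative):

import stix2
import stix2.versioning
from stix2.exceptions import (
    ObjectNotVersionableError, TypeNotVersionableError,
)

# SCO types do not support created/modified/revoked at all:
try:
    stix2.versioning.new_version(stix2.v21.File(name="data.txt"))
except TypeNotVersionableError as e:
    assert e.object["name"] == "data.txt"  # offending object rides on the error

# An unregistered type is presumed versionable, but the object still needs
# at least "created" to anchor the new "modified" timestamp:
try:
    stix2.versioning.new_version({
        "type": "not-registered",
        "id": "not-registered--4da54535-47b7-468c-88fa-d13b04033c4b",
        "spec_version": "2.1",
        "modified": "1995-04-07T15:37:48.178Z",
    })
except ObjectNotVersionableError:
    pass
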
def new_version(data, allow_custom=None, **kwargs):
"""
Create a new version of a STIX object, by modifying properties and
@ -134,13 +199,7 @@ def new_version(data, allow_custom=None, **kwargs):
:return: The new object.
"""
is_versionable, stix_version = _is_versionable(data)
if not is_versionable:
raise ValueError(
"cannot create new version of object of this type! "
"Try a dictionary or instance of an SDO or SRO class.",
)
stix_version = _check_versionable_object(data)
if data.get('revoked'):
raise RevokeError("new_version")
@ -178,36 +237,34 @@ def new_version(data, allow_custom=None, **kwargs):
# to know which rules to apply.
precision_constraint = "min" if stix_version == "2.1" else "exact"
old_modified = data.get("modified") or data.get("created")
old_modified = parse_into_datetime(
old_modified, precision="millisecond",
precision_constraint=precision_constraint,
)
cls = type(data)
if 'modified' not in kwargs:
old_modified = parse_into_datetime(
data["modified"], precision="millisecond",
precision_constraint=precision_constraint,
)
new_modified = get_timestamp()
new_modified = _fudge_modified(
old_modified, new_modified, stix_version == "2.1",
)
kwargs['modified'] = new_modified
elif 'modified' in data:
old_modified_property = parse_into_datetime(
data.get('modified'), precision='millisecond',
precision_constraint=precision_constraint,
)
new_modified_property = parse_into_datetime(
if 'modified' in kwargs:
new_modified = parse_into_datetime(
kwargs['modified'], precision='millisecond',
precision_constraint=precision_constraint,
)
if new_modified_property <= old_modified_property:
if new_modified <= old_modified:
raise InvalidValueError(
cls, 'modified',
"The new modified datetime cannot be before than or equal to the current modified datetime."
"It cannot be equal, as according to STIX 2 specification, objects that are different "
"but have the same id and modified timestamp do not have defined consumer behavior.",
)
else:
new_modified = get_timestamp()
new_modified = _fudge_modified(
old_modified, new_modified, stix_version != "2.0",
)
kwargs['modified'] = new_modified
new_obj_inner.update(kwargs)
# Set allow_custom appropriately if versioning an object. We will ignore


@ -161,6 +161,13 @@ def _setup_workbench():
new_class = type(obj_type.__name__, (obj_type,), new_class_dict)
factory_func = functools.partial(_environ.create, new_class)
# Copy over some class attributes that other code expects to find
factory_func._type = obj_type._type
factory_func._properties = obj_type._properties
if hasattr(obj_type, "_id_contributing_properties"):
factory_func._id_contributing_properties = \
obj_type._id_contributing_properties
# Add our new "class" to this module's globals and to the library-wide
# mapping. This allows parse() to use the wrapped classes.
globals()[obj_type.__name__] = factory_func
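
A hedged illustration of what the copied attributes enable, assuming the workbench exposes the wrapped classes under their usual names:

import stix2.workbench as wb

# The wrapper is a functools.partial, but code that introspects these
# class attributes finds what it would find on the real class:
assert wb.Indicator._type == "indicator"
assert "pattern" in wb.Indicator._properties
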