Merge branch 'master' of github.com:oasis-open/cti-python-stix2 into dev-extensions-proposal

pull/1/head
Emmanuelle Vargas-Gonzalez 2021-02-19 21:54:46 -05:00
commit 9043a9dc8e
31 changed files with 2873 additions and 571 deletions


@ -10,7 +10,6 @@ known_third_party =
pytz,
requests,
simplejson,
six,
sphinx,
stix2patterns,
taxii2client,


@ -23,3 +23,4 @@ repos:
args: ["-c", "--diff"]
- id: isort
name: Sort python imports (fixes files)
exclude: ^stix2/canonicalization/


@ -21,6 +21,8 @@ Install with `pip <https://pip.pypa.io/en/stable/>`__:
$ pip install stix2
Note: The library requires Python 3.6+.
Usage
-----


@ -4,7 +4,6 @@ import os
import re
import sys
from six import class_types
from sphinx.ext.autodoc import ClassDocumenter
from stix2.base import _STIXBase
@ -107,7 +106,7 @@ class STIXPropertyDocumenter(ClassDocumenter):
@classmethod
def can_document_member(cls, member, membername, isattr, parent):
return isinstance(member, class_types) and \
return isinstance(member, type) and \
issubclass(member, _STIXBase) and \
hasattr(member, '_properties')

File diff suppressed because it is too large


@ -47,11 +47,11 @@ setup(
],
keywords='stix stix2 json cti cyber threat intelligence',
packages=find_packages(exclude=['*.test', '*.test.*']),
python_requires='>=3.6',
install_requires=[
'pytz',
'requests',
'simplejson',
'six>=1.13.0',
'stix2-patterns>=1.2.0',
],
project_urls={

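With six dropped from setup.py above, the diffs that follow repeat a handful of mechanical six-to-Python-3 substitutions. A quick-reference sketch of the patterns in this commit (parse_count is an invented illustration, not library code):

    # six idiom                           ->  Python 3 replacement
    # six.class_types                     ->  type
    # six.text_type / six.string_types    ->  str
    # six.binary_type                     ->  bytes
    # six.iteritems(d)                    ->  d.items()
    # six.with_metaclass(ABCMeta)         ->  class C(metaclass=ABCMeta)
    # six.PY3 branches                    ->  removed (library is 3.6+ only)

    # six.raise_from(NewError(...), exc)  ->  native exception chaining:
    def parse_count(text):
        try:
            return int(text)
        except ValueError as exc:
            raise RuntimeError("bad count: " + repr(text)) from exc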

@ -5,7 +5,6 @@ import re
import uuid
import simplejson as json
import six
import stix2
from stix2.canonicalization.Canonicalize import canonicalize
@ -70,12 +69,9 @@ class _STIXBase(Mapping):
# InvalidValueError... so let those propagate.
raise
except Exception as exc:
six.raise_from(
InvalidValueError(
self.__class__, prop_name, reason=str(exc),
),
exc,
)
raise InvalidValueError(
self.__class__, prop_name, reason=str(exc),
) from exc
# interproperty constraint methods
@ -370,19 +366,8 @@ class _Observable(_STIXBase):
if json_serializable_object:
data = canonicalize(json_serializable_object, utf8=False)
# The situation is complicated w.r.t. python 2/3 behavior, so
# I'd rather not rely on particular exceptions being raised to
# determine what to do. Better to just check the python version
# directly.
if six.PY3:
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, data)
else:
uuid_ = uuid.uuid5(
SCO_DET_ID_NAMESPACE, data.encode("utf-8"),
)
id_ = "{}--{}".format(self._type, six.text_type(uuid_))
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, data)
id_ = "{}--{}".format(self._type, str(uuid_))
return id_
@ -448,7 +433,7 @@ def _make_json_serializable(value):
for v in value
]
elif not isinstance(value, (int, float, six.string_types, bool)):
elif not isinstance(value, (int, float, str, bool)):
# If a "simple" value which is not already JSON-serializable,
# JSON-serialize to a string and use that as our JSON-serializable
# value. This applies to our datetime objects currently (timestamp

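With the PY2 branch gone, the deterministic SCO id path above reduces to canonicalize-then-uuid5. A minimal sketch (make_sco_id is a hypothetical helper; the namespace UUID is the constant the STIX 2.1 spec defines for deterministic SCO ids):

    import uuid

    from stix2.canonicalization.Canonicalize import canonicalize

    SCO_DET_ID_NAMESPACE = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7")

    def make_sco_id(type_name, id_contributing_properties):
        # JCS-canonicalize the contributing properties, then derive a UUIDv5.
        data = canonicalize(id_contributing_properties, utf8=False)
        return "{}--{}".format(type_name, uuid.uuid5(SCO_DET_ID_NAMESPACE, data))

    # make_sco_id("file", {"name": "example.dat"})  ->  "file--<uuidv5>"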

@ -20,12 +20,8 @@
# JCS compatible JSON serializer for Python 3.x #
#################################################
# This file has been modified to be compatible with Python 2.x as well
import re
import six
from stix2.canonicalization.NumberToJson import convert2Es6Format
try:
@ -55,10 +51,10 @@ ESCAPE_DCT = {
}
for i in range(0x20):
ESCAPE_DCT.setdefault(chr(i), '\\u{0:04x}'.format(i))
#ESCAPE_DCT.setdefault(chr(i), '\\u%04x' % (i,))
INFINITY = float('inf')
def py_encode_basestring(s):
"""Return a JSON representation of a Python string
@ -70,7 +66,6 @@ def py_encode_basestring(s):
encode_basestring = (c_encode_basestring or py_encode_basestring)
def py_encode_basestring_ascii(s):
"""Return an ASCII-only JSON representation of a Python string
@ -83,6 +78,7 @@ def py_encode_basestring_ascii(s):
n = ord(s)
if n < 0x10000:
return '\\u{0:04x}'.format(n)
#return '\\u%04x' % (n,)
else:
# surrogate pair
n -= 0x10000
@ -96,7 +92,6 @@ encode_basestring_ascii = (
c_encode_basestring_ascii or py_encode_basestring_ascii
)
class JSONEncoder(object):
"""Extensible JSON <http://json.org> encoder for Python data structures.
@ -128,11 +123,10 @@ class JSONEncoder(object):
"""
item_separator = ', '
key_separator = ': '
def __init__(
self, skipkeys=False, ensure_ascii=False,
self, *, skipkeys=False, ensure_ascii=False,
check_circular=True, allow_nan=True, sort_keys=True,
indent=None, separators=(',', ':'), default=None,
indent=None, separators=(',', ':'), default=None
):
"""Constructor for JSONEncoder, with sensible defaults.
@ -277,6 +271,7 @@ class JSONEncoder(object):
return text
if (
_one_shot and c_make_encoder is not None
and self.indent is None
@ -294,11 +289,10 @@ class JSONEncoder(object):
)
return _iterencode(o, 0)
def _make_iterencode(
markers, _default, _encoder, _indent, _floatstr,
_key_separator, _item_separator, _sort_keys, _skipkeys, _one_shot,
# HACK: hand-optimized bytecode; turn globals into locals
## HACK: hand-optimized bytecode; turn globals into locals
ValueError=ValueError,
dict=dict,
float=float,
@ -362,10 +356,7 @@ def _make_iterencode(
chunks = _iterencode_dict(value, _current_indent_level)
else:
chunks = _iterencode(value, _current_indent_level)
# Below line commented-out for python2 compatibility
# yield from chunks
for chunk in chunks:
yield chunk
yield from chunks
if newline_indent is not None:
_current_indent_level -= 1
yield '\n' + _indent * _current_indent_level
@ -397,8 +388,7 @@ def _make_iterencode(
else:
items = dct.items()
for key, value in items:
# Replaced isinstance(key, str) with below to enable simultaneous python 2 & 3 compatibility
if isinstance(key, six.string_types) or isinstance(key, six.binary_type):
if isinstance(key, str):
pass
# JavaScript is weakly typed for these, so it makes sense to
# also allow them. Many encoders seem to do something like this.
@ -445,10 +435,7 @@ def _make_iterencode(
chunks = _iterencode_dict(value, _current_indent_level)
else:
chunks = _iterencode(value, _current_indent_level)
# Below line commented-out for python2 compatibility
# yield from chunks
for chunk in chunks:
yield chunk
yield from chunks
if newline_indent is not None:
_current_indent_level -= 1
yield '\n' + _indent * _current_indent_level
@ -457,8 +444,7 @@ def _make_iterencode(
del markers[markerid]
def _iterencode(o, _current_indent_level):
# Replaced isinstance(o, str) with below to enable simultaneous python 2 & 3 compatibility
if isinstance(o, six.string_types) or isinstance(o, six.binary_type):
if isinstance(o, str):
yield _encoder(o)
elif o is None:
yield 'null'
@ -473,15 +459,9 @@ def _make_iterencode(
# see comment for int/float in _make_iterencode
yield convert2Es6Format(o)
elif isinstance(o, (list, tuple)):
# Below line commented-out for python2 compatibility
# yield from _iterencode_list(o, _current_indent_level)
for thing in _iterencode_list(o, _current_indent_level):
yield thing
yield from _iterencode_list(o, _current_indent_level)
elif isinstance(o, dict):
# Below line commented-out for python2 compatibility
# yield from _iterencode_dict(o, _current_indent_level)
for thing in _iterencode_dict(o, _current_indent_level):
yield thing
yield from _iterencode_dict(o, _current_indent_level)
else:
if markers is not None:
markerid = id(o)
@ -489,23 +469,18 @@ def _make_iterencode(
raise ValueError("Circular reference detected")
markers[markerid] = o
o = _default(o)
# Below line commented-out for python2 compatibility
# yield from _iterencode(o, _current_indent_level)
for thing in _iterencode(o, _current_indent_level):
yield thing
yield from _iterencode(o, _current_indent_level)
if markers is not None:
del markers[markerid]
return _iterencode
def canonicalize(obj, utf8=True):
def canonicalize(obj,utf8=True):
textVal = JSONEncoder(sort_keys=True).encode(obj)
if utf8:
return textVal.encode()
return textVal
def serialize(obj, utf8=True):
def serialize(obj,utf8=True):
textVal = JSONEncoder(sort_keys=False).encode(obj)
if utf8:
return textVal.encode()

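A short usage sketch of the canonicalizer this file implements (JCS-style output: sorted keys, compact separators, ES6 number formatting; the printed values are what the code above should produce):

    from stix2.canonicalization.Canonicalize import canonicalize

    print(canonicalize({"b": 2.50, "a": 1}, utf8=False))  # {"a":1,"b":2.5}
    print(canonicalize({"a": 1}))                         # b'{"a":1}', utf8=True is the default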

@ -21,40 +21,50 @@
# Convert a Python double/float into an ES6/V8 compatible string #
##################################################################
def convert2Es6Format(value):
# Convert double/float to str using the native Python formatter
# Convert double/float to str using the native Python formatter
fvalue = float(value)
# Zero is a special case. The following line takes "-0" case as well
#
# Zero is a special case. The following line takes "-0" case as well
#
if fvalue == 0:
return '0'
# The rest of the algorithm works on the textual representation only
#
# The rest of the algorithm works on the textual representation only
#
pyDouble = str(fvalue)
# The following line catches the "inf" and "nan" values returned by str(fvalue)
#
# The following line catches the "inf" and "nan" values returned by str(fvalue)
#
if pyDouble.find('n') >= 0:
raise ValueError("Invalid JSON number: " + pyDouble)
# Save sign separately, it doesn't have any role in the algorithm
#
# Save sign separately, it doesn't have any role in the algorithm
#
pySign = ''
if pyDouble.find('-') == 0:
pySign = '-'
pyDouble = pyDouble[1:]
# Now we should only have valid non-zero values
#
# Now we should only have valid non-zero values
#
pyExpStr = ''
pyExpVal = 0
q = pyDouble.find('e')
if q > 0:
# Grab the exponent and remove it from the number
#
# Grab the exponent and remove it from the number
#
pyExpStr = pyDouble[q:]
if pyExpStr[2:3] == '0':
# Suppress leading zero on exponents
#
# Suppress leading zero on exponents
#
pyExpStr = pyExpStr[:2] + pyExpStr[3:]
pyDouble = pyDouble[0:q]
pyExpVal = int(pyExpStr[1:])
# Split number in pyFirst + pyDot + pyLast
#
# Split number in pyFirst + pyDot + pyLast
#
pyFirst = pyDouble
pyDot = ''
pyLast = ''
@ -63,33 +73,40 @@ def convert2Es6Format(value):
pyDot = '.'
pyFirst = pyDouble[:q]
pyLast = pyDouble[q + 1:]
# Now the string is split into: pySign + pyFirst + pyDot + pyLast + pyExpStr
#
# Now the string is split into: pySign + pyFirst + pyDot + pyLast + pyExpStr
#
if pyLast == '0':
# Always remove trailing .0
#
# Always remove trailing .0
#
pyDot = ''
pyLast = ''
if pyExpVal > 0 and pyExpVal < 21:
# Integers are shown as is with up to 21 digits
#
# Integers are shown as is with up to 21 digits
#
pyFirst += pyLast
pyLast = ''
pyDot = ''
pyExpStr = ''
q = pyExpVal - len(pyFirst)
while q >= 0:
q -= 1
q -= 1;
pyFirst += '0'
elif pyExpVal < 0 and pyExpVal > -7:
# Small numbers are shown as 0.etc with e-6 as lower limit
#
# Small numbers are shown as 0.etc with e-6 as lower limit
#
pyLast = pyFirst + pyLast
pyFirst = '0'
pyDot = '.'
pyExpStr = ''
q = pyExpVal
while q < -1:
q += 1
q += 1;
pyLast = '0' + pyLast
# The resulting sub-strings are concatenated
#
# The resulting sub-strings are concatenated
#
return pySign + pyFirst + pyDot + pyLast + pyExpStr

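Illustrative expectations for convert2Es6Format, chosen to hit the zero, trailing-.0, large-exponent, and small-number branches above (the asserts reflect a reading of the code, so treat them as a sketch):

    from stix2.canonicalization.NumberToJson import convert2Es6Format

    assert convert2Es6Format(-0.0) == "0"         # zero special case, sign dropped
    assert convert2Es6Format(1.0) == "1"          # trailing .0 removed
    assert convert2Es6Format(1e21) == "1e+21"     # exponents >= 21 keep e-notation
    assert convert2Es6Format(1e-6) == "0.000001"  # expanded down to the e-6 limit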

@ -1,7 +1,5 @@
from collections import OrderedDict
import six
from .base import _cls_init
from .registration import (
_get_extension_class, _register_extension, _register_marking,
@ -13,14 +11,11 @@ def _get_properties_dict(properties):
try:
return OrderedDict(properties)
except TypeError as e:
six.raise_from(
ValueError(
"properties must be dict-like, e.g. a list "
"containing tuples. For example, "
"[('property1', IntegerProperty())]",
),
e,
)
raise ValueError(
"properties must be dict-like, e.g. a list "
"containing tuples. For example, "
"[('property1', IntegerProperty())]",
) from e
def _custom_object_builder(cls, type, properties, version, base_class):

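The ValueError above points at the list-of-tuples form for properties. A hypothetical custom object using it (x-demo-thing and its properties are invented for illustration):

    import stix2
    from stix2.properties import IntegerProperty, StringProperty

    @stix2.v21.CustomObject("x-demo-thing", [
        ("name", StringProperty(required=True)),
        ("count", IntegerProperty()),
    ])
    class DemoThing(object):
        pass

    thing = DemoThing(name="demo", count=3)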

@ -15,8 +15,6 @@ Python STIX2 DataStore API.
from abc import ABCMeta, abstractmethod
import uuid
from six import with_metaclass
from stix2.datastore.filters import Filter, FilterSet
from stix2.utils import deduplicate
@ -219,7 +217,7 @@ class DataStoreMixin(object):
raise AttributeError(msg % self.__class__.__name__)
class DataSink(with_metaclass(ABCMeta)):
class DataSink(metaclass=ABCMeta):
"""An implementer will create a concrete subclass from
this class for the specific DataSink.
@ -245,7 +243,7 @@ class DataSink(with_metaclass(ABCMeta)):
"""
class DataSource(with_metaclass(ABCMeta)):
class DataSource(metaclass=ABCMeta):
"""An implementer will create a concrete subclass from
this class for the specific DataSource.

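With with_metaclass gone, a concrete data source subclasses DataSource directly. A minimal no-op sketch (NullSource is invented; the three methods are the ABC's abstract methods):

    from stix2.datastore import DataSource

    class NullSource(DataSource):
        def get(self, stix_id):
            return None

        def all_versions(self, stix_id):
            return []

        def query(self, query=None):
            return []

    src = NullSource()  # instantiable once every abstract method is defined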

@ -6,8 +6,6 @@ import os
import re
import stat
import six
from stix2 import v20, v21
from stix2.base import _STIXBase
from stix2.datastore import (
@ -116,7 +114,7 @@ def _update_allow(allow_set, value):
"""
adding_seq = hasattr(value, "__iter__") and \
not isinstance(value, six.string_types)
not isinstance(value, str)
if allow_set is None:
allow_set = set()

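The adding_seq test above is the standard Python 3 "iterable but not a string" check; in isolation:

    def is_non_string_iterable(value):
        # str is itself iterable, so it has to be excluded explicitly.
        return hasattr(value, "__iter__") and not isinstance(value, str)

    assert is_non_string_iterable(["indicator", "malware"])
    assert not is_non_string_iterable("indicator")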

@ -3,8 +3,6 @@
import collections
from datetime import datetime
import six
import stix2.utils
"""Supported filter operations"""
@ -12,8 +10,7 @@ FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=', 'contains']
"""Supported filter value types"""
FILTER_VALUE_TYPES = (
bool, dict, float, int, list, tuple, six.string_types,
datetime,
bool, dict, float, int, list, tuple, str, datetime,
)
@ -84,7 +81,7 @@ class Filter(collections.namedtuple('Filter', ['property', 'op', 'value'])):
# If filtering on a timestamp property and the filter value is a string,
# try to convert the filter value to a datetime instance.
if isinstance(stix_obj_property, datetime) and \
isinstance(self.value, six.string_types):
isinstance(self.value, str):
filter_value = stix2.utils.parse_into_datetime(self.value)
else:
filter_value = self.value

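A usage sketch for the timestamp coercion above: a string value is accepted at construction time and parsed into a datetime when the filter is applied:

    from stix2.datastore.filters import Filter

    # str is in FILTER_VALUE_TYPES; for timestamp properties the string is
    # run through stix2.utils.parse_into_datetime during comparison.
    f = Filter("created", ">", "2021-01-01T00:00:00Z")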

@ -2,18 +2,10 @@
import copy
from .datastore import CompositeDataSource, DataStoreMixin
from .equivalence.graph import graphically_equivalent
from .equivalence.object import ( # noqa: F401
WEIGHTS, check_property_present, custom_pattern_based, exact_match,
list_reference_check, partial_external_reference_based, partial_list_based,
partial_location_distance, partial_string_based, partial_timestamp_based,
reference_check, semantically_equivalent,
)
from .equivalence.graph import graph_equivalence, graph_similarity
from .equivalence.object import object_equivalence, object_similarity
from .parsing import parse as _parse
# TODO: Remove all unused imports that now belong to the equivalence module in the next major release.
# Kept for backwards compatibility.
class ObjectFactory(object):
"""Easily create STIX objects with default values for certain properties.
@ -197,9 +189,8 @@ class Environment(DataStoreMixin):
return None
@staticmethod
def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
"""This method verifies if two objects of the same type are
semantically equivalent.
def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
"""This method returns a measure of how similar the two objects are.
Args:
obj1: A stix2 object instance
@ -207,13 +198,13 @@ class Environment(DataStoreMixin):
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
in the similarity process
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
float: A number between 0.0 and 100.0 as a measurement of similarity.
Warning:
Object types need to have property weights defined for the equivalence process.
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.object` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
@ -229,14 +220,54 @@ class Environment(DataStoreMixin):
see `the Committee Note <link here>`__.
"""
return semantically_equivalent(obj1, obj2, prop_scores, **weight_dict)
return object_similarity(obj1, obj2, prop_scores, **weight_dict)
@staticmethod
def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
"""This method verifies if two graphs are semantically equivalent.
def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
"""This method returns a true/false value if two objects are semantically equivalent.
Internally, it calls the object_similarity function and compares it against the given
threshold value.
Args:
obj1: A stix2 object instance
obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both objects equivalent. This
value can be tuned.
weight_dict: A dictionary that can be used to override settings
in the similarity process
Returns:
bool: True if the result of the object similarity is greater than or equal to
the threshold value. False otherwise.
Warning:
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.object` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weight_dict:
.. include:: ../object_default_sem_eq_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
return object_equivalence(obj1, obj2, prop_scores, threshold, **weight_dict)
@staticmethod
def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
"""This method returns a similarity score for two given graphs.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the amount of objects we managed to compare.
This approach builds on top of the object-based semantic equivalence process
This approach builds on top of the object-based similarity process
and each comparison can return a value between 0 and 100.
Args:
@ -245,13 +276,13 @@ class Environment(DataStoreMixin):
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
in the similarity process
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
float: A number between 0.0 and 100.0 as a measurement of similarity.
Warning:
Object types need to have property weights defined for the equivalence process.
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.graph` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
@ -267,4 +298,44 @@ class Environment(DataStoreMixin):
see `the Committee Note <link here>`__.
"""
return graphically_equivalent(ds1, ds2, prop_scores, **weight_dict)
return graph_similarity(ds1, ds2, prop_scores, **weight_dict)
@staticmethod
def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
"""This method returns a true/false value if two graphs are semantically equivalent.
Internally, it calls the graph_similarity function and compares it against the given
threshold value.
Args:
ds1: A DataStore object instance representing your graph
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both graphs equivalent. This
value can be tuned.
weight_dict: A dictionary that can be used to override settings
in the similarity process
Returns:
bool: True if the result of the graph similarity is greater than or equal to
the threshold value. False otherwise.
Warning:
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.graph` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weight_dict:
.. include:: ../graph_default_sem_eq_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
return graph_equivalence(ds1, ds2, prop_scores, threshold, **weight_dict)

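A usage sketch of the renamed Environment methods (the two indicators are invented sample data):

    import stix2

    ind_a = stix2.v21.Indicator(
        indicator_types=["malicious-activity"],
        pattern="[ipv4-addr:value = '198.51.100.1']",
        pattern_type="stix",
        valid_from="2021-02-19T00:00:00Z",
    )
    ind_b = stix2.v21.Indicator(
        indicator_types=["malicious-activity"],
        pattern="[ipv4-addr:value = '198.51.100.2']",
        pattern_type="stix",
        valid_from="2021-02-19T00:00:00Z",
    )

    env = stix2.Environment()
    prop_scores = {}
    score = env.object_similarity(ind_a, ind_b, prop_scores)    # 0.0 .. 100.0
    equal = env.object_equivalence(ind_a, ind_b, threshold=70)  # True / False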

@ -1,4 +1,4 @@
"""Python APIs for STIX 2 Semantic Equivalence.
"""Python APIs for STIX 2 Semantic Equivalence and Similarity.
.. autosummary::
:toctree: equivalence


@ -1,41 +1,44 @@
"""Python APIs for STIX 2 Graph-based Semantic Equivalence."""
"""Python APIs for STIX 2 Graph-based Semantic Equivalence and Similarity."""
import logging
from ..object import (
WEIGHTS, exact_match, list_reference_check, partial_string_based,
partial_timestamp_based, reference_check, semantically_equivalent,
WEIGHTS, _bucket_per_type, _object_pairs, exact_match,
list_reference_check, object_similarity, partial_string_based,
partial_timestamp_based, reference_check,
)
logger = logging.getLogger(__name__)
def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
"""This method verifies if two graphs are semantically equivalent.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the amount of objects we managed to compare.
This approach builds on top of the object-based semantic equivalence process
and each comparison can return a value between 0 and 100.
def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
"""This method returns a true/false value if two graphs are semantically equivalent.
Internally, it calls the graph_similarity function and compares it against the given
threshold value.
Args:
ds1: A DataStore object instance representing your graph
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both graphs equivalent. This
value can be tuned.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
in the similarity process
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
bool: True if the result of the graph similarity is greater than or equal to
the threshold value. False otherwise.
Warning:
Object types need to have property weights defined for the equivalence process.
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.graph` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weights_dict:
Default weight_dict:
.. include:: ../../graph_default_sem_eq_weights.rst
@ -44,63 +47,103 @@ def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
see `the Committee Note <link here>`__.
"""
similarity_result = graph_similarity(ds1, ds2, prop_scores, **weight_dict)
if similarity_result >= threshold:
return True
return False
def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
"""This method returns a similarity score for two given graphs.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the amount of objects we managed to compare.
This approach builds on top of the object-based similarity process
and each comparison can return a value between 0 and 100.
Args:
ds1: A DataStore object instance representing your graph
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the similarity process
Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity.
Warning:
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.graph` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weight_dict:
.. include:: ../../graph_default_sem_eq_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
results = {}
similarity_score = 0
weights = GRAPH_WEIGHTS.copy()
if weight_dict:
weights.update(weight_dict)
results = {}
depth = weights["_internal"]["max_depth"]
if weights["_internal"]["max_depth"] <= 0:
raise ValueError("weight_dict['_internal']['max_depth'] must be greater than 0")
graph1 = ds1.query([])
graph2 = ds2.query([])
pairs = _object_pairs(
_bucket_per_type(ds1.query([])),
_bucket_per_type(ds2.query([])),
weights,
)
graph1.sort(key=lambda x: x["type"])
graph2.sort(key=lambda x: x["type"])
weights["_internal"]["ds1"] = ds1
weights["_internal"]["ds2"] = ds2
if len(graph1) < len(graph2):
weights["_internal"]["ds1"] = ds1
weights["_internal"]["ds2"] = ds2
g1 = graph1
g2 = graph2
else:
weights["_internal"]["ds1"] = ds2
weights["_internal"]["ds2"] = ds1
g1 = graph2
g2 = graph1
logger.debug("Starting graph similarity process between DataStores: '%s' and '%s'", ds1.id, ds2.id)
for object1, object2 in pairs:
iprop_score = {}
object1_id = object1["id"]
object2_id = object2["id"]
for object1 in g1:
for object2 in g2:
if object1["type"] == object2["type"] and object1["type"] in weights:
iprop_score = {}
result = semantically_equivalent(object1, object2, iprop_score, **weights)
objects1_id = object1["id"]
weights["_internal"]["max_depth"] = depth
result = object_similarity(object1, object2, iprop_score, **weights)
if objects1_id not in results:
results[objects1_id] = {"matched": object2["id"], "prop_score": iprop_score, "value": result}
elif result > results[objects1_id]["value"]:
results[objects1_id] = {"matched": object2["id"], "prop_score": iprop_score, "value": result}
if object1_id not in results:
results[object1_id] = {"lhs": object1_id, "rhs": object2_id, "prop_score": iprop_score, "value": result}
elif result > results[object1_id]["value"]:
results[object1_id] = {"lhs": object1_id, "rhs": object2_id, "prop_score": iprop_score, "value": result}
if object2_id not in results:
results[object2_id] = {"lhs": object2_id, "rhs": object1_id, "prop_score": iprop_score, "value": result}
elif result > results[object2_id]["value"]:
results[object2_id] = {"lhs": object2_id, "rhs": object1_id, "prop_score": iprop_score, "value": result}
equivalence_score = 0
matching_score = sum(x["value"] for x in results.values())
sum_weights = len(results) * 100.0
if sum_weights > 0:
equivalence_score = (matching_score / sum_weights) * 100
len_pairs = len(results)
if len_pairs > 0:
similarity_score = matching_score / len_pairs
prop_scores["matching_score"] = matching_score
prop_scores["sum_weights"] = sum_weights
prop_scores["len_pairs"] = len_pairs
prop_scores["summary"] = results
logger.debug(
"DONE\t\tSUM_WEIGHT: %.2f\tMATCHING_SCORE: %.2f\t SCORE: %.2f",
sum_weights,
"DONE\t\tLEN_PAIRS: %.2f\tMATCHING_SCORE: %.2f\t SIMILARITY_SCORE: %.2f",
len_pairs,
matching_score,
equivalence_score,
similarity_score,
)
return equivalence_score
return similarity_score
# default weights used for the graph semantic equivalence process
# default weights used for the graph similarity process
GRAPH_WEIGHTS = WEIGHTS.copy()
GRAPH_WEIGHTS.update({
"grouping": {

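A calling-convention sketch mirroring the test suite: the "_internal" weights are the knobs graph_similarity reads (max_depth must be greater than 0, as enforced above); the stores here are empty placeholders:

    import stix2
    from stix2.equivalence.graph import graph_similarity

    store1 = stix2.MemoryStore()
    store2 = stix2.MemoryStore()
    # ... store1.add(...) / store2.add(...) with the graphs to compare ...

    weights = {
        "_internal": {
            "ignore_spec_version": False,
            "versioning_checks": False,
            "max_depth": 1,
        },
    }
    prop_scores = {}
    score = graph_similarity(store1, store2, prop_scores, **weights)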

@ -1,4 +1,6 @@
"""Python APIs for STIX 2 Object-based Semantic Equivalence."""
"""Python APIs for STIX 2 Object-based Semantic Equivalence and Similarity."""
import collections
import itertools
import logging
import time
@ -9,9 +11,52 @@ from ..pattern import equivalent_patterns
logger = logging.getLogger(__name__)
def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
"""This method verifies if two objects of the same type are
semantically equivalent.
def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
"""This method returns a true/false value if two objects are semantically equivalent.
Internally, it calls the object_similarity function and compares it against the given
threshold value.
Args:
obj1: A stix2 object instance
obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both objects equivalent. This
value can be tuned.
weight_dict: A dictionary that can be used to override settings
in the similarity process
Returns:
bool: True if the result of the object similarity is greater than or equal to
the threshold value. False otherwise.
Warning:
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.object` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weight_dict:
.. include:: ../../object_default_sem_eq_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
similarity_result = object_similarity(obj1, obj2, prop_scores, **weight_dict)
if similarity_result >= threshold:
return True
return False
def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
"""This method returns a measure of similarity depending on how
similar the two objects are.
Args:
obj1: A stix2 object instance
@ -19,20 +64,20 @@ def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
in the similarity process
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
float: A number between 0.0 and 100.0 as a measurement of similarity.
Warning:
Object types need to have property weights defined for the equivalence process.
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.object` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weights_dict:
Default weight_dict:
.. include:: ../../object_default_sem_eq_weights.rst
@ -58,13 +103,13 @@ def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
try:
weights[type1]
except KeyError:
logger.warning("'%s' type has no 'weights' dict specified & thus no semantic equivalence method to call!", type1)
logger.warning("'%s' type has no 'weights' dict specified & thus no object similarity method to call!", type1)
sum_weights = matching_score = 0
else:
try:
method = weights[type1]["method"]
except KeyError:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
logger.debug("Starting object similarity process between: '%s' and '%s'", obj1["id"], obj2["id"])
matching_score = 0.0
sum_weights = 0.0
@ -80,12 +125,13 @@ def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold)
elif comp_funct == reference_check or comp_funct == list_reference_check:
max_depth = weights["_internal"]["max_depth"]
if max_depth < 0:
continue # prevent excessive recursion
if max_depth > 0:
weights["_internal"]["max_depth"] = max_depth - 1
ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"]
contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
else:
weights["_internal"]["max_depth"] -= 1
ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"]
contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
continue # prevent excessive recursion
weights["_internal"]["max_depth"] = max_depth
else:
contributing_score = w * comp_funct(obj1[prop], obj2[prop])
@ -102,7 +148,7 @@ def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
prop_scores["sum_weights"] = sum_weights
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
else:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
logger.debug("Starting object similarity process between: '%s' and '%s'", obj1["id"], obj2["id"])
try:
matching_score, sum_weights = method(obj1, obj2, prop_scores, **weights[type1])
except TypeError:
@ -304,19 +350,24 @@ def partial_location_distance(lat1, long1, lat2, long2, threshold):
def _versioned_checks(ref1, ref2, ds1, ds2, **weights):
"""Checks multiple object versions if present in graph.
Maximizes for the semantic equivalence score of a particular version."""
Maximizes for the similarity score of a particular version."""
results = {}
objects1 = ds1.query([Filter("id", "=", ref1)])
objects2 = ds2.query([Filter("id", "=", ref2)])
if len(objects1) > 0 and len(objects2) > 0:
for o1 in objects1:
for o2 in objects2:
result = semantically_equivalent(o1, o2, **weights)
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": result}
elif result > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": result}
pairs = _object_pairs(
_bucket_per_type(objects1),
_bucket_per_type(objects2),
weights,
)
for object1, object2 in pairs:
result = object_similarity(object1, object2, **weights)
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": result}
elif result > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": result}
result = results.get(ref1, {}).get("value", 0.0)
logger.debug(
"--\t\t_versioned_checks '%s' '%s'\tresult: '%s'",
@ -326,18 +377,18 @@ def _versioned_checks(ref1, ref2, ds1, ds2, **weights):
def reference_check(ref1, ref2, ds1, ds2, **weights):
"""For two references, de-reference the object and perform object-based
semantic equivalence. The score influences the result of an edge check."""
"""For two references, de-reference the object and perform object_similarity.
The score influences the result of an edge check."""
type1, type2 = ref1.split("--")[0], ref2.split("--")[0]
result = 0.0
if type1 == type2:
if type1 == type2 and type1 in weights:
if weights["_internal"]["versioning_checks"]:
result = _versioned_checks(ref1, ref2, ds1, ds2, **weights) / 100.0
else:
o1, o2 = ds1.get(ref1), ds2.get(ref2)
if o1 and o2:
result = semantically_equivalent(o1, o2, **weights) / 100.0
result = object_similarity(o1, o2, **weights) / 100.0
logger.debug(
"--\t\treference_check '%s' '%s'\tresult: '%s'",
@ -348,38 +399,35 @@ def reference_check(ref1, ref2, ds1, ds2, **weights):
def list_reference_check(refs1, refs2, ds1, ds2, **weights):
"""For objects that contain multiple references (i.e., object_refs) perform
the same de-reference procedure and perform object-based semantic equivalence.
the same de-reference procedure and perform object_similarity.
The score influences the objects containing these references. The result is
weighted on the amount of unique objects that could 1) be de-referenced 2) """
results = {}
if len(refs1) >= len(refs2):
l1 = refs1
l2 = refs2
b1 = ds1
b2 = ds2
else:
l1 = refs2
l2 = refs1
b1 = ds2
b2 = ds1
l1.sort()
l2.sort()
pairs = _object_pairs(
_bucket_per_type(refs1, "id-split"),
_bucket_per_type(refs2, "id-split"),
weights,
)
for ref1 in l1:
for ref2 in l2:
type1, type2 = ref1.split("--")[0], ref2.split("--")[0]
if type1 == type2:
score = reference_check(ref1, ref2, b1, b2, **weights) * 100.0
for ref1, ref2 in pairs:
type1, type2 = ref1.split("--")[0], ref2.split("--")[0]
if type1 == type2:
score = reference_check(ref1, ref2, ds1, ds2, **weights)
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": score}
elif score > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": score}
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": score}
elif score > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": score}
if ref2 not in results:
results[ref2] = {"matched": ref1, "value": score}
elif score > results[ref2]["value"]:
results[ref2] = {"matched": ref1, "value": score}
result = 0.0
total_sum = sum(x["value"] for x in results.values())
max_score = len(results) * 100.0
max_score = len(results)
if max_score > 0:
result = total_sum / max_score
@ -391,7 +439,34 @@ def list_reference_check(refs1, refs2, ds1, ds2, **weights):
return result
# default weights used for the semantic equivalence process
def _bucket_per_type(graph, mode="type"):
"""Given a list of objects or references, bucket them by type.
Depending on the list type: extract from 'type' property or using
the 'id'.
"""
buckets = collections.defaultdict(list)
if mode == "type":
[buckets[obj["type"]].append(obj) for obj in graph]
elif mode == "id-split":
[buckets[obj.split("--")[0]].append(obj) for obj in graph]
return buckets
def _object_pairs(graph1, graph2, weights):
"""Returns a generator with the product of the comparable
objects for the graph similarity process. It determines
objects in common between graphs and objects with weights.
"""
types_in_common = set(graph1.keys()).intersection(graph2.keys())
testable_types = types_in_common.intersection(weights.keys())
return itertools.chain.from_iterable(
itertools.product(graph1[stix_type], graph2[stix_type])
for stix_type in testable_types
)
# default weights used for the similarity process
WEIGHTS = {
"attack-pattern": {
"name": (30, partial_string_based),

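A sketch of overriding the default WEIGHTS through weight_dict (the 100 weight is a made-up value; note that a weight_dict entry replaces the whole per-type dict):

    import stix2
    from stix2.equivalence.object import object_similarity, partial_string_based

    ap1 = stix2.v21.AttackPattern(name="Phishing")
    ap2 = stix2.v21.AttackPattern(name="Spear Phishing")

    weight_dict = {
        "attack-pattern": {
            "name": (100, partial_string_based),
        },
    }
    prop_scores = {}
    score = object_similarity(ap1, ap2, prop_scores, **weight_dict)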

@ -2,8 +2,6 @@
import collections
import six
from stix2 import exceptions, utils
@ -129,7 +127,7 @@ def compress_markings(granular_markings):
{'marking_ref': item, 'selectors': sorted(selectors)}
if utils.is_marking(item) else
{'lang': item, 'selectors': sorted(selectors)}
for item, selectors in six.iteritems(map_)
for item, selectors in map_.items()
]
return compressed
@ -230,7 +228,7 @@ def iterpath(obj, path=None):
if path is None:
path = []
for varname, varobj in iter(sorted(six.iteritems(obj))):
for varname, varobj in iter(sorted(obj.items())):
path.append(varname)
yield (path, varobj)


@ -3,7 +3,6 @@
import importlib
import inspect
from six import text_type
from stix2patterns.exceptions import ParseException
from stix2patterns.grammars.STIXPatternParser import TerminalNode
from stix2patterns.v20.grammars.STIXPatternParser import \
@ -263,7 +262,7 @@ class STIXPatternVisitorForSTIX2():
property_path.append(
self.instantiate(
"ListObjectPathComponent",
current.property_name if isinstance(current, BasicObjectPathComponent) else text_type(current),
current.property_name if isinstance(current, BasicObjectPathComponent) else str(current),
next.value,
),
)
@ -286,7 +285,7 @@ class STIXPatternVisitorForSTIX2():
if isinstance(first_component, TerminalNode):
step = first_component.getText()
else:
step = text_type(first_component)
step = str(first_component)
# if step.endswith("_ref"):
# return stix2.ReferenceObjectPathComponent(step)
# else:


@ -5,8 +5,6 @@ import binascii
import datetime
import re
import six
from .utils import parse_into_datetime
@ -15,7 +13,7 @@ def escape_quotes_and_backslashes(s):
def quote_if_needed(x):
if isinstance(x, six.string_types):
if isinstance(x, str):
if x.find("-") != -1:
if not x.startswith("'"):
return "'" + x + "'"


@ -7,8 +7,6 @@ import inspect
import re
import uuid
from six import string_types, text_type
from . import registry, version
from .base import _STIXBase
from .exceptions import (
@ -170,7 +168,7 @@ class Property(object):
if required and default:
raise STIXError(
"Cant't use 'required' and 'default' together. 'required'"
"Can't use 'required' and 'default' together. 'required'"
"really means 'the user must provide this.'",
)
@ -226,7 +224,7 @@ class ListProperty(Property):
except TypeError:
raise ValueError("must be an iterable.")
if isinstance(value, (_STIXBase, string_types)):
if isinstance(value, (_STIXBase, str)):
value = [value]
if isinstance(self.contained, Property):
@ -267,8 +265,8 @@ class StringProperty(Property):
super(StringProperty, self).__init__(**kwargs)
def clean(self, value):
if not isinstance(value, string_types):
return text_type(value)
if not isinstance(value, str):
return str(value)
return value
@ -621,7 +619,7 @@ class ObservableProperty(Property):
if dictified == {}:
raise ValueError("The observable property must contain a non-empty dictionary")
valid_refs = dict((k, v['type']) for (k, v) in dictified.items())
valid_refs = {k: v['type'] for (k, v) in dictified.items()}
for key, obj in dictified.items():
parsed_obj = parse_observable(
@ -689,8 +687,9 @@ class STIXObjectProperty(Property):
def clean(self, value):
# Any STIX Object (SDO, SRO, or Marking Definition) can be added to
# a bundle with no further checks.
stix2_classes = {'_DomainObject', '_RelationshipObject', 'MarkingDefinition'}
if any(
x in ('_DomainObject', '_RelationshipObject', 'MarkingDefinition')
x in stix2_classes
for x in get_class_hierarchy_names(value)
):
# A simple "is this a spec version 2.1+ object" test. For now,

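Behavior sketches for the two property changes above, assuming the pre-3.0 clean() signatures shown in this diff:

    from stix2.properties import ListProperty, StringProperty

    # StringProperty.clean() now coerces non-strings with str():
    assert StringProperty().clean(123) == "123"

    # ListProperty wraps a bare str (or _STIXBase) instead of iterating it:
    assert ListProperty(StringProperty()).clean("abc") == ["abc"]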

@ -128,18 +128,17 @@ def test_filter_value_type_check():
with pytest.raises(TypeError) as excinfo:
Filter('created', '=', object())
# On Python 2, the type of object() is `<type 'object'>` On Python 3, it's `<class 'object'>`.
assert any([s in str(excinfo.value) for s in ["<type 'object'>", "'<class 'object'>'"]])
assert "'<class 'object'>'" in str(excinfo.value)
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
with pytest.raises(TypeError) as excinfo:
Filter("type", "=", complex(2, -1))
assert any([s in str(excinfo.value) for s in ["<type 'complex'>", "'<class 'complex'>'"]])
assert "'<class 'complex'>'" in str(excinfo.value)
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
with pytest.raises(TypeError) as excinfo:
Filter("type", "=", set([16, 23]))
assert any([s in str(excinfo.value) for s in ["<type 'set'>", "'<class 'set'>'"]])
assert "'<class 'set'>'" in str(excinfo.value)
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)


@ -3,7 +3,6 @@ import json
from medallion.filters.basic_filter import BasicFilter
import pytest
from requests.models import Response
import six
from taxii2client.common import _filter_kwargs_to_query_params
from taxii2client.v20 import Collection
@ -27,7 +26,7 @@ class MockTAXIICollectionEndpoint(Collection):
def add_objects(self, bundle):
self._verify_can_write()
if isinstance(bundle, six.string_types):
if isinstance(bundle, str):
bundle = json.loads(bundle)
for object in bundle.get("objects", []):
self.objects.append(object)


@ -1,3 +1,4 @@
import json
import os
import pytest
@ -67,6 +68,11 @@ def ds2():
yield stix2.MemoryStore(stix_objs)
@pytest.fixture
def fs():
yield stix2.FileSystemSource(FS_PATH)
def test_object_factory_created_by_ref_str():
factory = stix2.ObjectFactory(created_by_ref=IDENTITY_ID)
ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
@ -497,7 +503,20 @@ def test_list_semantic_check(ds, ds2):
assert round(score) == 1
def test_graph_equivalence_with_filesystem_source(ds):
def test_graph_similarity_raises_value_error(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": -1,
},
}
with pytest.raises(ValueError):
prop_scores1 = {}
stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
def test_graph_similarity_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
@ -505,12 +524,151 @@ def test_graph_equivalence_with_filesystem_source(ds):
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights)
assert round(env1) == 25
assert round(prop_scores1["matching_score"]) == 451
assert round(prop_scores1["len_pairs"]) == 18
assert round(env2) == 25
assert round(prop_scores2["matching_score"]) == 451
assert round(prop_scores2["len_pairs"]) == 18
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_similarity_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
fs = stix2.FileSystemSource(FS_PATH)
env = stix2.Environment().graphically_equivalent(fs, ds, prop_scores, **weights)
assert round(env) == 28
assert round(prop_scores["matching_score"]) == 139
assert round(prop_scores["sum_weights"]) == 500
env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_similarity_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_similarity_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights)
assert env1 is False
assert round(prop_scores1["matching_score"]) == 451
assert round(prop_scores1["len_pairs"]) == 18
assert env2 is False
assert round(prop_scores2["matching_score"]) == 451
assert round(prop_scores2["len_pairs"]) == 18
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_duplicate_graph(ds):
@ -522,10 +680,10 @@ def test_graph_equivalence_with_duplicate_graph(ds):
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds, prop_scores, **weights)
assert round(env) == 100
env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights)
assert env is True
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["sum_weights"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
@ -536,11 +694,31 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds):
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert env2 is True
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
@ -551,8 +729,28 @@ def test_graph_equivalence_with_versioning_check_off(ds2, ds):
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert env2 is True
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)


@ -146,18 +146,17 @@ def test_filter_value_type_check():
with pytest.raises(TypeError) as excinfo:
Filter('created', '=', object())
# On Python 2, the type of object() is `<type 'object'>` On Python 3, it's `<class 'object'>`.
assert any([s in str(excinfo.value) for s in ["<type 'object'>", "'<class 'object'>'"]])
assert "'<class 'object'>'" in str(excinfo.value)
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
with pytest.raises(TypeError) as excinfo:
Filter("type", "=", complex(2, -1))
assert any([s in str(excinfo.value) for s in ["<type 'complex'>", "'<class 'complex'>'"]])
assert "'<class 'complex'>'" in str(excinfo.value)
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
with pytest.raises(TypeError) as excinfo:
Filter("type", "=", set([16, 23]))
assert any([s in str(excinfo.value) for s in ["<type 'set'>", "'<class 'set'>'"]])
assert "'<class 'set'>'" in str(excinfo.value)
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)


@ -3,7 +3,6 @@ import json
from medallion.filters.basic_filter import BasicFilter
import pytest
from requests.models import Response
import six
from taxii2client.common import _filter_kwargs_to_query_params
from taxii2client.v21 import Collection
@ -27,7 +26,7 @@ class MockTAXIICollectionEndpoint(Collection):
def add_objects(self, bundle):
self._verify_can_write()
if isinstance(bundle, six.string_types):
if isinstance(bundle, str):
bundle = json.loads(bundle)
for object in bundle.get("objects", []):
self.objects.append(object)


@ -3,7 +3,6 @@ import datetime
import uuid
import pytest
import six
import stix2.base
import stix2.canonicalization.Canonicalize
@ -31,12 +30,7 @@ def _make_uuid5(name):
"""
Make a STIX 2.1+ compliant UUIDv5 from a "name".
"""
if six.PY3:
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, name)
else:
uuid_ = uuid.uuid5(
SCO_DET_ID_NAMESPACE, name.encode("utf-8"),
)
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, name)
return uuid_


@ -1,3 +1,4 @@
import json
import os
import pytest
@ -37,7 +38,7 @@ def ds():
@pytest.fixture
def ds2():
def ds2_objects():
cam = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
idy = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
ind = stix2.v21.Indicator(id=INDICATOR_ID, created_by_ref=idy.id, **INDICATOR_KWARGS)
@ -68,7 +69,17 @@ def ds2():
published="2021-04-09T08:22:22Z", object_refs=stix_objs,
)
stix_objs.append(reprt)
yield stix2.MemoryStore(stix_objs)
yield stix_objs
@pytest.fixture
def ds2(ds2_objects):
yield stix2.MemoryStore(ds2_objects)
@pytest.fixture
def fs():
yield stix2.FileSystemSource(FS_PATH)
def test_object_factory_created_by_ref_str():
@ -426,14 +437,14 @@ def test_related_to_by_target(ds):
assert any(x['id'] == INDICATOR_ID for x in resp)
def test_semantic_equivalence_on_same_attack_pattern1():
def test_object_similarity_on_same_attack_pattern1():
ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
env = stix2.Environment().semantically_equivalent(ap1, ap2)
env = stix2.Environment().object_similarity(ap1, ap2)
assert round(env) == 100
def test_semantic_equivalence_on_same_attack_pattern2():
def test_object_similarity_on_same_attack_pattern2():
ATTACK_KWARGS = dict(
name="Phishing",
external_references=[
@ -445,18 +456,18 @@ def test_semantic_equivalence_on_same_attack_pattern2():
)
ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS)
ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS)
env = stix2.Environment().semantically_equivalent(ap1, ap2)
env = stix2.Environment().object_similarity(ap1, ap2)
assert round(env) == 100
def test_semantic_equivalence_on_same_campaign1():
def test_object_similarity_on_same_campaign1():
camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
env = stix2.Environment().semantically_equivalent(camp1, camp2)
env = stix2.Environment().object_similarity(camp1, camp2)
assert round(env) == 100
def test_semantic_equivalence_on_same_campaign2():
def test_object_similarity_on_same_campaign2():
CAMP_KWARGS = dict(
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector.",
@ -464,18 +475,18 @@ def test_semantic_equivalence_on_same_campaign2():
)
camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS)
camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS)
env = stix2.Environment().semantically_equivalent(camp1, camp2)
env = stix2.Environment().object_similarity(camp1, camp2)
assert round(env) == 100
def test_semantic_equivalence_on_same_identity1():
def test_object_similarity_on_same_identity1():
iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
env = stix2.Environment().semantically_equivalent(iden1, iden2)
env = stix2.Environment().object_similarity(iden1, iden2)
assert round(env) == 100
def test_semantic_equivalence_on_same_identity2():
def test_object_similarity_on_same_identity2():
IDEN_KWARGS = dict(
name="John Smith",
identity_class="individual",
@ -483,26 +494,26 @@ def test_semantic_equivalence_on_same_identity2():
)
iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS)
iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS)
env = stix2.Environment().semantically_equivalent(iden1, iden2)
env = stix2.Environment().object_similarity(iden1, iden2)
assert round(env) == 100
def test_semantic_equivalence_on_same_indicator():
def test_object_similarity_on_same_indicator():
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
env = stix2.Environment().semantically_equivalent(ind1, ind2)
env = stix2.Environment().object_similarity(ind1, ind2)
assert round(env) == 100
def test_semantic_equivalence_on_same_location1():
def test_object_similarity_on_same_location1():
location_kwargs = dict(latitude=45, longitude=179)
loc1 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
loc2 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
env = stix2.Environment().semantically_equivalent(loc1, loc2)
env = stix2.Environment().object_similarity(loc1, loc2)
assert round(env) == 100
def test_semantic_equivalence_on_same_location2():
def test_object_similarity_on_same_location2():
location_kwargs = dict(
latitude=38.889,
longitude=-77.023,
@ -511,33 +522,33 @@ def test_semantic_equivalence_on_same_location2():
)
loc1 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
loc2 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
env = stix2.Environment().semantically_equivalent(loc1, loc2)
env = stix2.Environment().object_similarity(loc1, loc2)
assert round(env) == 100
def test_semantic_equivalence_location_with_no_latlong():
def test_object_similarity_location_with_no_latlong():
loc_kwargs = dict(country="US", administrative_area="US-DC")
loc1 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS)
loc2 = stix2.v21.Location(id=LOCATION_ID, **loc_kwargs)
env = stix2.Environment().semantically_equivalent(loc1, loc2)
env = stix2.Environment().object_similarity(loc1, loc2)
assert round(env) != 100
def test_semantic_equivalence_on_same_malware():
def test_object_similarity_on_same_malware():
malw1 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
malw2 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
env = stix2.Environment().semantically_equivalent(malw1, malw2)
env = stix2.Environment().object_similarity(malw1, malw2)
assert round(env) == 100
def test_semantic_equivalence_on_same_threat_actor1():
def test_object_similarity_on_same_threat_actor1():
ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
env = stix2.Environment().semantically_equivalent(ta1, ta2)
env = stix2.Environment().object_similarity(ta1, ta2)
assert round(env) == 100
def test_semantic_equivalence_on_same_threat_actor2():
def test_object_similarity_on_same_threat_actor2():
THREAT_KWARGS = dict(
threat_actor_types=["crime-syndicate"],
aliases=["super-evil"],
@ -545,25 +556,38 @@ def test_semantic_equivalence_on_same_threat_actor2():
)
ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_KWARGS)
ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_KWARGS)
env = stix2.Environment().semantically_equivalent(ta1, ta2)
env = stix2.Environment().object_similarity(ta1, ta2)
assert round(env) == 100
def test_semantic_equivalence_on_same_tool():
def test_object_similarity_on_same_tool():
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
env = stix2.Environment().semantically_equivalent(tool1, tool2)
env = stix2.Environment().object_similarity(tool1, tool2)
assert round(env) == 100
def test_semantic_equivalence_on_same_vulnerability1():
def test_object_similarity_on_same_vulnerability1():
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
env = stix2.Environment().semantically_equivalent(vul1, vul2)
prop_scores = {}
env = stix2.Environment().object_similarity(vul1, vul2, prop_scores)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 30
assert round(prop_scores["sum_weights"]) == 30
def test_semantic_equivalence_on_same_vulnerability2():
def test_object_equivalence_on_same_vulnerability1():
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
prop_scores = {}
env = stix2.Environment().object_equivalence(vul1, vul2, prop_scores)
assert env is True
assert round(prop_scores["matching_score"]) == 30
assert round(prop_scores["sum_weights"]) == 30
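prop_scores is an out-parameter: passing a dict makes the call record its arithmetic, which is what the matching_score and sum_weights assertions read back. A minimal sketch, reusing vul1 and vul2 from the test above:

prop_scores = {}  # populated in place by the call
score = stix2.Environment().object_similarity(vul1, vul2, prop_scores)
print(prop_scores["matching_score"], prop_scores["sum_weights"])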
def test_object_similarity_on_same_vulnerability2():
VULN_KWARGS1 = dict(
name="Heartbleed",
external_references=[
@ -584,11 +608,42 @@ def test_semantic_equivalence_on_same_vulnerability2():
)
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS1)
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS2)
env = stix2.Environment().semantically_equivalent(vul1, vul2)
prop_scores = {}
env = stix2.Environment().object_similarity(vul1, vul2, prop_scores)
assert round(env) == 0.0
assert round(prop_scores["matching_score"]) == 0
assert round(prop_scores["sum_weights"]) == 100
def test_semantic_equivalence_on_unknown_object():
def test_object_equivalence_on_same_vulnerability2():
VULN_KWARGS1 = dict(
name="Heartbleed",
external_references=[
{
"url": "https://example",
"source_name": "some-source",
},
],
)
VULN_KWARGS2 = dict(
name="Foo",
external_references=[
{
"url": "https://example2",
"source_name": "some-source2",
},
],
)
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS1)
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS2)
prop_scores = {}
env = stix2.Environment().object_equivalence(vul1, vul2, prop_scores)
assert env is False
assert round(prop_scores["matching_score"]) == 0
assert round(prop_scores["sum_weights"]) == 100
def test_object_similarity_on_unknown_object():
CUSTOM_KWARGS1 = dict(
type="x-foobar",
id="x-foobar--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061",
@ -615,17 +670,17 @@ def test_semantic_equivalence_on_unknown_object():
def _x_foobar_checks(obj1, obj2, **weights):
matching_score = 0.0
sum_weights = 0.0
if stix2.environment.check_property_present("external_references", obj1, obj2):
if stix2.equivalence.object.check_property_present("external_references", obj1, obj2):
w = weights["external_references"]
sum_weights += w
matching_score += w * stix2.environment.partial_external_reference_based(
matching_score += w * stix2.equivalence.object.partial_external_reference_based(
obj1["external_references"],
obj2["external_references"],
)
if stix2.environment.check_property_present("name", obj1, obj2):
if stix2.equivalence.object.check_property_present("name", obj1, obj2):
w = weights["name"]
sum_weights += w
matching_score += w * stix2.environment.partial_string_based(obj1["name"], obj2["name"])
matching_score += w * stix2.equivalence.object.partial_string_based(obj1["name"], obj2["name"])
return matching_score, sum_weights
weights = {
@ -640,20 +695,20 @@ def test_semantic_equivalence_on_unknown_object():
}
cust1 = stix2.parse(CUSTOM_KWARGS1, allow_custom=True)
cust2 = stix2.parse(CUSTOM_KWARGS2, allow_custom=True)
env = stix2.Environment().semantically_equivalent(cust1, cust2, **weights)
env = stix2.Environment().object_similarity(cust1, cust2, **weights)
assert round(env) == 0
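For a type the library ships no built-in configuration for, the weights dict supplies a per-type entry, and a "method" key hands the whole per-type computation to a callable returning (matching_score, sum_weights), as _x_foobar_checks does above. A hedged sketch of the shape being passed, reusing cust1 and cust2 from the test (the numeric weights are illustrative):

weights = {
    "x-foobar": {
        "external_references": 40,  # plain weights; the custom method decides how to apply them
        "name": 60,
        "method": _x_foobar_checks,
    },
    "_internal": {"ignore_spec_version": False},
}
score = stix2.Environment().object_similarity(cust1, cust2, **weights)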
def test_semantic_equivalence_different_type_raises():
def test_object_similarity_different_type_raises():
with pytest.raises(ValueError) as excinfo:
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
stix2.Environment().semantically_equivalent(vul1, ind1)
stix2.Environment().object_similarity(vul1, ind1)
assert str(excinfo.value) == "The objects to compare must be of the same type!"
def test_semantic_equivalence_different_spec_version_raises():
def test_object_similarity_different_spec_version_raises():
with pytest.raises(ValueError) as excinfo:
V20_KWARGS = dict(
labels=['malicious-activity'],
@ -661,23 +716,24 @@ def test_semantic_equivalence_different_spec_version_raises():
)
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **V20_KWARGS)
stix2.Environment().semantically_equivalent(ind1, ind2)
stix2.Environment().object_similarity(ind1, ind2)
assert str(excinfo.value) == "The objects to compare must be of the same spec version!"
def test_semantic_equivalence_zero_match():
def test_object_similarity_zero_match():
IND_KWARGS = dict(
indicator_types=["APTX"],
indicator_types=["malicious-activity", "bar"],
pattern="[ipv4-addr:value = '192.168.1.1']",
pattern_type="stix",
valid_from="2019-01-01T12:34:56Z",
labels=["APTX", "foo"],
)
weights = {
"indicator": {
"indicator_types": (15, stix2.environment.partial_list_based),
"pattern": (80, stix2.environment.custom_pattern_based),
"valid_from": (5, stix2.environment.partial_timestamp_based),
"indicator_types": (15, stix2.equivalence.object.partial_list_based),
"pattern": (80, stix2.equivalence.object.custom_pattern_based),
"valid_from": (5, stix2.equivalence.object.partial_timestamp_based),
"tdelta": 1, # One day interval
},
"_internal": {
@ -686,20 +742,22 @@ def test_semantic_equivalence_zero_match():
}
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **IND_KWARGS)
env = stix2.Environment().semantically_equivalent(ind1, ind2, **weights)
assert round(env) == 0
env = stix2.Environment().object_similarity(ind1, ind2, **weights)
assert round(env) == 8
env = stix2.Environment().object_similarity(ind2, ind1, **weights)
assert round(env) == 8
def test_semantic_equivalence_different_spec_version():
def test_object_similarity_different_spec_version():
IND_KWARGS = dict(
labels=["APTX"],
pattern="[ipv4-addr:value = '192.168.1.1']",
)
weights = {
"indicator": {
"indicator_types": (15, stix2.environment.partial_list_based),
"pattern": (80, stix2.environment.custom_pattern_based),
"valid_from": (5, stix2.environment.partial_timestamp_based),
"indicator_types": (15, stix2.equivalence.object.partial_list_based),
"pattern": (80, stix2.equivalence.object.custom_pattern_based),
"valid_from": (5, stix2.equivalence.object.partial_timestamp_based),
"tdelta": 1, # One day interval
},
"_internal": {
@ -708,7 +766,10 @@ def test_semantic_equivalence_different_spec_version():
}
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS)
env = stix2.Environment().semantically_equivalent(ind1, ind2, **weights)
env = stix2.Environment().object_similarity(ind1, ind2, **weights)
assert round(env) == 0
env = stix2.Environment().object_similarity(ind2, ind1, **weights)
assert round(env) == 0
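Both cross-spec-version tests now assert the score in each argument order, pinning down that object_similarity is symmetric here. The "_internal" block is what permits comparing a 2.0 object against a 2.1 one; a minimal sketch, reusing ind1 and ind2 from the test above:

weights = {"_internal": {"ignore_spec_version": True}}  # allow 2.0 vs 2.1 comparison
assert (
    stix2.Environment().object_similarity(ind1, ind2, **weights)
    == stix2.Environment().object_similarity(ind2, ind1, **weights)
)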
@ -780,34 +841,34 @@ def test_semantic_equivalence_different_spec_version():
),
],
)
def test_semantic_equivalence_external_references(refs1, refs2, ret_val):
value = stix2.environment.partial_external_reference_based(refs1, refs2)
def test_object_similarity_external_references(refs1, refs2, ret_val):
value = stix2.equivalence.object.partial_external_reference_based(refs1, refs2)
assert value == ret_val
def test_semantic_equivalence_timestamp():
def test_object_similarity_timestamp():
t1 = "2018-10-17T00:14:20.652Z"
t2 = "2018-10-17T12:14:20.652Z"
assert stix2.environment.partial_timestamp_based(t1, t2, 1) == 0.5
assert stix2.equivalence.object.partial_timestamp_based(t1, t2, 1) == 0.5
def test_semantic_equivalence_exact_match():
def test_object_similarity_exact_match():
t1 = "2018-10-17T00:14:20.652Z"
t2 = "2018-10-17T12:14:20.652Z"
assert stix2.environment.exact_match(t1, t2) == 0.0
assert stix2.equivalence.object.exact_match(t1, t2) == 0.0
def test_non_existent_config_for_object():
r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
assert stix2.Environment().semantically_equivalent(r1, r2) == 0.0
assert stix2.Environment().object_similarity(r1, r2) == 0.0
def custom_semantic_equivalence_method(obj1, obj2, **weights):
return 96.0, 100.0
def test_semantic_equivalence_method_provided():
def test_object_similarity_method_provided():
# Because `method` is provided, `partial_list_based` will be ignored
TOOL2_KWARGS = dict(
name="Random Software",
@ -816,19 +877,19 @@ def test_semantic_equivalence_method_provided():
weights = {
"tool": {
"tool_types": (20, stix2.environment.partial_list_based),
"name": (80, stix2.environment.partial_string_based),
"tool_types": (20, stix2.equivalence.object.partial_list_based),
"name": (80, stix2.equivalence.object.partial_string_based),
"method": custom_semantic_equivalence_method,
},
}
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
env = stix2.Environment().semantically_equivalent(tool1, tool2, **weights)
env = stix2.Environment().object_similarity(tool1, tool2, **weights)
assert round(env) == 96
def test_semantic_equivalence_prop_scores():
def test_object_similarity_prop_scores():
TOOL2_KWARGS = dict(
name="Random Software",
tool_types=["information-gathering"],
@ -838,7 +899,7 @@ def test_semantic_equivalence_prop_scores():
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
stix2.Environment().semantically_equivalent(tool1, tool2, prop_scores)
stix2.Environment().object_similarity(tool1, tool2, prop_scores)
assert len(prop_scores) == 4
assert round(prop_scores["matching_score"], 1) == 8.9
assert round(prop_scores["sum_weights"], 1) == 100.0
@ -850,7 +911,7 @@ def custom_semantic_equivalence_method_prop_scores(obj1, obj2, prop_scores, **we
return 96.0, 100.0
def test_semantic_equivalence_prop_scores_method_provided():
def test_object_similarity_prop_scores_method_provided():
TOOL2_KWARGS = dict(
name="Random Software",
tool_types=["information-gathering"],
@ -868,7 +929,7 @@ def test_semantic_equivalence_prop_scores_method_provided():
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
env = stix2.Environment().semantically_equivalent(tool1, tool2, prop_scores, **weights)
env = stix2.Environment().object_similarity(tool1, tool2, prop_scores, **weights)
assert round(env) == 96
assert len(prop_scores) == 2
assert prop_scores["matching_score"] == 96.0
@ -955,8 +1016,30 @@ def test_list_semantic_check(ds, ds2):
)
assert round(score) == 1
score = stix2.equivalence.object.list_reference_check(
object_refs2,
object_refs1,
ds2,
ds,
**weights,
)
assert round(score) == 1
def test_graph_equivalence_with_filesystem_source(ds):
def test_graph_similarity_raises_value_error(ds, ds2):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": -1,
},
}
with pytest.raises(ValueError):
prop_scores1 = {}
stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
def test_graph_similarity_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
@ -964,12 +1047,257 @@ def test_graph_equivalence_with_filesystem_source(ds):
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights)
assert round(env1) == 23
assert round(prop_scores1["matching_score"]) == 411
assert round(prop_scores1["len_pairs"]) == 18
assert round(env2) == 23
assert round(prop_scores2["matching_score"]) == 411
assert round(prop_scores2["len_pairs"]) == 18
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
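graph_similarity compares the objects of the two DataSources pairwise and reports its aggregates through prop_scores: the "matching_score" and "len_pairs" read back above, plus a per-object "summary" (inspected by the depth-limiting test below). A minimal sketch of the call shape:

prop_scores = {}
score = stix2.Environment().graph_similarity(fs, ds, prop_scores, **weights)  # 0-100 overall score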
def test_depth_limiting():
g1 = [
{
"type": "foo",
"id": "foo--07f9dd2a-1cce-45bb-8cbe-dba3f007aafd",
"spec_version": "2.1",
"created": "1986-02-08T00:20:17Z",
"modified": "1989-12-11T06:54:29Z",
"some1_ref": "foo--700a8a3c-9936-412f-b4eb-ede466476180",
"some2_ref": "foo--f4a999a3-df94-499d-9cac-6c02e21775ee",
},
{
"type": "foo",
"id": "foo--700a8a3c-9936-412f-b4eb-ede466476180",
"spec_version": "2.1",
"created": "1989-01-06T10:31:54Z",
"modified": "1995-06-18T10:25:01Z",
"some1_ref": "foo--705afd45-eb56-43fc-a214-313d63d199a3",
},
{
"type": "foo",
"id": "foo--705afd45-eb56-43fc-a214-313d63d199a3",
"spec_version": "2.1",
"created": "1977-11-06T21:19:29Z",
"modified": "1997-12-02T20:33:34Z",
},
{
"type": "foo",
"id": "foo--f4a999a3-df94-499d-9cac-6c02e21775ee",
"spec_version": "2.1",
"created": "1991-09-17T00:40:52Z",
"modified": "1992-12-06T11:02:47Z",
"name": "alice",
},
]
g2 = [
{
"type": "foo",
"id": "foo--71570479-3e6e-48d2-81fb-897454dec55d",
"spec_version": "2.1",
"created": "1975-12-22T05:20:38Z",
"modified": "1980-11-11T01:09:03Z",
"some1_ref": "foo--4aeda39b-31fa-4ffb-a847-d8edc175a579",
"some2_ref": "foo--941e48d6-3100-4419-9e8c-cf1eb59e71b2",
},
{
"type": "foo",
"id": "foo--4aeda39b-31fa-4ffb-a847-d8edc175a579",
"spec_version": "2.1",
"created": "1976-01-05T08:32:03Z",
"modified": "1980-11-09T05:41:02Z",
"some1_ref": "foo--689252c3-5d20-43ff-bbf7-c8e45d713768",
},
{
"type": "foo",
"id": "foo--689252c3-5d20-43ff-bbf7-c8e45d713768",
"spec_version": "2.1",
"created": "1974-09-11T18:56:30Z",
"modified": "1976-10-31T11:59:43Z",
},
{
"type": "foo",
"id": "foo--941e48d6-3100-4419-9e8c-cf1eb59e71b2",
"spec_version": "2.1",
"created": "1985-01-03T01:07:03Z",
"modified": "1992-07-20T21:32:31Z",
"name": "alice",
},
]
mem_store1 = stix2.MemorySource(g1)
mem_store2 = stix2.MemorySource(g2)
custom_weights = {
"foo": {
"some1_ref": (33, stix2.equivalence.object.reference_check),
"some2_ref": (33, stix2.equivalence.object.reference_check),
"name": (34, stix2.equivalence.object.partial_string_based),
},
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.equivalence.graph.graph_similarity(mem_store1, mem_store2, prop_scores1, **custom_weights)
assert round(env1) == 38
assert round(prop_scores1["matching_score"]) == 300
assert round(prop_scores1["len_pairs"]) == 8
# weight comes from the 'alice' name check reached via de-referencing some2_ref
assert prop_scores1['summary']['foo--71570479-3e6e-48d2-81fb-897454dec55d']['prop_score']['some2_ref']['weight'] == 33
assert prop_scores1['summary']['foo--07f9dd2a-1cce-45bb-8cbe-dba3f007aafd']['prop_score']['some2_ref']['weight'] == 33
# Switching parameters
prop_scores2 = {}
env2 = stix2.equivalence.graph.graph_similarity(
mem_store2, mem_store1, prop_scores2, **custom_weights
)
assert round(env2) == 38
assert round(prop_scores2["matching_score"]) == 300
assert round(prop_scores2["len_pairs"]) == 8
# weight comes from the 'alice' name check reached via de-referencing some2_ref
assert prop_scores2['summary']['foo--71570479-3e6e-48d2-81fb-897454dec55d']['prop_score']['some2_ref']['weight'] == 33
assert prop_scores2['summary']['foo--07f9dd2a-1cce-45bb-8cbe-dba3f007aafd']['prop_score']['some2_ref']['weight'] == 33
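"max_depth" bounds how many *_ref hops are de-referenced while scoring: with max_depth=1, the one-hop "alice" objects behind some2_ref are still compared (hence their weight of 33 in the summary), while deeper chains are cut off, and non-positive values are rejected outright (the ValueError test above uses -1). A one-line sketch against the weights used here:

custom_weights["_internal"]["max_depth"] = 1  # follow *_ref chains one hop only; <= 0 raises ValueError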
def test_graph_similarity_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
fs = stix2.FileSystemSource(FS_PATH)
env = stix2.Environment().graphically_equivalent(fs, ds, prop_scores, **weights)
assert round(env) == 24
assert round(prop_scores["matching_score"]) == 122
assert round(prop_scores["sum_weights"]) == 500
env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_similarity_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_similarity_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights)
assert env1 is False
assert round(prop_scores1["matching_score"]) == 411
assert round(prop_scores1["len_pairs"]) == 18
assert env2 is False
assert round(prop_scores2["matching_score"]) == 411
assert round(prop_scores2["len_pairs"]) == 18
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
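graph_equivalence is the boolean form of graph_similarity, with the same prop_scores bookkeeping and the final score compared to a threshold; the ~23 this dataset scored in the similarity test above falls short of it, hence env1 and env2 are False. A sketch, with the threshold stated explicitly because its default (70) is an assumption:

prop_scores = {}
is_equiv = stix2.Environment().graph_equivalence(fs, ds, prop_scores, threshold=70, **weights)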
def test_graph_equivalence_with_duplicate_graph(ds):
@ -981,10 +1309,10 @@ def test_graph_equivalence_with_duplicate_graph(ds):
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds, prop_scores, **weights)
assert round(env) == 100
env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights)
assert env is True
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["sum_weights"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
@ -995,11 +1323,31 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds):
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert env2 is True
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
@ -1010,8 +1358,28 @@ def test_graph_equivalence_with_versioning_check_off(ds2, ds):
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert env2 is True
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)

View File

@ -7,7 +7,6 @@ import json
import re
import pytz
import six
import stix2.registry as mappings
import stix2.version
@ -70,7 +69,7 @@ def _to_enum(value, enum_type, enum_default=None):
if not isinstance(value, enum_type):
if value is None and enum_default is not None:
value = enum_default
elif isinstance(value, six.string_types):
elif isinstance(value, str):
value = enum_type[value.upper()]
else:
raise TypeError(

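The str branch upper-cases the value and looks it up as an enum member name. A sketch against a hypothetical enum, assuming _to_enum returns the coerced member:

import enum

class Precision(enum.Enum):  # hypothetical enum, for illustration only
    SECOND = enum.auto()
    MILLISECOND = enum.auto()

assert _to_enum("millisecond", Precision) is Precision.MILLISECOND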
View File

@ -3,8 +3,6 @@
from collections import OrderedDict
import copy
import six
from ..custom import _custom_marking_builder
from ..markings import _MarkingsMixin
from ..markings.utils import check_tlp_marking
@ -21,7 +19,7 @@ def _should_set_millisecond(cr, marking_type):
if marking_type == TLPMarking:
return True
# otherwise, precision is kept from how it was given
if isinstance(cr, six.string_types):
if isinstance(cr, str):
if '.' in cr:
return True
else:

View File

@ -2,9 +2,9 @@
from collections import OrderedDict
import itertools
from urllib.parse import quote_plus
import warnings
from six.moves.urllib.parse import quote_plus
from stix2patterns.validator import run_validator
from . import observables