diff --git a/.gitignore b/.gitignore
index 72b31cd..4d16202 100644
--- a/.gitignore
+++ b/.gitignore
@@ -55,8 +55,7 @@ coverage.xml
# Sphinx documentation
docs/_build/
.ipynb_checkpoints
-graph_default_sem_eq_weights.rst
-object_default_sem_eq_weights.rst
+similarity_weights.rst
# PyBuilder
target/
diff --git a/docs/conf.py b/docs/conf.py
index 5d12af3..b6dd6ea 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -7,7 +7,6 @@ import sys
from sphinx.ext.autodoc import ClassDocumenter
from stix2.base import _STIXBase
-from stix2.equivalence.graph import GRAPH_WEIGHTS
from stix2.equivalence.object import WEIGHTS
from stix2.version import __version__
@@ -66,16 +65,9 @@ object_default_sem_eq_weights = json.dumps(WEIGHTS, indent=4, default=lambda o:
object_default_sem_eq_weights = object_default_sem_eq_weights.replace('\n', '\n ')
object_default_sem_eq_weights = object_default_sem_eq_weights.replace(' "', ' ')
object_default_sem_eq_weights = object_default_sem_eq_weights.replace('"\n', '\n')
-with open('object_default_sem_eq_weights.rst', 'w') as f:
+with open('similarity_weights.rst', 'w') as f:
f.write(".. code-block:: python\n\n {}\n\n".format(object_default_sem_eq_weights))
-graph_default_sem_eq_weights = json.dumps(GRAPH_WEIGHTS, indent=4, default=lambda o: o.__name__)
-graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('\n', '\n ')
-graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace(' "', ' ')
-graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('"\n', '\n')
-with open('graph_default_sem_eq_weights.rst', 'w') as f:
- f.write(".. code-block:: python\n\n {}\n\n".format(graph_default_sem_eq_weights))
-
def get_property_type(prop):
"""Convert property classname into pretty string name of property.
diff --git a/docs/guide/equivalence.ipynb b/docs/guide/equivalence.ipynb
index e61e9ed..9e9c679 100644
--- a/docs/guide/equivalence.ipynb
+++ b/docs/guide/equivalence.ipynb
@@ -4607,20 +4607,11 @@
" ),\n",
"]\n",
"\n",
- "\n",
- "weights = {\n",
- " \"_internal\": {\n",
- " \"ignore_spec_version\": False,\n",
- " \"versioning_checks\": False,\n",
- " \"max_depth\": 1,\n",
- " },\n",
- "}\n",
- "\n",
"memstore1 = MemoryStore(g1)\n",
"memstore2 = MemoryStore(g2)\n",
"prop_scores = {}\n",
"\n",
- "similarity_result = env.graph_similarity(memstore1, memstore2, prop_scores, **weights)\n",
+ "similarity_result = env.graph_similarity(memstore1, memstore2, prop_scores)\n",
"equivalence_result = env.graph_equivalence(memstore1, memstore2, threshold=60)\n",
"\n",
"print(similarity_result)\n",
diff --git a/setup.py b/setup.py
index 3f82733..f382412 100644
--- a/setup.py
+++ b/setup.py
@@ -60,7 +60,7 @@ setup(
'Bug Tracker': 'https://github.com/oasis-open/cti-python-stix2/issues/',
},
extras_require={
- 'taxii': ['taxii2-client>=2.2.1'],
+ 'taxii': ['taxii2-client>=2.3.0'],
'semantic': ['haversine', 'rapidfuzz'],
},
)
diff --git a/stix2/base.py b/stix2/base.py
index 6f29882..c581be8 100644
--- a/stix2/base.py
+++ b/stix2/base.py
@@ -17,7 +17,7 @@ from .exceptions import (
)
from .markings import _MarkingsMixin
from .markings.utils import validate
-from .serialization import STIXJSONEncoder, serialize
+from .serialization import STIXJSONEncoder, fp_serialize, serialize
from .utils import NOW, PREFIX_21_REGEX, get_timestamp
from .versioning import new_version as _new_version
from .versioning import revoke as _revoke
@@ -262,6 +262,35 @@ class _STIXBase(collections.abc.Mapping):
"""
return serialize(self, *args, **kwargs)
+ def fp_serialize(self, *args, **kwargs):
+ """
+ Serialize a STIX object to ``fp`` (a text stream file-like supporting object).
+
+ Examples:
+ >>> import stix2
+ >>> identity = stix2.Identity(name='Example Corp.', identity_class='organization')
+ >>> print(identity.serialize(sort_keys=True))
+ {"created": "2018-06-08T19:03:54.066Z", ... "name": "Example Corp.", "type": "identity"}
+ >>> print(identity.serialize(sort_keys=True, indent=4))
+ {
+ "created": "2018-06-08T19:03:54.066Z",
+ "id": "identity--d7f3e25a-ba1c-447a-ab71-6434b092b05e",
+ "identity_class": "organization",
+ "modified": "2018-06-08T19:03:54.066Z",
+ "name": "Example Corp.",
+ "type": "identity"
+ }
+ >>> with open("example.json", mode="w", encoding="utf-8") as f:
+ >>> identity.fp_serialize(f, pretty=True)
+
+ Returns:
+ None
+
+ See Also:
+ ``stix2.serialization.fp_serialize`` for options.
+ """
+ fp_serialize(self, *args, **kwargs)
+
class _DomainObject(_STIXBase, _MarkingsMixin):
pass
diff --git a/stix2/datastore/filesystem.py b/stix2/datastore/filesystem.py
index d844115..2209116 100644
--- a/stix2/datastore/filesystem.py
+++ b/stix2/datastore/filesystem.py
@@ -13,7 +13,7 @@ from stix2.datastore import (
)
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
from stix2.parsing import parse
-from stix2.serialization import serialize
+from stix2.serialization import fp_serialize
from stix2.utils import format_datetime, get_type_from_id, parse_into_datetime
@@ -584,9 +584,8 @@ class FileSystemSink(DataSink):
if os.path.isfile(file_path):
raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path))
- with io.open(file_path, 'w', encoding=encoding) as f:
- stix_obj = serialize(stix_obj, pretty=True, encoding=encoding, ensure_ascii=False)
- f.write(stix_obj)
+ with io.open(file_path, mode='w', encoding=encoding) as f:
+ fp_serialize(stix_obj, f, pretty=True, encoding=encoding, ensure_ascii=False)
def add(self, stix_data=None, version=None):
"""Add STIX objects to file directory.
diff --git a/stix2/datastore/taxii.py b/stix2/datastore/taxii.py
index 41d1e54..9ad6df9 100644
--- a/stix2/datastore/taxii.py
+++ b/stix2/datastore/taxii.py
@@ -12,6 +12,8 @@ from stix2.parsing import parse
from stix2.utils import deduplicate
try:
+ from taxii2client import v20 as tcv20
+ from taxii2client import v21 as tcv21
from taxii2client.exceptions import ValidationError
_taxii2_client = True
except ImportError:
@@ -33,9 +35,12 @@ class TAXIICollectionStore(DataStoreMixin):
side(retrieving data) and False for TAXIICollectionSink
side(pushing data). However, when parameter is supplied, it will
be applied to both TAXIICollectionSource/Sink.
+ items_per_page (int): How many STIX objects to request per call
+ to TAXII Server. The value can be tuned, but servers may override
+ if their internal limit is surpassed. Used by TAXIICollectionSource
"""
- def __init__(self, collection, allow_custom=None):
+ def __init__(self, collection, allow_custom=None, items_per_page=5000):
if allow_custom is None:
allow_custom_source = True
allow_custom_sink = False
@@ -43,7 +48,7 @@ class TAXIICollectionStore(DataStoreMixin):
allow_custom_sink = allow_custom_source = allow_custom
super(TAXIICollectionStore, self).__init__(
- source=TAXIICollectionSource(collection, allow_custom=allow_custom_source),
+ source=TAXIICollectionSource(collection, allow_custom=allow_custom_source, items_per_page=items_per_page),
sink=TAXIICollectionSink(collection, allow_custom=allow_custom_sink),
)
@@ -144,9 +149,12 @@ class TAXIICollectionSource(DataSource):
collection (taxii2.Collection): TAXII Collection instance
allow_custom (bool): Whether to allow custom STIX content to be
added to the FileSystemSink. Default: True
+ items_per_page (int): How many STIX objects to request per call
+ to TAXII Server. The value can be tuned, but servers may override
+ if their internal limit is surpassed.
"""
- def __init__(self, collection, allow_custom=True):
+ def __init__(self, collection, allow_custom=True, items_per_page=5000):
super(TAXIICollectionSource, self).__init__()
if not _taxii2_client:
raise ImportError("taxii2client library is required for usage of TAXIICollectionSource")
@@ -167,6 +175,7 @@ class TAXIICollectionSource(DataSource):
)
self.allow_custom = allow_custom
+ self.items_per_page = items_per_page
def get(self, stix_id, version=None, _composite_filters=None):
"""Retrieve STIX object from local/remote STIX Collection
@@ -286,8 +295,12 @@ class TAXIICollectionSource(DataSource):
taxii_filters_dict = dict((f.property, f.value) for f in taxii_filters)
# query TAXII collection
+ all_data = []
try:
- all_data = self.collection.get_objects(**taxii_filters_dict).get('objects', [])
+ paged_request = tcv21.as_pages if isinstance(self.collection, tcv21.Collection) else tcv20.as_pages
+
+ for resource in paged_request(self.collection.get_objects, per_request=self.items_per_page, **taxii_filters_dict):
+ all_data.extend(resource.get("objects", []))
# deduplicate data (before filtering as reduces wasted filtering)
all_data = deduplicate(all_data)
diff --git a/stix2/environment.py b/stix2/environment.py
index d0f694e..f7c13ee 100644
--- a/stix2/environment.py
+++ b/stix2/environment.py
@@ -189,7 +189,11 @@ class Environment(DataStoreMixin):
return None
@staticmethod
- def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
+ def object_similarity(
+ obj1, obj2, prop_scores={}, ds1=None, ds2=None,
+ ignore_spec_version=False, versioning_checks=False,
+ max_depth=1, **weight_dict
+ ):
"""This method returns a measure of how similar the two objects are.
Args:
@@ -197,8 +201,19 @@ class Environment(DataStoreMixin):
obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
- weight_dict: A dictionary that can be used to override settings
- in the similarity process
+ ds1 (optional): A DataStore object instance from which to pull related objects
+ ds2 (optional): A DataStore object instance from which to pull related objects
+ ignore_spec_version: A boolean indicating whether to test object types
+ that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+ If set to True this check will be skipped.
+ versioning_checks: A boolean indicating whether to test multiple revisions
+ of the same object (when present) to maximize similarity against a
+ particular version. If set to True the algorithm will perform this step.
+ max_depth: A positive integer indicating the maximum recursion depth the
+ algorithm can reach when de-referencing objects and performing the
+ object_similarity algorithm.
+ weight_dict: A dictionary that can be used to override what checks are done
+ to objects in the similarity process.
Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity.
@@ -213,17 +228,24 @@ class Environment(DataStoreMixin):
Note:
Default weight_dict:
- .. include:: ../object_default_sem_eq_weights.rst
+ .. include:: ../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note `__.
"""
- return object_similarity(obj1, obj2, prop_scores, **weight_dict)
+ return object_similarity(
+ obj1, obj2, prop_scores, ds1, ds2, ignore_spec_version,
+ versioning_checks, max_depth, **weight_dict
+ )
@staticmethod
- def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
+ def object_equivalence(
+ obj1, obj2, prop_scores={}, threshold=70, ds1=None, ds2=None,
+ ignore_spec_version=False, versioning_checks=False,
+ max_depth=1, **weight_dict
+ ):
"""This method returns a true/false value if two objects are semantically equivalent.
Internally, it calls the object_similarity function and compares it against the given
threshold value.
@@ -236,8 +258,19 @@ class Environment(DataStoreMixin):
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both objects equivalent. This
value can be tuned.
- weight_dict: A dictionary that can be used to override settings
- in the similarity process
+ ds1 (optional): A DataStore object instance from which to pull related objects
+ ds2 (optional): A DataStore object instance from which to pull related objects
+ ignore_spec_version: A boolean indicating whether to test object types
+ that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+ If set to True this check will be skipped.
+ versioning_checks: A boolean indicating whether to test multiple revisions
+ of the same object (when present) to maximize similarity against a
+ particular version. If set to True the algorithm will perform this step.
+ max_depth: A positive integer indicating the maximum recursion depth the
+ algorithm can reach when de-referencing objects and performing the
+ object_similarity algorithm.
+ weight_dict: A dictionary that can be used to override what checks are done
+ to objects in the similarity process.
Returns:
bool: True if the result of the object similarity is greater than or equal to
@@ -253,17 +286,23 @@ class Environment(DataStoreMixin):
Note:
Default weight_dict:
- .. include:: ../object_default_sem_eq_weights.rst
+ .. include:: ../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note `__.
"""
- return object_equivalence(obj1, obj2, prop_scores, threshold, **weight_dict)
+ return object_equivalence(
+ obj1, obj2, prop_scores, threshold, ds1, ds2,
+ ignore_spec_version, versioning_checks, max_depth, **weight_dict
+ )
@staticmethod
- def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
+ def graph_similarity(
+ ds1, ds2, prop_scores={}, ignore_spec_version=False,
+ versioning_checks=False, max_depth=1, **weight_dict
+ ):
"""This method returns a similarity score for two given graphs.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the amount of objects we managed to compare.
@@ -275,8 +314,17 @@ class Environment(DataStoreMixin):
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
- weight_dict: A dictionary that can be used to override settings
- in the similarity process
+ ignore_spec_version: A boolean indicating whether to test object types
+ that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+ If set to True this check will be skipped.
+ versioning_checks: A boolean indicating whether to test multiple revisions
+ of the same object (when present) to maximize similarity against a
+ particular version. If set to True the algorithm will perform this step.
+ max_depth: A positive integer indicating the maximum recursion depth the
+ algorithm can reach when de-referencing objects and performing the
+ object_similarity algorithm.
+ weight_dict: A dictionary that can be used to override what checks are done
+ to objects in the similarity process.
Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity.
@@ -291,17 +339,24 @@ class Environment(DataStoreMixin):
Note:
Default weight_dict:
- .. include:: ../graph_default_sem_eq_weights.rst
+ .. include:: ../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note `__.
"""
- return graph_similarity(ds1, ds2, prop_scores, **weight_dict)
+ return graph_similarity(
+ ds1, ds2, prop_scores, ignore_spec_version,
+ versioning_checks, max_depth, **weight_dict
+ )
@staticmethod
- def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
+ def graph_equivalence(
+ ds1, ds2, prop_scores={}, threshold=70,
+ ignore_spec_version=False, versioning_checks=False,
+ max_depth=1, **weight_dict
+ ):
"""This method returns a true/false value if two graphs are semantically equivalent.
Internally, it calls the graph_similarity function and compares it against the given
threshold value.
@@ -314,8 +369,17 @@ class Environment(DataStoreMixin):
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both graphs equivalent. This
value can be tuned.
- weight_dict: A dictionary that can be used to override settings
- in the similarity process
+ ignore_spec_version: A boolean indicating whether to test object types
+ that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+ If set to True this check will be skipped.
+ versioning_checks: A boolean indicating whether to test multiple revisions
+ of the same object (when present) to maximize similarity against a
+ particular version. If set to True the algorithm will perform this step.
+ max_depth: A positive integer indicating the maximum recursion depth the
+ algorithm can reach when de-referencing objects and performing the
+ object_similarity algorithm.
+ weight_dict: A dictionary that can be used to override what checks are done
+ to objects in the similarity process.
Returns:
bool: True if the result of the graph similarity is greater than or equal to
@@ -331,11 +395,14 @@ class Environment(DataStoreMixin):
Note:
Default weight_dict:
- .. include:: ../graph_default_sem_eq_weights.rst
+ .. include:: ../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note `__.
"""
- return graph_equivalence(ds1, ds2, prop_scores, threshold, **weight_dict)
+ return graph_equivalence(
+ ds1, ds2, prop_scores, threshold, ignore_spec_version,
+ versioning_checks, max_depth, **weight_dict
+ )
diff --git a/stix2/equivalence/graph/__init__.py b/stix2/equivalence/graph/__init__.py
index e78624e..1f46fd3 100644
--- a/stix2/equivalence/graph/__init__.py
+++ b/stix2/equivalence/graph/__init__.py
@@ -2,15 +2,17 @@
import logging
from ..object import (
- WEIGHTS, _bucket_per_type, _object_pairs, exact_match,
- list_reference_check, object_similarity, partial_string_based,
- partial_timestamp_based, reference_check,
+ WEIGHTS, _bucket_per_type, _object_pairs, object_similarity,
)
logger = logging.getLogger(__name__)
-def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
+def graph_equivalence(
+ ds1, ds2, prop_scores={}, threshold=70,
+ ignore_spec_version=False, versioning_checks=False,
+ max_depth=1, **weight_dict
+):
"""This method returns a true/false value if two graphs are semantically equivalent.
Internally, it calls the graph_similarity function and compares it against the given
threshold value.
@@ -23,8 +25,17 @@ def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both graphs equivalent. This
value can be tuned.
- weight_dict: A dictionary that can be used to override settings
- in the similarity process
+ ignore_spec_version: A boolean indicating whether to test object types
+ that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+ If set to True this check will be skipped.
+ versioning_checks: A boolean indicating whether to test multiple revisions
+ of the same object (when present) to maximize similarity against a
+ particular version. If set to True the algorithm will perform this step.
+ max_depth: A positive integer indicating the maximum recursion depth the
+ algorithm can reach when de-referencing objects and performing the
+ object_similarity algorithm.
+ weight_dict: A dictionary that can be used to override what checks are done
+ to objects in the similarity process.
Returns:
bool: True if the result of the graph similarity is greater than or equal to
@@ -40,20 +51,26 @@ def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
Note:
Default weight_dict:
- .. include:: ../../graph_default_sem_eq_weights.rst
+ .. include:: ../../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note `__.
"""
- similarity_result = graph_similarity(ds1, ds2, prop_scores, **weight_dict)
+ similarity_result = graph_similarity(
+ ds1, ds2, prop_scores, ignore_spec_version,
+ versioning_checks, max_depth, **weight_dict
+ )
if similarity_result >= threshold:
return True
return False
-def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
+def graph_similarity(
+ ds1, ds2, prop_scores={}, ignore_spec_version=False,
+ versioning_checks=False, max_depth=1, **weight_dict
+):
"""This method returns a similarity score for two given graphs.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the amount of objects we managed to compare.
@@ -65,8 +82,17 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
- weight_dict: A dictionary that can be used to override settings
- in the similarity process
+ ignore_spec_version: A boolean indicating whether to test object types
+ that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+ If set to True this check will be skipped.
+ versioning_checks: A boolean indicating whether to test multiple revisions
+ of the same object (when present) to maximize similarity against a
+ particular version. If set to True the algorithm will perform this step.
+ max_depth: A positive integer indicating the maximum recursion depth the
+ algorithm can reach when de-referencing objects and performing the
+ object_similarity algorithm.
+ weight_dict: A dictionary that can be used to override what checks are done
+ to objects in the similarity process.
Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity.
@@ -81,7 +107,7 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
Note:
Default weight_dict:
- .. include:: ../../graph_default_sem_eq_weights.rst
+ .. include:: ../../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
@@ -90,13 +116,21 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
"""
results = {}
similarity_score = 0
- weights = GRAPH_WEIGHTS.copy()
+ weights = WEIGHTS.copy()
if weight_dict:
weights.update(weight_dict)
- if weights["_internal"]["max_depth"] <= 0:
- raise ValueError("weight_dict['_internal']['max_depth'] must be greater than 0")
+ weights["_internal"] = {
+ "ignore_spec_version": ignore_spec_version,
+ "versioning_checks": versioning_checks,
+ "ds1": ds1,
+ "ds2": ds2,
+ "max_depth": max_depth,
+ }
+
+ if max_depth <= 0:
+ raise ValueError("'max_depth' must be greater than 0")
pairs = _object_pairs(
_bucket_per_type(ds1.query([])),
@@ -104,16 +138,17 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
weights,
)
- weights["_internal"]["ds1"] = ds1
- weights["_internal"]["ds2"] = ds2
-
logger.debug("Starting graph similarity process between DataStores: '%s' and '%s'", ds1.id, ds2.id)
for object1, object2 in pairs:
iprop_score = {}
object1_id = object1["id"]
object2_id = object2["id"]
- result = object_similarity(object1, object2, iprop_score, **weights)
+ result = object_similarity(
+ object1, object2, iprop_score, ds1, ds2,
+ ignore_spec_version, versioning_checks,
+ max_depth, **weights
+ )
if object1_id not in results:
results[object1_id] = {"lhs": object1_id, "rhs": object2_id, "prop_score": iprop_score, "value": result}
@@ -141,40 +176,3 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
similarity_score,
)
return similarity_score
-
-
-# default weights used for the graph similarity process
-GRAPH_WEIGHTS = WEIGHTS.copy()
-GRAPH_WEIGHTS.update({
- "grouping": {
- "name": (20, partial_string_based),
- "context": (20, partial_string_based),
- "object_refs": (60, list_reference_check),
- },
- "relationship": {
- "relationship_type": (20, exact_match),
- "source_ref": (40, reference_check),
- "target_ref": (40, reference_check),
- },
- "report": {
- "name": (30, partial_string_based),
- "published": (10, partial_timestamp_based),
- "object_refs": (60, list_reference_check),
- "tdelta": 1, # One day interval
- },
- "sighting": {
- "first_seen": (5, partial_timestamp_based),
- "last_seen": (5, partial_timestamp_based),
- "sighting_of_ref": (40, reference_check),
- "observed_data_refs": (20, list_reference_check),
- "where_sighted_refs": (20, list_reference_check),
- "summary": (10, exact_match),
- },
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "ds1": None,
- "ds2": None,
- "max_depth": 1,
- },
-}) # :autodoc-skip:
diff --git a/stix2/equivalence/object/__init__.py b/stix2/equivalence/object/__init__.py
index e175938..dde52ec 100644
--- a/stix2/equivalence/object/__init__.py
+++ b/stix2/equivalence/object/__init__.py
@@ -4,14 +4,18 @@ import itertools
import logging
import time
-from ...datastore import Filter
+from ...datastore import DataSource, DataStoreMixin, Filter
from ...utils import STIXdatetime, parse_into_datetime
from ..pattern import equivalent_patterns
logger = logging.getLogger(__name__)
-def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
+def object_equivalence(
+ obj1, obj2, prop_scores={}, threshold=70, ds1=None,
+ ds2=None, ignore_spec_version=False,
+ versioning_checks=False, max_depth=1, **weight_dict
+):
"""This method returns a true/false value if two objects are semantically equivalent.
Internally, it calls the object_similarity function and compares it against the given
threshold value.
@@ -24,8 +28,19 @@ def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both objects equivalent. This
value can be tuned.
- weight_dict: A dictionary that can be used to override settings
- in the similarity process
+ ds1 (optional): A DataStore object instance from which to pull related objects
+ ds2 (optional): A DataStore object instance from which to pull related objects
+ ignore_spec_version: A boolean indicating whether to test object types
+ that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+ If set to True this check will be skipped.
+ versioning_checks: A boolean indicating whether to test multiple revisions
+ of the same object (when present) to maximize similarity against a
+ particular version. If set to True the algorithm will perform this step.
+ max_depth: A positive integer indicating the maximum recursion depth the
+ algorithm can reach when de-referencing objects and performing the
+ object_similarity algorithm.
+ weight_dict: A dictionary that can be used to override what checks are done
+ to objects in the similarity process.
Returns:
bool: True if the result of the object similarity is greater than or equal to
@@ -41,20 +56,27 @@ def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
Note:
Default weight_dict:
- .. include:: ../../object_default_sem_eq_weights.rst
+ .. include:: ../../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note `__.
"""
- similarity_result = object_similarity(obj1, obj2, prop_scores, **weight_dict)
+ similarity_result = object_similarity(
+ obj1, obj2, prop_scores, ds1, ds2, ignore_spec_version,
+ versioning_checks, max_depth, **weight_dict
+ )
if similarity_result >= threshold:
return True
return False
-def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
+def object_similarity(
+ obj1, obj2, prop_scores={}, ds1=None, ds2=None,
+ ignore_spec_version=False, versioning_checks=False,
+ max_depth=1, **weight_dict
+):
"""This method returns a measure of similarity depending on how
similar the two objects are.
@@ -63,8 +85,19 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
- weight_dict: A dictionary that can be used to override settings
- in the similarity process
+ ds1 (optional): A DataStore object instance from which to pull related objects
+ ds2 (optional): A DataStore object instance from which to pull related objects
+ ignore_spec_version: A boolean indicating whether to test object types
+ that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+ If set to True this check will be skipped.
+ versioning_checks: A boolean indicating whether to test multiple revisions
+ of the same object (when present) to maximize similarity against a
+ particular version. If set to True the algorithm will perform this step.
+ max_depth: A positive integer indicating the maximum recursion depth the
+ algorithm can reach when de-referencing objects and performing the
+ object_similarity algorithm.
+ weight_dict: A dictionary that can be used to override what checks are done
+ to objects in the similarity process.
Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity.
@@ -79,7 +112,7 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
Note:
Default weight_dict:
- .. include:: ../../object_default_sem_eq_weights.rst
+ .. include:: ../../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
@@ -91,8 +124,15 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
if weight_dict:
weights.update(weight_dict)
+ weights["_internal"] = {
+ "ignore_spec_version": ignore_spec_version,
+ "versioning_checks": versioning_checks,
+ "ds1": ds1,
+ "ds2": ds2,
+ "max_depth": max_depth,
+ }
+
type1, type2 = obj1["type"], obj2["type"]
- ignore_spec_version = weights["_internal"]["ignore_spec_version"]
if type1 != type2:
raise ValueError('The objects to compare must be of the same type!')
@@ -117,6 +157,7 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
if check_property_present(prop, obj1, obj2):
w = weights[type1][prop][0]
comp_funct = weights[type1][prop][1]
+ prop_scores[prop] = {}
if comp_funct == partial_timestamp_based:
contributing_score = w * comp_funct(obj1[prop], obj2[prop], weights[type1]["tdelta"])
@@ -124,11 +165,18 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
threshold = weights[type1]["threshold"]
contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold)
elif comp_funct == reference_check or comp_funct == list_reference_check:
- max_depth = weights["_internal"]["max_depth"]
if max_depth > 0:
weights["_internal"]["max_depth"] = max_depth - 1
ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"]
- contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
+ if _datastore_check(ds1, ds2):
+ contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
+ elif comp_funct == reference_check:
+ comp_funct = exact_match
+ contributing_score = w * comp_funct(obj1[prop], obj2[prop])
+ elif comp_funct == list_reference_check:
+ comp_funct = partial_list_based
+ contributing_score = w * comp_funct(obj1[prop], obj2[prop])
+ prop_scores[prop]["check_type"] = comp_funct.__name__
else:
continue # prevent excessive recursion
weights["_internal"]["max_depth"] = max_depth
@@ -138,10 +186,8 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
sum_weights += w
matching_score += contributing_score
- prop_scores[prop] = {
- "weight": w,
- "contributing_score": contributing_score,
- }
+ prop_scores[prop]["weight"] = w
+ prop_scores[prop]["contributing_score"] = contributing_score
logger.debug("'%s' check -- weight: %s, contributing score: %s", prop, w, contributing_score)
prop_scores["matching_score"] = matching_score
@@ -165,7 +211,7 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
def check_property_present(prop, obj1, obj2):
"""Helper method checks if a property is present on both objects."""
if prop == "longitude_latitude":
- if all(x in obj1 and x in obj2 for x in ['latitude', 'longitude']):
+ if all(x in obj1 and x in obj2 for x in ('latitude', 'longitude')):
return True
elif prop in obj1 and prop in obj2:
return True
@@ -196,7 +242,9 @@ def partial_timestamp_based(t1, t2, tdelta):
def partial_list_based(l1, l2):
- """Performs a partial list matching via finding the intersection between common values.
+ """Performs a partial list matching via finding the intersection between
+ common values. Repeated values are counted only once. This method can be
+ used for *_refs equality checks when de-reference is not possible.
Args:
l1: A list of values.
@@ -213,7 +261,8 @@ def partial_list_based(l1, l2):
def exact_match(val1, val2):
- """Performs an exact value match based on two values
+ """Performs an exact value match based on two values. This method can be
+ used for *_ref equality check when de-reference is not possible.
Args:
val1: A value suitable for an equality test.
@@ -261,12 +310,12 @@ def custom_pattern_based(pattern1, pattern2):
return equivalent_patterns(pattern1, pattern2)
-def partial_external_reference_based(refs1, refs2):
+def partial_external_reference_based(ext_refs1, ext_refs2):
"""Performs a matching on External References.
Args:
- refs1: A list of external references.
- refs2: A list of external references.
+ ext_refs1: A list of external references.
+ ext_refs2: A list of external references.
Returns:
float: Number between 0.0 and 1.0 depending on matches.
@@ -275,51 +324,47 @@ def partial_external_reference_based(refs1, refs2):
allowed = {"veris", "cve", "capec", "mitre-attack"}
matches = 0
- if len(refs1) >= len(refs2):
- l1 = refs1
- l2 = refs2
- else:
- l1 = refs2
- l2 = refs1
+ ref_pairs = itertools.chain(
+ itertools.product(ext_refs1, ext_refs2),
+ )
- for ext_ref1 in l1:
- for ext_ref2 in l2:
- sn_match = False
- ei_match = False
- url_match = False
- source_name = None
+ for ext_ref1, ext_ref2 in ref_pairs:
+ sn_match = False
+ ei_match = False
+ url_match = False
+ source_name = None
- if check_property_present("source_name", ext_ref1, ext_ref2):
- if ext_ref1["source_name"] == ext_ref2["source_name"]:
- source_name = ext_ref1["source_name"]
- sn_match = True
- if check_property_present("external_id", ext_ref1, ext_ref2):
- if ext_ref1["external_id"] == ext_ref2["external_id"]:
- ei_match = True
- if check_property_present("url", ext_ref1, ext_ref2):
- if ext_ref1["url"] == ext_ref2["url"]:
- url_match = True
+ if check_property_present("source_name", ext_ref1, ext_ref2):
+ if ext_ref1["source_name"] == ext_ref2["source_name"]:
+ source_name = ext_ref1["source_name"]
+ sn_match = True
+ if check_property_present("external_id", ext_ref1, ext_ref2):
+ if ext_ref1["external_id"] == ext_ref2["external_id"]:
+ ei_match = True
+ if check_property_present("url", ext_ref1, ext_ref2):
+ if ext_ref1["url"] == ext_ref2["url"]:
+ url_match = True
- # Special case: if source_name is a STIX defined name and either
- # external_id or url match then its a perfect match and other entries
- # can be ignored.
- if sn_match and (ei_match or url_match) and source_name in allowed:
- result = 1.0
- logger.debug(
- "--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
- refs1, refs2, result,
- )
- return result
+ # Special case: if source_name is a STIX defined name and either
+ # external_id or url match then its a perfect match and other entries
+ # can be ignored.
+ if sn_match and (ei_match or url_match) and source_name in allowed:
+ result = 1.0
+ logger.debug(
+ "--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
+ ext_refs1, ext_refs2, result,
+ )
+ return result
- # Regular check. If the source_name (not STIX-defined) or external_id or
- # url matches then we consider the entry a match.
- if (sn_match or ei_match or url_match) and source_name not in allowed:
- matches += 1
+ # Regular check. If the source_name (not STIX-defined) or external_id or
+ # url matches then we consider the entry a match.
+ if (sn_match or ei_match or url_match) and source_name not in allowed:
+ matches += 1
- result = matches / max(len(refs1), len(refs2))
+ result = matches / max(len(ext_refs1), len(ext_refs2))
logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
- refs1, refs2, result,
+ ext_refs1, ext_refs2, result,
)
return result
@@ -352,17 +397,23 @@ def _versioned_checks(ref1, ref2, ds1, ds2, **weights):
"""Checks multiple object versions if present in graph.
Maximizes for the similarity score of a particular version."""
results = {}
- objects1 = ds1.query([Filter("id", "=", ref1)])
- objects2 = ds2.query([Filter("id", "=", ref2)])
pairs = _object_pairs(
- _bucket_per_type(objects1),
- _bucket_per_type(objects2),
+ _bucket_per_type(ds1.query([Filter("id", "=", ref1)])),
+ _bucket_per_type(ds2.query([Filter("id", "=", ref2)])),
weights,
)
+ ignore_spec_version = weights["_internal"]["ignore_spec_version"]
+ versioning_checks = weights["_internal"]["versioning_checks"]
+ max_depth = weights["_internal"]["max_depth"]
for object1, object2 in pairs:
- result = object_similarity(object1, object2, **weights)
+ result = object_similarity(
+ object1, object2, ds1=ds1, ds2=ds2,
+ ignore_spec_version=ignore_spec_version,
+ versioning_checks=versioning_checks,
+ max_depth=max_depth, **weights,
+ )
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": result}
elif result > results[ref1]["value"]:
@@ -383,12 +434,20 @@ def reference_check(ref1, ref2, ds1, ds2, **weights):
result = 0.0
if type1 == type2 and type1 in weights:
- if weights["_internal"]["versioning_checks"]:
+ ignore_spec_version = weights["_internal"]["ignore_spec_version"]
+ versioning_checks = weights["_internal"]["versioning_checks"]
+ max_depth = weights["_internal"]["max_depth"]
+ if versioning_checks:
result = _versioned_checks(ref1, ref2, ds1, ds2, **weights) / 100.0
else:
o1, o2 = ds1.get(ref1), ds2.get(ref2)
if o1 and o2:
- result = object_similarity(o1, o2, **weights) / 100.0
+ result = object_similarity(
+ o1, o2, ds1=ds1, ds2=ds2,
+ ignore_spec_version=ignore_spec_version,
+ versioning_checks=versioning_checks,
+ max_depth=max_depth, **weights,
+ ) / 100.0
logger.debug(
"--\t\treference_check '%s' '%s'\tresult: '%s'",
@@ -439,6 +498,15 @@ def list_reference_check(refs1, refs2, ds1, ds2, **weights):
return result
+def _datastore_check(ds1, ds2):
+ if (
+ issubclass(ds1.__class__, (DataStoreMixin, DataSource)) or
+ issubclass(ds2.__class__, (DataStoreMixin, DataSource))
+ ):
+ return True
+ return False
+
+
def _bucket_per_type(graph, mode="type"):
"""Given a list of objects or references, bucket them by type.
Depending on the list type: extract from 'type' property or using
@@ -480,11 +548,20 @@ WEIGHTS = {
"name": (60, partial_string_based),
"external_references": (40, partial_external_reference_based),
},
+ "grouping": {
+ "name": (20, partial_string_based),
+ "context": (20, partial_string_based),
+ "object_refs": (60, list_reference_check),
+ },
"identity": {
"name": (60, partial_string_based),
"identity_class": (20, exact_match),
"sectors": (20, partial_list_based),
},
+ "incident": {
+ "name": (30, partial_string_based),
+ "external_references": (70, partial_external_reference_based),
+ },
"indicator": {
"indicator_types": (15, partial_list_based),
"pattern": (80, custom_pattern_based),
@@ -511,6 +588,25 @@ WEIGHTS = {
"definition": (60, exact_match),
"definition_type": (20, exact_match),
},
+ "relationship": {
+ "relationship_type": (20, exact_match),
+ "source_ref": (40, reference_check),
+ "target_ref": (40, reference_check),
+ },
+ "report": {
+ "name": (30, partial_string_based),
+ "published": (10, partial_timestamp_based),
+ "object_refs": (60, list_reference_check),
+ "tdelta": 1, # One day interval
+ },
+ "sighting": {
+ "first_seen": (5, partial_timestamp_based),
+ "last_seen": (5, partial_timestamp_based),
+ "sighting_of_ref": (40, reference_check),
+ "observed_data_refs": (20, list_reference_check),
+ "where_sighted_refs": (20, list_reference_check),
+ "summary": (10, exact_match),
+ },
"threat-actor": {
"name": (60, partial_string_based),
"threat_actor_types": (20, partial_list_based),
@@ -524,7 +620,4 @@ WEIGHTS = {
"name": (30, partial_string_based),
"external_references": (70, partial_external_reference_based),
},
- "_internal": {
- "ignore_spec_version": False,
- },
} # :autodoc-skip:
diff --git a/stix2/serialization.py b/stix2/serialization.py
index 7488eb5..2784d39 100644
--- a/stix2/serialization.py
+++ b/stix2/serialization.py
@@ -2,6 +2,7 @@
import copy
import datetime as dt
+import io
import simplejson as json
@@ -64,6 +65,37 @@ def serialize(obj, pretty=False, include_optional_defaults=False, **kwargs):
Returns:
str: The serialized JSON object.
+ Note:
+ The argument ``pretty=True`` will output the STIX object following
+ spec order. Using this argument greatly impacts object serialization
+ performance. If your use case is centered across machine-to-machine
+ operation it is recommended to set ``pretty=False``.
+
+ When ``pretty=True`` the following key-value pairs will be added or
+ overridden: indent=4, separators=(",", ": "), item_sort_key=sort_by.
+ """
+ with io.StringIO() as fp:
+ fp_serialize(obj, fp, pretty, include_optional_defaults, **kwargs)
+ return fp.getvalue()
+
+
+def fp_serialize(obj, fp, pretty=False, include_optional_defaults=False, **kwargs):
+ """
+ Serialize a STIX object to ``fp`` (a text stream file-like supporting object).
+
+ Args:
+ obj: The STIX object to be serialized.
+ fp: A text stream file-like object supporting ``.write()``.
+ pretty (bool): If True, output properties following the STIX specs
+ formatting. This includes indentation. Refer to notes for more
+ details. (Default: ``False``)
+ include_optional_defaults (bool): Determines whether to include
+ optional properties set to the default value defined in the spec.
+ **kwargs: The arguments for a json.dumps() call.
+
+ Returns:
+ None
+
Note:
The argument ``pretty=True`` will output the STIX object following
spec order. Using this argument greatly impacts object serialization
@@ -80,9 +112,9 @@ def serialize(obj, pretty=False, include_optional_defaults=False, **kwargs):
kwargs.update({'indent': 4, 'separators': (',', ': '), 'item_sort_key': sort_by})
if include_optional_defaults:
- return json.dumps(obj, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)
+ json.dump(obj, fp, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)
else:
- return json.dumps(obj, cls=STIXJSONEncoder, **kwargs)
+ json.dump(obj, fp, cls=STIXJSONEncoder, **kwargs)
def _find(seq, val):
diff --git a/stix2/test/v20/test_bundle.py b/stix2/test/v20/test_bundle.py
index 3e09192..35588a5 100644
--- a/stix2/test/v20/test_bundle.py
+++ b/stix2/test/v20/test_bundle.py
@@ -1,3 +1,4 @@
+import io
import json
import pytest
@@ -113,6 +114,27 @@ def test_bundle_id_must_start_with_bundle():
assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'."
+def test_create_bundle_fp_serialize_pretty(indicator, malware, relationship):
+ bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])
+ buffer = io.StringIO()
+
+ bundle.fp_serialize(buffer, pretty=True)
+
+ assert str(bundle) == EXPECTED_BUNDLE
+ assert bundle.serialize(pretty=True) == EXPECTED_BUNDLE
+ assert buffer.getvalue() == EXPECTED_BUNDLE
+
+
+def test_create_bundle_fp_serialize_nonpretty(indicator, malware, relationship):
+ bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])
+ buffer = io.StringIO()
+
+ bundle.fp_serialize(buffer, sort_keys=True)
+
+ assert bundle.serialize(sort_keys=True) == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
+ assert buffer.getvalue() == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
+
+
def test_create_bundle1(indicator, malware, relationship):
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])
diff --git a/stix2/test/v20/test_datastore_taxii.py b/stix2/test/v20/test_datastore_taxii.py
index 34daa80..075f0a3 100644
--- a/stix2/test/v20/test_datastore_taxii.py
+++ b/stix2/test/v20/test_datastore_taxii.py
@@ -4,7 +4,7 @@ from medallion.filters.basic_filter import BasicFilter
import pytest
from requests.models import Response
from taxii2client.common import _filter_kwargs_to_query_params
-from taxii2client.v20 import Collection
+from taxii2client.v20 import MEDIA_TYPE_STIX_V20, Collection
import stix2
from stix2.datastore import DataSourceError
@@ -34,12 +34,12 @@ class MockTAXIICollectionEndpoint(Collection):
{
"date_added": get_timestamp(),
"id": object["id"],
- "media_type": "application/stix+json;version=2.1",
+ "media_type": "application/stix+json;version=2.0",
"version": object.get("modified", object.get("created", get_timestamp())),
},
)
- def get_objects(self, **filter_kwargs):
+ def get_objects(self, accept=MEDIA_TYPE_STIX_V20, start=0, per_request=0, **filter_kwargs):
self._verify_can_read()
query_params = _filter_kwargs_to_query_params(filter_kwargs)
assert isinstance(query_params, dict)
@@ -51,7 +51,12 @@ class MockTAXIICollectionEndpoint(Collection):
100,
)[0]
if objs:
- return stix2.v20.Bundle(objects=objs)
+ resp = Response()
+ resp.status_code = 200
+ resp.headers["Content-Range"] = f"items 0-{len(objs)}/{len(objs)}"
+ resp.encoding = "utf-8"
+ resp._content = bytes(stix2.v20.Bundle(objects=objs).serialize(ensure_ascii=False), resp.encoding)
+ return resp
else:
resp = Response()
resp.status_code = 404
diff --git a/stix2/test/v20/test_environment.py b/stix2/test/v20/test_environment.py
index 33e0985..c8867b0 100644
--- a/stix2/test/v20/test_environment.py
+++ b/stix2/test/v20/test_environment.py
@@ -424,7 +424,7 @@ def test_related_to_by_target(ds):
def test_versioned_checks(ds, ds2):
- weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
+ weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": True,
@@ -437,7 +437,7 @@ def test_versioned_checks(ds, ds2):
def test_semantic_check_with_versioning(ds, ds2):
- weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
+ weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
@@ -467,13 +467,11 @@ def test_semantic_check_with_versioning(ds, ds2):
def test_list_semantic_check(ds, ds2):
- weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
+ weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
- "ds1": ds,
- "ds2": ds2,
"max_depth": 1,
},
})
@@ -504,39 +502,18 @@ def test_list_semantic_check(ds, ds2):
def test_graph_similarity_raises_value_error(ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": -1,
- },
- }
with pytest.raises(ValueError):
prop_scores1 = {}
- stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
+ stix2.Environment().graph_similarity(ds, ds2, prop_scores1, max_depth=-1)
def test_graph_similarity_with_filesystem_source(ds, fs):
- weights = {
- "_internal": {
- "ignore_spec_version": True,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores1 = {}
- env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights)
+ env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, ignore_spec_version=True)
# Switching parameters
- weights = {
- "_internal": {
- "ignore_spec_version": True,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores2 = {}
- env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights)
+ env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, ignore_spec_version=True)
assert round(env1) == 25
assert round(prop_scores1["matching_score"]) == 451
@@ -552,41 +529,20 @@ def test_graph_similarity_with_filesystem_source(ds, fs):
def test_graph_similarity_with_duplicate_graph(ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores = {}
- env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights)
+ env = stix2.Environment().graph_similarity(ds, ds, prop_scores)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_similarity_with_versioning_check_on(ds2, ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": True,
- "max_depth": 1,
- },
- }
prop_scores1 = {}
- env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
+ env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, versioning_checks=True)
# Switching parameters
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": True,
- "max_depth": 1,
- },
- }
prop_scores2 = {}
- env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
+ env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, versioning_checks=True)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
@@ -602,26 +558,12 @@ def test_graph_similarity_with_versioning_check_on(ds2, ds):
def test_graph_similarity_with_versioning_check_off(ds2, ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores1 = {}
- env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
+ env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1)
# Switching parameters
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores2 = {}
- env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
+ env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
@@ -637,26 +579,12 @@ def test_graph_similarity_with_versioning_check_off(ds2, ds):
def test_graph_equivalence_with_filesystem_source(ds, fs):
- weights = {
- "_internal": {
- "ignore_spec_version": True,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores1 = {}
- env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights)
+ env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, ignore_spec_version=True)
# Switching parameters
- weights = {
- "_internal": {
- "ignore_spec_version": True,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores2 = {}
- env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights)
+ env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, ignore_spec_version=True)
assert env1 is False
assert round(prop_scores1["matching_score"]) == 451
@@ -672,41 +600,20 @@ def test_graph_equivalence_with_filesystem_source(ds, fs):
def test_graph_equivalence_with_duplicate_graph(ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores = {}
- env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights)
+ env = stix2.Environment().graph_equivalence(ds, ds, prop_scores)
assert env is True
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": True,
- "max_depth": 1,
- },
- }
prop_scores1 = {}
- env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
+ env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, versioning_checks=True)
# Switching parameters
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": True,
- "max_depth": 1,
- },
- }
prop_scores2 = {}
- env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
+ env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, versioning_checks=True)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
@@ -722,26 +629,12 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds):
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores1 = {}
- env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
+ env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1)
# Switching parameters
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores2 = {}
- env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
+ env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
diff --git a/stix2/test/v21/test_bundle.py b/stix2/test/v21/test_bundle.py
index 2eeaff4..b7d0946 100644
--- a/stix2/test/v21/test_bundle.py
+++ b/stix2/test/v21/test_bundle.py
@@ -1,3 +1,4 @@
+import io
import json
import pytest
@@ -123,6 +124,27 @@ def test_bundle_id_must_start_with_bundle():
assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'."
+def test_create_bundle_fp_serialize_pretty(indicator, malware, relationship):
+ bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])
+ buffer = io.StringIO()
+
+ bundle.fp_serialize(buffer, pretty=True)
+
+ assert str(bundle) == EXPECTED_BUNDLE
+ assert bundle.serialize(pretty=True) == EXPECTED_BUNDLE
+ assert buffer.getvalue() == EXPECTED_BUNDLE
+
+
+def test_create_bundle_fp_serialize_nonpretty(indicator, malware, relationship):
+ bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])
+ buffer = io.StringIO()
+
+ bundle.fp_serialize(buffer, sort_keys=True)
+
+ assert bundle.serialize(sort_keys=True) == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
+ assert buffer.getvalue() == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
+
+
def test_create_bundle1(indicator, malware, relationship):
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])
diff --git a/stix2/test/v21/test_datastore_taxii.py b/stix2/test/v21/test_datastore_taxii.py
index 4b7e299..62ddadc 100644
--- a/stix2/test/v21/test_datastore_taxii.py
+++ b/stix2/test/v21/test_datastore_taxii.py
@@ -28,14 +28,14 @@ class MockTAXIICollectionEndpoint(Collection):
self._verify_can_write()
if isinstance(bundle, str):
bundle = json.loads(bundle)
- for object in bundle.get("objects", []):
- self.objects.append(object)
+ for obj in bundle.get("objects", []):
+ self.objects.append(obj)
self.manifests.append(
{
"date_added": get_timestamp(),
- "id": object["id"],
+ "id": obj["id"],
"media_type": "application/stix+json;version=2.1",
- "version": object.get("modified", object.get("created", get_timestamp())),
+ "version": obj.get("modified", obj.get("created", get_timestamp())),
},
)
@@ -51,7 +51,10 @@ class MockTAXIICollectionEndpoint(Collection):
100,
)[0]
if objs:
- return stix2.v21.Bundle(objects=objs)
+ return {
+ "objects": objs,
+ "more": False,
+ }
else:
resp = Response()
resp.status_code = 404
@@ -75,7 +78,10 @@ class MockTAXIICollectionEndpoint(Collection):
else:
filtered_objects = []
if filtered_objects:
- return stix2.v21.Bundle(objects=filtered_objects)
+ return {
+ "objects": filtered_objects,
+ "more": False,
+ }
else:
resp = Response()
resp.status_code = 404
diff --git a/stix2/test/v21/test_environment.py b/stix2/test/v21/test_environment.py
index e7bf4da..7f6b71c 100644
--- a/stix2/test/v21/test_environment.py
+++ b/stix2/test/v21/test_environment.py
@@ -760,16 +760,13 @@ def test_object_similarity_different_spec_version():
"valid_from": (5, stix2.equivalence.object.partial_timestamp_based),
"tdelta": 1, # One day interval
},
- "_internal": {
- "ignore_spec_version": True, # Disables spec_version check.
- },
}
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS)
- env = stix2.Environment().object_similarity(ind1, ind2, **weights)
+ env = stix2.Environment().object_similarity(ind1, ind2, ignore_spec_version=True, **weights)
assert round(env) == 0
- env = stix2.Environment().object_similarity(ind2, ind1, **weights)
+ env = stix2.Environment().object_similarity(ind2, ind1, ignore_spec_version=True, **weights)
assert round(env) == 0
@@ -858,10 +855,12 @@ def test_object_similarity_exact_match():
assert stix2.equivalence.object.exact_match(t1, t2) == 0.0
-def test_non_existent_config_for_object():
+def test_no_datastore_fallsback_list_based_check_for_refs_check():
r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
- assert stix2.Environment().object_similarity(r1, r2) == 0.0
+ prop_scores = {}
+ assert stix2.Environment().object_similarity(r1, r2, prop_scores) == 100.0
+ assert prop_scores["object_refs"]["check_type"] == "partial_list_based"
def custom_semantic_equivalence_method(obj1, obj2, **weights):
@@ -937,7 +936,8 @@ def test_object_similarity_prop_scores_method_provided():
def test_versioned_checks(ds, ds2):
- weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
+ # Testing internal method
+ weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": True,
@@ -950,7 +950,7 @@ def test_versioned_checks(ds, ds2):
def test_semantic_check_with_versioning(ds, ds2):
- weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
+ weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
@@ -981,7 +981,7 @@ def test_semantic_check_with_versioning(ds, ds2):
def test_list_semantic_check(ds, ds2):
- weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
+ weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
@@ -1027,39 +1027,28 @@ def test_list_semantic_check(ds, ds2):
def test_graph_similarity_raises_value_error(ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": -1,
- },
- }
with pytest.raises(ValueError):
prop_scores1 = {}
- stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
+ stix2.Environment().graph_similarity(ds, ds2, prop_scores1, max_depth=-1)
def test_graph_similarity_with_filesystem_source(ds, fs):
- weights = {
- "_internal": {
- "ignore_spec_version": True,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores1 = {}
- env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights)
+ env1 = stix2.Environment().graph_similarity(
+ fs, ds, prop_scores1,
+ ignore_spec_version=True,
+ versioning_checks=False,
+ max_depth=1,
+ )
# Switching parameters
- weights = {
- "_internal": {
- "ignore_spec_version": True,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores2 = {}
- env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights)
+ env2 = stix2.Environment().graph_similarity(
+ ds, fs, prop_scores2,
+ ignore_spec_version=True,
+ versioning_checks=False,
+ max_depth=1,
+ )
assert round(env1) == 23
assert round(prop_scores1["matching_score"]) == 411
@@ -1154,14 +1143,11 @@ def test_depth_limiting():
"some2_ref": (33, stix2.equivalence.object.reference_check),
"name": (34, stix2.equivalence.object.partial_string_based),
},
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
}
prop_scores1 = {}
- env1 = stix2.equivalence.graph.graph_similarity(mem_store1, mem_store2, prop_scores1, **custom_weights)
+ env1 = stix2.equivalence.graph.graph_similarity(
+ mem_store1, mem_store2, prop_scores1, **custom_weights
+ )
assert round(env1) == 38
assert round(prop_scores1["matching_score"]) == 300
@@ -1185,44 +1171,23 @@ def test_depth_limiting():
def test_graph_similarity_with_duplicate_graph(ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores = {}
- env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights)
+ env = stix2.Environment().graph_similarity(ds, ds, prop_scores)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_similarity_with_versioning_check_on(ds2, ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": True,
- "max_depth": 1,
- },
- }
prop_scores1 = {}
- env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
+ env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, versioning_checks=True)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
# Switching parameters
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores2 = {}
- env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
+ env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, versioning_checks=True)
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
@@ -1233,29 +1198,15 @@ def test_graph_similarity_with_versioning_check_on(ds2, ds):
def test_graph_similarity_with_versioning_check_off(ds2, ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores1 = {}
- env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
+ env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
# Switching parameters
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores2 = {}
- env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
+ env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2)
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
@@ -1266,26 +1217,12 @@ def test_graph_similarity_with_versioning_check_off(ds2, ds):
def test_graph_equivalence_with_filesystem_source(ds, fs):
- weights = {
- "_internal": {
- "ignore_spec_version": True,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores1 = {}
- env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights)
+ env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, ignore_spec_version=True)
# Switching parameters
- weights = {
- "_internal": {
- "ignore_spec_version": True,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores2 = {}
- env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights)
+ env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, ignore_spec_version=True)
assert env1 is False
assert round(prop_scores1["matching_score"]) == 411
@@ -1301,41 +1238,20 @@ def test_graph_equivalence_with_filesystem_source(ds, fs):
def test_graph_equivalence_with_duplicate_graph(ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores = {}
- env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights)
+ env = stix2.Environment().graph_equivalence(ds, ds, prop_scores)
assert env is True
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": True,
- "max_depth": 1,
- },
- }
prop_scores1 = {}
- env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
+ env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, versioning_checks=True)
# Switching parameters
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": True,
- "max_depth": 1,
- },
- }
prop_scores2 = {}
- env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
+ env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, versioning_checks=True)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
@@ -1351,26 +1267,12 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds):
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores1 = {}
- env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
+ env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1)
# Switching parameters
- weights = {
- "_internal": {
- "ignore_spec_version": False,
- "versioning_checks": False,
- "max_depth": 1,
- },
- }
prop_scores2 = {}
- env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
+ env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789