Merge pull request #496 from emmanvg/semantic-equivalence-part3

Similarity/Equivalence Changes
Chris Lenk 2021-03-10 14:08:45 -05:00 committed by GitHub
commit f155e3e571
8 changed files with 364 additions and 429 deletions

.gitignore

@ -55,8 +55,7 @@ coverage.xml
# Sphinx documentation # Sphinx documentation
docs/_build/ docs/_build/
.ipynb_checkpoints .ipynb_checkpoints
graph_default_sem_eq_weights.rst similarity_weights.rst
object_default_sem_eq_weights.rst
# PyBuilder # PyBuilder
target/ target/


@ -7,7 +7,6 @@ import sys
from sphinx.ext.autodoc import ClassDocumenter from sphinx.ext.autodoc import ClassDocumenter
from stix2.base import _STIXBase from stix2.base import _STIXBase
from stix2.equivalence.graph import GRAPH_WEIGHTS
from stix2.equivalence.object import WEIGHTS from stix2.equivalence.object import WEIGHTS
from stix2.version import __version__ from stix2.version import __version__
@ -66,16 +65,9 @@ object_default_sem_eq_weights = json.dumps(WEIGHTS, indent=4, default=lambda o:
object_default_sem_eq_weights = object_default_sem_eq_weights.replace('\n', '\n ') object_default_sem_eq_weights = object_default_sem_eq_weights.replace('\n', '\n ')
object_default_sem_eq_weights = object_default_sem_eq_weights.replace(' "', ' ') object_default_sem_eq_weights = object_default_sem_eq_weights.replace(' "', ' ')
object_default_sem_eq_weights = object_default_sem_eq_weights.replace('"\n', '\n') object_default_sem_eq_weights = object_default_sem_eq_weights.replace('"\n', '\n')
with open('object_default_sem_eq_weights.rst', 'w') as f: with open('similarity_weights.rst', 'w') as f:
f.write(".. code-block:: python\n\n {}\n\n".format(object_default_sem_eq_weights)) f.write(".. code-block:: python\n\n {}\n\n".format(object_default_sem_eq_weights))
graph_default_sem_eq_weights = json.dumps(GRAPH_WEIGHTS, indent=4, default=lambda o: o.__name__)
graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('\n', '\n ')
graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace(' "', ' ')
graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('"\n', '\n')
with open('graph_default_sem_eq_weights.rst', 'w') as f:
f.write(".. code-block:: python\n\n {}\n\n".format(graph_default_sem_eq_weights))
def get_property_type(prop): def get_property_type(prop):
"""Convert property classname into pretty string name of property. """Convert property classname into pretty string name of property.


@ -4607,20 +4607,11 @@
" ),\n", " ),\n",
"]\n", "]\n",
"\n", "\n",
"\n",
"weights = {\n",
" \"_internal\": {\n",
" \"ignore_spec_version\": False,\n",
" \"versioning_checks\": False,\n",
" \"max_depth\": 1,\n",
" },\n",
"}\n",
"\n",
"memstore1 = MemoryStore(g1)\n", "memstore1 = MemoryStore(g1)\n",
"memstore2 = MemoryStore(g2)\n", "memstore2 = MemoryStore(g2)\n",
"prop_scores = {}\n", "prop_scores = {}\n",
"\n", "\n",
"similarity_result = env.graph_similarity(memstore1, memstore2, prop_scores, **weights)\n", "similarity_result = env.graph_similarity(memstore1, memstore2, prop_scores)\n",
"equivalence_result = env.graph_equivalence(memstore1, memstore2, threshold=60)\n", "equivalence_result = env.graph_equivalence(memstore1, memstore2, threshold=60)\n",
"\n", "\n",
"print(similarity_result)\n", "print(similarity_result)\n",


@ -189,7 +189,11 @@ class Environment(DataStoreMixin):
return None return None
@staticmethod @staticmethod
def object_similarity(obj1, obj2, prop_scores={}, **weight_dict): def object_similarity(
obj1, obj2, prop_scores={}, ds1=None, ds2=None,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a measure of how similar the two objects are. """This method returns a measure of how similar the two objects are.
Args: Args:
@ -197,8 +201,19 @@ class Environment(DataStoreMixin):
obj2: A stix2 object instance obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores, prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights. weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings ds1 (optional): A DataStore object instance from which to pull related objects
in the similarity process ds2 (optional): A DataStore object instance from which to pull related objects
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns: Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity. float: A number between 0.0 and 100.0 as a measurement of similarity.
@ -213,17 +228,24 @@ class Environment(DataStoreMixin):
Note: Note:
Default weight_dict: Default weight_dict:
.. include:: ../object_default_sem_eq_weights.rst .. include:: ../similarity_weights.rst
Note: Note:
This implementation follows the Semantic Equivalence Committee Note. This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__. see `the Committee Note <link here>`__.
""" """
return object_similarity(obj1, obj2, prop_scores, **weight_dict) return object_similarity(
obj1, obj2, prop_scores, ds1, ds2, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
@staticmethod @staticmethod
def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict): def object_equivalence(
obj1, obj2, prop_scores={}, threshold=70, ds1=None, ds2=None,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a true/false value if two objects are semantically equivalent. """This method returns a true/false value if two objects are semantically equivalent.
Internally, it calls the object_similarity function and compares it against the given Internally, it calls the object_similarity function and compares it against the given
threshold value. threshold value.
@ -236,8 +258,19 @@ class Environment(DataStoreMixin):
threshold: A numerical value between 0 and 100 to determine the minimum threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both objects equivalent. This score to result in successfully calling both objects equivalent. This
value can be tuned. value can be tuned.
weight_dict: A dictionary that can be used to override settings ds1 (optional): A DataStore object instance from which to pull related objects
in the similarity process ds2 (optional): A DataStore object instance from which to pull related objects
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns: Returns:
bool: True if the result of the object similarity is greater than or equal to bool: True if the result of the object similarity is greater than or equal to
@ -253,17 +286,23 @@ class Environment(DataStoreMixin):
Note: Note:
Default weight_dict: Default weight_dict:
.. include:: ../object_default_sem_eq_weights.rst .. include:: ../similarity_weights.rst
Note: Note:
This implementation follows the Semantic Equivalence Committee Note. This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__. see `the Committee Note <link here>`__.
""" """
return object_equivalence(obj1, obj2, prop_scores, threshold, **weight_dict) return object_equivalence(
obj1, obj2, prop_scores, threshold, ds1, ds2,
ignore_spec_version, versioning_checks, max_depth, **weight_dict
)
@staticmethod @staticmethod
def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict): def graph_similarity(
ds1, ds2, prop_scores={}, ignore_spec_version=False,
versioning_checks=False, max_depth=1, **weight_dict
):
"""This method returns a similarity score for two given graphs. """This method returns a similarity score for two given graphs.
Each DataStore can contain a connected or disconnected graph and the Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the amount of objects we managed to compare. final result is weighted over the amount of objects we managed to compare.
@ -275,8 +314,17 @@ class Environment(DataStoreMixin):
ds2: A DataStore object instance representing your graph ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores, prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights. weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings ignore_spec_version: A boolean indicating whether to test object types
in the similarity process that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns: Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity. float: A number between 0.0 and 100.0 as a measurement of similarity.
@ -291,17 +339,24 @@ class Environment(DataStoreMixin):
Note: Note:
Default weight_dict: Default weight_dict:
.. include:: ../graph_default_sem_eq_weights.rst .. include:: ../similarity_weights.rst
Note: Note:
This implementation follows the Semantic Equivalence Committee Note. This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__. see `the Committee Note <link here>`__.
""" """
return graph_similarity(ds1, ds2, prop_scores, **weight_dict) return graph_similarity(
ds1, ds2, prop_scores, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
@staticmethod @staticmethod
def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict): def graph_equivalence(
ds1, ds2, prop_scores={}, threshold=70,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a true/false value if two graphs are semantically equivalent. """This method returns a true/false value if two graphs are semantically equivalent.
Internally, it calls the graph_similarity function and compares it against the given Internally, it calls the graph_similarity function and compares it against the given
threshold value. threshold value.
@ -314,8 +369,17 @@ class Environment(DataStoreMixin):
threshold: A numerical value between 0 and 100 to determine the minimum threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both graphs equivalent. This score to result in successfully calling both graphs equivalent. This
value can be tuned. value can be tuned.
weight_dict: A dictionary that can be used to override settings ignore_spec_version: A boolean indicating whether to test object types
in the similarity process that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns: Returns:
bool: True if the result of the graph similarity is greater than or equal to bool: True if the result of the graph similarity is greater than or equal to
@ -331,11 +395,14 @@ class Environment(DataStoreMixin):
Note: Note:
Default weight_dict: Default weight_dict:
.. include:: ../graph_default_sem_eq_weights.rst .. include:: ../similarity_weights.rst
Note: Note:
This implementation follows the Semantic Equivalence Committee Note. This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__. see `the Committee Note <link here>`__.
""" """
return graph_equivalence(ds1, ds2, prop_scores, threshold, **weight_dict) return graph_equivalence(
ds1, ds2, prop_scores, threshold, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
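The Environment wrappers now surface ds1, ds2, ignore_spec_version, versioning_checks and max_depth directly instead of burying them in weight_dict["_internal"]. A hedged usage sketch for object_similarity; the campaign objects and in-memory stores below are illustrative, not from the test suite:

import stix2
from stix2 import Environment, MemoryStore

camp1 = stix2.v21.Campaign(name="Green Group Attacks Against Finance")
camp2 = stix2.v21.Campaign(name="Green Group Attacks")

prop_scores = {}
score = Environment().object_similarity(
    camp1, camp2, prop_scores,
    ds1=MemoryStore([camp1]),   # optional stores, only needed to de-reference *_ref properties
    ds2=MemoryStore([camp2]),
    ignore_spec_version=False,
    versioning_checks=False,
    max_depth=1,
)
print(score, prop_scores["matching_score"], prop_scores["sum_weights"])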


@ -2,15 +2,17 @@
import logging import logging
from ..object import ( from ..object import (
WEIGHTS, _bucket_per_type, _object_pairs, exact_match, WEIGHTS, _bucket_per_type, _object_pairs, object_similarity,
list_reference_check, object_similarity, partial_string_based,
partial_timestamp_based, reference_check,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict): def graph_equivalence(
ds1, ds2, prop_scores={}, threshold=70,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a true/false value if two graphs are semantically equivalent. """This method returns a true/false value if two graphs are semantically equivalent.
Internally, it calls the graph_similarity function and compares it against the given Internally, it calls the graph_similarity function and compares it against the given
threshold value. threshold value.
@ -23,8 +25,17 @@ def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
threshold: A numerical value between 0 and 100 to determine the minimum threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both graphs equivalent. This score to result in successfully calling both graphs equivalent. This
value can be tuned. value can be tuned.
weight_dict: A dictionary that can be used to override settings ignore_spec_version: A boolean indicating whether to test object types
in the similarity process that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns: Returns:
bool: True if the result of the graph similarity is greater than or equal to bool: True if the result of the graph similarity is greater than or equal to
@ -40,20 +51,26 @@ def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
Note: Note:
Default weight_dict: Default weight_dict:
.. include:: ../../graph_default_sem_eq_weights.rst .. include:: ../../similarity_weights.rst
Note: Note:
This implementation follows the Semantic Equivalence Committee Note. This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__. see `the Committee Note <link here>`__.
""" """
similarity_result = graph_similarity(ds1, ds2, prop_scores, **weight_dict) similarity_result = graph_similarity(
ds1, ds2, prop_scores, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
if similarity_result >= threshold: if similarity_result >= threshold:
return True return True
return False return False
def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict): def graph_similarity(
ds1, ds2, prop_scores={}, ignore_spec_version=False,
versioning_checks=False, max_depth=1, **weight_dict
):
"""This method returns a similarity score for two given graphs. """This method returns a similarity score for two given graphs.
Each DataStore can contain a connected or disconnected graph and the Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the amount of objects we managed to compare. final result is weighted over the amount of objects we managed to compare.
@ -65,8 +82,17 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
ds2: A DataStore object instance representing your graph ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores, prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights. weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings ignore_spec_version: A boolean indicating whether to test object types
in the similarity process that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns: Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity. float: A number between 0.0 and 100.0 as a measurement of similarity.
@ -81,7 +107,7 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
Note: Note:
Default weight_dict: Default weight_dict:
.. include:: ../../graph_default_sem_eq_weights.rst .. include:: ../../similarity_weights.rst
Note: Note:
This implementation follows the Semantic Equivalence Committee Note. This implementation follows the Semantic Equivalence Committee Note.
@ -90,13 +116,21 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
""" """
results = {} results = {}
similarity_score = 0 similarity_score = 0
weights = GRAPH_WEIGHTS.copy() weights = WEIGHTS.copy()
if weight_dict: if weight_dict:
weights.update(weight_dict) weights.update(weight_dict)
if weights["_internal"]["max_depth"] <= 0: weights["_internal"] = {
raise ValueError("weight_dict['_internal']['max_depth'] must be greater than 0") "ignore_spec_version": ignore_spec_version,
"versioning_checks": versioning_checks,
"ds1": ds1,
"ds2": ds2,
"max_depth": max_depth,
}
if max_depth <= 0:
raise ValueError("'max_depth' must be greater than 0")
pairs = _object_pairs( pairs = _object_pairs(
_bucket_per_type(ds1.query([])), _bucket_per_type(ds1.query([])),
@ -104,16 +138,17 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
weights, weights,
) )
weights["_internal"]["ds1"] = ds1
weights["_internal"]["ds2"] = ds2
logger.debug("Starting graph similarity process between DataStores: '%s' and '%s'", ds1.id, ds2.id) logger.debug("Starting graph similarity process between DataStores: '%s' and '%s'", ds1.id, ds2.id)
for object1, object2 in pairs: for object1, object2 in pairs:
iprop_score = {} iprop_score = {}
object1_id = object1["id"] object1_id = object1["id"]
object2_id = object2["id"] object2_id = object2["id"]
result = object_similarity(object1, object2, iprop_score, **weights) result = object_similarity(
object1, object2, iprop_score, ds1, ds2,
ignore_spec_version, versioning_checks,
max_depth, **weights
)
if object1_id not in results: if object1_id not in results:
results[object1_id] = {"lhs": object1_id, "rhs": object2_id, "prop_score": iprop_score, "value": result} results[object1_id] = {"lhs": object1_id, "rhs": object2_id, "prop_score": iprop_score, "value": result}
@ -141,40 +176,3 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
similarity_score, similarity_score,
) )
return similarity_score return similarity_score
# default weights used for the graph similarity process
GRAPH_WEIGHTS = WEIGHTS.copy()
GRAPH_WEIGHTS.update({
"grouping": {
"name": (20, partial_string_based),
"context": (20, partial_string_based),
"object_refs": (60, list_reference_check),
},
"relationship": {
"relationship_type": (20, exact_match),
"source_ref": (40, reference_check),
"target_ref": (40, reference_check),
},
"report": {
"name": (30, partial_string_based),
"published": (10, partial_timestamp_based),
"object_refs": (60, list_reference_check),
"tdelta": 1, # One day interval
},
"sighting": {
"first_seen": (5, partial_timestamp_based),
"last_seen": (5, partial_timestamp_based),
"sighting_of_ref": (40, reference_check),
"observed_data_refs": (20, list_reference_check),
"where_sighted_refs": (20, list_reference_check),
"summary": (10, exact_match),
},
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"ds1": None,
"ds2": None,
"max_depth": 1,
},
}) # :autodoc-skip:
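GRAPH_WEIGHTS is gone: the grouping, relationship, report and sighting entries move into the shared WEIGHTS table in stix2.equivalence.object, and graph_similarity now starts from that table. Per-type entries can still be tuned through weight_dict. A sketch under that assumption, with two small hand-built report graphs (all objects and values are made up for illustration):

import stix2
from stix2 import MemoryStore
from stix2.equivalence.graph import graph_similarity
from stix2.equivalence.object import (
    list_reference_check, partial_string_based, partial_timestamp_based,
)

indicator = stix2.v21.Indicator(
    pattern="[ipv4-addr:value = '198.51.100.1']",
    pattern_type="stix",
    valid_from="2021-01-01T00:00:00Z",
)
report1 = stix2.v21.Report(
    name="Quarterly threat report",
    published="2021-02-01T00:00:00Z",
    object_refs=[indicator.id],
)
report2 = stix2.v21.Report(
    name="Quarterly threat summary",
    published="2021-02-02T00:00:00Z",
    object_refs=[indicator.id],
)

# Illustrative override: rebalance only the report weights; other types keep the defaults.
custom_weights = {
    "report": {
        "name": (50, partial_string_based),
        "published": (10, partial_timestamp_based),
        "object_refs": (40, list_reference_check),
        "tdelta": 1,  # one-day tolerance for the timestamp check
    },
}

prop_scores = {}
score = graph_similarity(
    MemoryStore([indicator, report1]),
    MemoryStore([indicator, report2]),
    prop_scores,
    **custom_weights,
)
print(round(score), prop_scores["len_pairs"])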


@ -4,14 +4,18 @@ import itertools
import logging import logging
import time import time
from ...datastore import Filter from ...datastore import DataSource, DataStoreMixin, Filter
from ...utils import STIXdatetime, parse_into_datetime from ...utils import STIXdatetime, parse_into_datetime
from ..pattern import equivalent_patterns from ..pattern import equivalent_patterns
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict): def object_equivalence(
obj1, obj2, prop_scores={}, threshold=70, ds1=None,
ds2=None, ignore_spec_version=False,
versioning_checks=False, max_depth=1, **weight_dict
):
"""This method returns a true/false value if two objects are semantically equivalent. """This method returns a true/false value if two objects are semantically equivalent.
Internally, it calls the object_similarity function and compares it against the given Internally, it calls the object_similarity function and compares it against the given
threshold value. threshold value.
@ -24,8 +28,19 @@ def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
threshold: A numerical value between 0 and 100 to determine the minimum threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both objects equivalent. This score to result in successfully calling both objects equivalent. This
value can be tuned. value can be tuned.
weight_dict: A dictionary that can be used to override settings ds1 (optional): A DataStore object instance from which to pull related objects
in the similarity process ds2 (optional): A DataStore object instance from which to pull related objects
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns: Returns:
bool: True if the result of the object similarity is greater than or equal to bool: True if the result of the object similarity is greater than or equal to
@ -41,20 +56,27 @@ def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
Note: Note:
Default weight_dict: Default weight_dict:
.. include:: ../../object_default_sem_eq_weights.rst .. include:: ../../similarity_weights.rst
Note: Note:
This implementation follows the Semantic Equivalence Committee Note. This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__. see `the Committee Note <link here>`__.
""" """
similarity_result = object_similarity(obj1, obj2, prop_scores, **weight_dict) similarity_result = object_similarity(
obj1, obj2, prop_scores, ds1, ds2, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
if similarity_result >= threshold: if similarity_result >= threshold:
return True return True
return False return False
def object_similarity(obj1, obj2, prop_scores={}, **weight_dict): def object_similarity(
obj1, obj2, prop_scores={}, ds1=None, ds2=None,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a measure of similarity depending on how """This method returns a measure of similarity depending on how
similar the two objects are. similar the two objects are.
@ -63,8 +85,19 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
obj2: A stix2 object instance obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores, prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights. weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings ds1 (optional): A DataStore object instance from which to pull related objects
in the similarity process ds2 (optional): A DataStore object instance from which to pull related objects
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns: Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity. float: A number between 0.0 and 100.0 as a measurement of similarity.
@ -79,7 +112,7 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
Note: Note:
Default weight_dict: Default weight_dict:
.. include:: ../../object_default_sem_eq_weights.rst .. include:: ../../similarity_weights.rst
Note: Note:
This implementation follows the Semantic Equivalence Committee Note. This implementation follows the Semantic Equivalence Committee Note.
@ -91,8 +124,15 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
if weight_dict: if weight_dict:
weights.update(weight_dict) weights.update(weight_dict)
weights["_internal"] = {
"ignore_spec_version": ignore_spec_version,
"versioning_checks": versioning_checks,
"ds1": ds1,
"ds2": ds2,
"max_depth": max_depth,
}
type1, type2 = obj1["type"], obj2["type"] type1, type2 = obj1["type"], obj2["type"]
ignore_spec_version = weights["_internal"]["ignore_spec_version"]
if type1 != type2: if type1 != type2:
raise ValueError('The objects to compare must be of the same type!') raise ValueError('The objects to compare must be of the same type!')
@ -117,6 +157,7 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
if check_property_present(prop, obj1, obj2): if check_property_present(prop, obj1, obj2):
w = weights[type1][prop][0] w = weights[type1][prop][0]
comp_funct = weights[type1][prop][1] comp_funct = weights[type1][prop][1]
prop_scores[prop] = {}
if comp_funct == partial_timestamp_based: if comp_funct == partial_timestamp_based:
contributing_score = w * comp_funct(obj1[prop], obj2[prop], weights[type1]["tdelta"]) contributing_score = w * comp_funct(obj1[prop], obj2[prop], weights[type1]["tdelta"])
@ -124,11 +165,18 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
threshold = weights[type1]["threshold"] threshold = weights[type1]["threshold"]
contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold) contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold)
elif comp_funct == reference_check or comp_funct == list_reference_check: elif comp_funct == reference_check or comp_funct == list_reference_check:
max_depth = weights["_internal"]["max_depth"]
if max_depth > 0: if max_depth > 0:
weights["_internal"]["max_depth"] = max_depth - 1 weights["_internal"]["max_depth"] = max_depth - 1
ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"] ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"]
contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights) if _datastore_check(ds1, ds2):
contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
elif comp_funct == reference_check:
comp_funct = exact_match
contributing_score = w * comp_funct(obj1[prop], obj2[prop])
elif comp_funct == list_reference_check:
comp_funct = partial_list_based
contributing_score = w * comp_funct(obj1[prop], obj2[prop])
prop_scores[prop]["check_type"] = comp_funct.__name__
else: else:
continue # prevent excessive recursion continue # prevent excessive recursion
weights["_internal"]["max_depth"] = max_depth weights["_internal"]["max_depth"] = max_depth
@ -138,10 +186,8 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
sum_weights += w sum_weights += w
matching_score += contributing_score matching_score += contributing_score
prop_scores[prop] = { prop_scores[prop]["weight"] = w
"weight": w, prop_scores[prop]["contributing_score"] = contributing_score
"contributing_score": contributing_score,
}
logger.debug("'%s' check -- weight: %s, contributing score: %s", prop, w, contributing_score) logger.debug("'%s' check -- weight: %s, contributing score: %s", prop, w, contributing_score)
prop_scores["matching_score"] = matching_score prop_scores["matching_score"] = matching_score
@ -165,7 +211,7 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
def check_property_present(prop, obj1, obj2): def check_property_present(prop, obj1, obj2):
"""Helper method checks if a property is present on both objects.""" """Helper method checks if a property is present on both objects."""
if prop == "longitude_latitude": if prop == "longitude_latitude":
if all(x in obj1 and x in obj2 for x in ['latitude', 'longitude']): if all(x in obj1 and x in obj2 for x in ('latitude', 'longitude')):
return True return True
elif prop in obj1 and prop in obj2: elif prop in obj1 and prop in obj2:
return True return True
@ -196,7 +242,9 @@ def partial_timestamp_based(t1, t2, tdelta):
def partial_list_based(l1, l2): def partial_list_based(l1, l2):
"""Performs a partial list matching via finding the intersection between common values. """Performs a partial list matching via finding the intersection between
common values. Repeated values are counted only once. This method can be
used for *_refs equality checks when de-reference is not possible.
Args: Args:
l1: A list of values. l1: A list of values.
@ -213,7 +261,8 @@ def partial_list_based(l1, l2):
def exact_match(val1, val2): def exact_match(val1, val2):
"""Performs an exact value match based on two values """Performs an exact value match based on two values. This method can be
used for *_ref equality check when de-reference is not possible.
Args: Args:
val1: A value suitable for an equality test. val1: A value suitable for an equality test.
@ -261,12 +310,12 @@ def custom_pattern_based(pattern1, pattern2):
return equivalent_patterns(pattern1, pattern2) return equivalent_patterns(pattern1, pattern2)
def partial_external_reference_based(refs1, refs2): def partial_external_reference_based(ext_refs1, ext_refs2):
"""Performs a matching on External References. """Performs a matching on External References.
Args: Args:
refs1: A list of external references. ext_refs1: A list of external references.
refs2: A list of external references. ext_refs2: A list of external references.
Returns: Returns:
float: Number between 0.0 and 1.0 depending on matches. float: Number between 0.0 and 1.0 depending on matches.
@ -275,51 +324,47 @@ def partial_external_reference_based(refs1, refs2):
allowed = {"veris", "cve", "capec", "mitre-attack"} allowed = {"veris", "cve", "capec", "mitre-attack"}
matches = 0 matches = 0
if len(refs1) >= len(refs2): ref_pairs = itertools.chain(
l1 = refs1 itertools.product(ext_refs1, ext_refs2),
l2 = refs2 )
else:
l1 = refs2
l2 = refs1
for ext_ref1 in l1: for ext_ref1, ext_ref2 in ref_pairs:
for ext_ref2 in l2: sn_match = False
sn_match = False ei_match = False
ei_match = False url_match = False
url_match = False source_name = None
source_name = None
if check_property_present("source_name", ext_ref1, ext_ref2): if check_property_present("source_name", ext_ref1, ext_ref2):
if ext_ref1["source_name"] == ext_ref2["source_name"]: if ext_ref1["source_name"] == ext_ref2["source_name"]:
source_name = ext_ref1["source_name"] source_name = ext_ref1["source_name"]
sn_match = True sn_match = True
if check_property_present("external_id", ext_ref1, ext_ref2): if check_property_present("external_id", ext_ref1, ext_ref2):
if ext_ref1["external_id"] == ext_ref2["external_id"]: if ext_ref1["external_id"] == ext_ref2["external_id"]:
ei_match = True ei_match = True
if check_property_present("url", ext_ref1, ext_ref2): if check_property_present("url", ext_ref1, ext_ref2):
if ext_ref1["url"] == ext_ref2["url"]: if ext_ref1["url"] == ext_ref2["url"]:
url_match = True url_match = True
# Special case: if source_name is a STIX defined name and either # Special case: if source_name is a STIX defined name and either
# external_id or url match then its a perfect match and other entries # external_id or url match then its a perfect match and other entries
# can be ignored. # can be ignored.
if sn_match and (ei_match or url_match) and source_name in allowed: if sn_match and (ei_match or url_match) and source_name in allowed:
result = 1.0 result = 1.0
logger.debug( logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'", "--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
refs1, refs2, result, ext_refs1, ext_refs2, result,
) )
return result return result
# Regular check. If the source_name (not STIX-defined) or external_id or # Regular check. If the source_name (not STIX-defined) or external_id or
# url matches then we consider the entry a match. # url matches then we consider the entry a match.
if (sn_match or ei_match or url_match) and source_name not in allowed: if (sn_match or ei_match or url_match) and source_name not in allowed:
matches += 1 matches += 1
result = matches / max(len(refs1), len(refs2)) result = matches / max(len(ext_refs1), len(ext_refs2))
logger.debug( logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'", "--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
refs1, refs2, result, ext_refs1, ext_refs2, result,
) )
return result return result
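The external-reference scorer now enumerates every cross pair with itertools.product instead of nesting loops over whichever list is longer, and its parameters were renamed to ext_refs1/ext_refs2. A small illustrative call; the reference entries are made up:

from stix2.equivalence.object import partial_external_reference_based

refs1 = [
    {"source_name": "mitre-attack", "external_id": "T1078"},
    {"source_name": "acme-blog", "url": "https://blog.example.com/post/1"},
]
refs2 = [
    {"source_name": "mitre-attack", "external_id": "T1078"},
]

# A STIX-defined source_name plus a matching external_id short-circuits to 1.0.
print(partial_external_reference_based(refs1, refs2))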
@ -352,17 +397,23 @@ def _versioned_checks(ref1, ref2, ds1, ds2, **weights):
"""Checks multiple object versions if present in graph. """Checks multiple object versions if present in graph.
Maximizes for the similarity score of a particular version.""" Maximizes for the similarity score of a particular version."""
results = {} results = {}
objects1 = ds1.query([Filter("id", "=", ref1)])
objects2 = ds2.query([Filter("id", "=", ref2)])
pairs = _object_pairs( pairs = _object_pairs(
_bucket_per_type(objects1), _bucket_per_type(ds1.query([Filter("id", "=", ref1)])),
_bucket_per_type(objects2), _bucket_per_type(ds2.query([Filter("id", "=", ref2)])),
weights, weights,
) )
ignore_spec_version = weights["_internal"]["ignore_spec_version"]
versioning_checks = weights["_internal"]["versioning_checks"]
max_depth = weights["_internal"]["max_depth"]
for object1, object2 in pairs: for object1, object2 in pairs:
result = object_similarity(object1, object2, **weights) result = object_similarity(
object1, object2, ds1=ds1, ds2=ds2,
ignore_spec_version=ignore_spec_version,
versioning_checks=versioning_checks,
max_depth=max_depth, **weights,
)
if ref1 not in results: if ref1 not in results:
results[ref1] = {"matched": ref2, "value": result} results[ref1] = {"matched": ref2, "value": result}
elif result > results[ref1]["value"]: elif result > results[ref1]["value"]:
@ -383,12 +434,20 @@ def reference_check(ref1, ref2, ds1, ds2, **weights):
result = 0.0 result = 0.0
if type1 == type2 and type1 in weights: if type1 == type2 and type1 in weights:
if weights["_internal"]["versioning_checks"]: ignore_spec_version = weights["_internal"]["ignore_spec_version"]
versioning_checks = weights["_internal"]["versioning_checks"]
max_depth = weights["_internal"]["max_depth"]
if versioning_checks:
result = _versioned_checks(ref1, ref2, ds1, ds2, **weights) / 100.0 result = _versioned_checks(ref1, ref2, ds1, ds2, **weights) / 100.0
else: else:
o1, o2 = ds1.get(ref1), ds2.get(ref2) o1, o2 = ds1.get(ref1), ds2.get(ref2)
if o1 and o2: if o1 and o2:
result = object_similarity(o1, o2, **weights) / 100.0 result = object_similarity(
o1, o2, ds1=ds1, ds2=ds2,
ignore_spec_version=ignore_spec_version,
versioning_checks=versioning_checks,
max_depth=max_depth, **weights,
) / 100.0
logger.debug( logger.debug(
"--\t\treference_check '%s' '%s'\tresult: '%s'", "--\t\treference_check '%s' '%s'\tresult: '%s'",
@ -439,6 +498,15 @@ def list_reference_check(refs1, refs2, ds1, ds2, **weights):
return result return result
def _datastore_check(ds1, ds2):
if (
issubclass(ds1.__class__, (DataStoreMixin, DataSource)) or
issubclass(ds2.__class__, (DataStoreMixin, DataSource))
):
return True
return False
def _bucket_per_type(graph, mode="type"): def _bucket_per_type(graph, mode="type"):
"""Given a list of objects or references, bucket them by type. """Given a list of objects or references, bucket them by type.
Depending on the list type: extract from 'type' property or using Depending on the list type: extract from 'type' property or using
@ -480,11 +548,20 @@ WEIGHTS = {
"name": (60, partial_string_based), "name": (60, partial_string_based),
"external_references": (40, partial_external_reference_based), "external_references": (40, partial_external_reference_based),
}, },
"grouping": {
"name": (20, partial_string_based),
"context": (20, partial_string_based),
"object_refs": (60, list_reference_check),
},
"identity": { "identity": {
"name": (60, partial_string_based), "name": (60, partial_string_based),
"identity_class": (20, exact_match), "identity_class": (20, exact_match),
"sectors": (20, partial_list_based), "sectors": (20, partial_list_based),
}, },
"incident": {
"name": (60, partial_string_based),
"external_references": (40, partial_external_reference_based),
},
"indicator": { "indicator": {
"indicator_types": (15, partial_list_based), "indicator_types": (15, partial_list_based),
"pattern": (80, custom_pattern_based), "pattern": (80, custom_pattern_based),
@ -511,6 +588,25 @@ WEIGHTS = {
"definition": (60, exact_match), "definition": (60, exact_match),
"definition_type": (20, exact_match), "definition_type": (20, exact_match),
}, },
"relationship": {
"relationship_type": (20, exact_match),
"source_ref": (40, reference_check),
"target_ref": (40, reference_check),
},
"report": {
"name": (30, partial_string_based),
"published": (10, partial_timestamp_based),
"object_refs": (60, list_reference_check),
"tdelta": 1, # One day interval
},
"sighting": {
"first_seen": (5, partial_timestamp_based),
"last_seen": (5, partial_timestamp_based),
"sighting_of_ref": (40, reference_check),
"observed_data_refs": (20, list_reference_check),
"where_sighted_refs": (20, list_reference_check),
"summary": (10, exact_match),
},
"threat-actor": { "threat-actor": {
"name": (60, partial_string_based), "name": (60, partial_string_based),
"threat_actor_types": (20, partial_list_based), "threat_actor_types": (20, partial_list_based),
@ -524,7 +620,4 @@ WEIGHTS = {
"name": (30, partial_string_based), "name": (30, partial_string_based),
"external_references": (70, partial_external_reference_based), "external_references": (70, partial_external_reference_based),
}, },
"_internal": {
"ignore_spec_version": False,
},
} # :autodoc-skip: } # :autodoc-skip:


@ -424,7 +424,7 @@ def test_related_to_by_target(ds):
def test_versioned_checks(ds, ds2): def test_versioned_checks(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy() weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({ weights.update({
"_internal": { "_internal": {
"ignore_spec_version": True, "ignore_spec_version": True,
@ -437,7 +437,7 @@ def test_versioned_checks(ds, ds2):
def test_semantic_check_with_versioning(ds, ds2): def test_semantic_check_with_versioning(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy() weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({ weights.update({
"_internal": { "_internal": {
"ignore_spec_version": False, "ignore_spec_version": False,
@ -467,13 +467,11 @@ def test_semantic_check_with_versioning(ds, ds2):
def test_list_semantic_check(ds, ds2): def test_list_semantic_check(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy() weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({ weights.update({
"_internal": { "_internal": {
"ignore_spec_version": False, "ignore_spec_version": False,
"versioning_checks": False, "versioning_checks": False,
"ds1": ds,
"ds2": ds2,
"max_depth": 1, "max_depth": 1,
}, },
}) })
@ -504,39 +502,18 @@ def test_list_semantic_check(ds, ds2):
def test_graph_similarity_raises_value_error(ds): def test_graph_similarity_raises_value_error(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": -1,
},
}
with pytest.raises(ValueError): with pytest.raises(ValueError):
prop_scores1 = {} prop_scores1 = {}
stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights) stix2.Environment().graph_similarity(ds, ds2, prop_scores1, max_depth=-1)
def test_graph_similarity_with_filesystem_source(ds, fs): def test_graph_similarity_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {} prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights) env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, ignore_spec_version=True)
# Switching parameters # Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {} prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights) env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, ignore_spec_version=True)
assert round(env1) == 25 assert round(env1) == 25
assert round(prop_scores1["matching_score"]) == 451 assert round(prop_scores1["matching_score"]) == 451
@ -552,41 +529,20 @@ def test_graph_similarity_with_filesystem_source(ds, fs):
def test_graph_similarity_with_duplicate_graph(ds): def test_graph_similarity_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {} prop_scores = {}
env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights) env = stix2.Environment().graph_similarity(ds, ds, prop_scores)
assert round(env) == 100 assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800 assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8 assert round(prop_scores["len_pairs"]) == 8
def test_graph_similarity_with_versioning_check_on(ds2, ds): def test_graph_similarity_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores1 = {} prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights) env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, versioning_checks=True)
# Switching parameters # Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores2 = {} prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights) env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, versioning_checks=True)
assert round(env1) == 88 assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789 assert round(prop_scores1["matching_score"]) == 789
@ -602,26 +558,12 @@ def test_graph_similarity_with_versioning_check_on(ds2, ds):
def test_graph_similarity_with_versioning_check_off(ds2, ds): def test_graph_similarity_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {} prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights) env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1)
# Switching parameters # Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {} prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights) env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2)
assert round(env1) == 88 assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789 assert round(prop_scores1["matching_score"]) == 789
@ -637,26 +579,12 @@ def test_graph_similarity_with_versioning_check_off(ds2, ds):
def test_graph_equivalence_with_filesystem_source(ds, fs): def test_graph_equivalence_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {} prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights) env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, ignore_spec_version=True)
# Switching parameters # Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {} prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights) env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, ignore_spec_version=True)
assert env1 is False assert env1 is False
assert round(prop_scores1["matching_score"]) == 451 assert round(prop_scores1["matching_score"]) == 451
@ -672,41 +600,20 @@ def test_graph_equivalence_with_filesystem_source(ds, fs):
def test_graph_equivalence_with_duplicate_graph(ds): def test_graph_equivalence_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {} prop_scores = {}
env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights) env = stix2.Environment().graph_equivalence(ds, ds, prop_scores)
assert env is True assert env is True
assert round(prop_scores["matching_score"]) == 800 assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8 assert round(prop_scores["len_pairs"]) == 8
def test_graph_equivalence_with_versioning_check_on(ds2, ds): def test_graph_equivalence_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores1 = {} prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights) env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, versioning_checks=True)
# Switching parameters # Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores2 = {} prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights) env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, versioning_checks=True)
assert env1 is True assert env1 is True
assert round(prop_scores1["matching_score"]) == 789 assert round(prop_scores1["matching_score"]) == 789
@ -722,26 +629,12 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds):
def test_graph_equivalence_with_versioning_check_off(ds2, ds): def test_graph_equivalence_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {} prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights) env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1)
# Switching parameters # Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {} prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights) env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2)
assert env1 is True assert env1 is True
assert round(prop_scores1["matching_score"]) == 789 assert round(prop_scores1["matching_score"]) == 789


@ -760,16 +760,13 @@ def test_object_similarity_different_spec_version():
"valid_from": (5, stix2.equivalence.object.partial_timestamp_based), "valid_from": (5, stix2.equivalence.object.partial_timestamp_based),
"tdelta": 1, # One day interval "tdelta": 1, # One day interval
}, },
"_internal": {
"ignore_spec_version": True, # Disables spec_version check.
},
} }
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS) ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS)
env = stix2.Environment().object_similarity(ind1, ind2, **weights) env = stix2.Environment().object_similarity(ind1, ind2, ignore_spec_version=True, **weights)
assert round(env) == 0 assert round(env) == 0
env = stix2.Environment().object_similarity(ind2, ind1, **weights) env = stix2.Environment().object_similarity(ind2, ind1, ignore_spec_version=True, **weights)
assert round(env) == 0 assert round(env) == 0
@ -858,10 +855,12 @@ def test_object_similarity_exact_match():
assert stix2.equivalence.object.exact_match(t1, t2) == 0.0 assert stix2.equivalence.object.exact_match(t1, t2) == 0.0
def test_non_existent_config_for_object(): def test_no_datastore_fallsback_list_based_check_for_refs_check():
r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS) r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS) r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
assert stix2.Environment().object_similarity(r1, r2) == 0.0 prop_scores = {}
assert stix2.Environment().object_similarity(r1, r2, prop_scores) == 100.0
assert prop_scores["object_refs"]["check_type"] == "partial_list_based"
def custom_semantic_equivalence_method(obj1, obj2, **weights): def custom_semantic_equivalence_method(obj1, obj2, **weights):
@ -937,7 +936,8 @@ def test_object_similarity_prop_scores_method_provided():
def test_versioned_checks(ds, ds2): def test_versioned_checks(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy() # Testing internal method
weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({ weights.update({
"_internal": { "_internal": {
"ignore_spec_version": True, "ignore_spec_version": True,
@ -950,7 +950,7 @@ def test_versioned_checks(ds, ds2):
def test_semantic_check_with_versioning(ds, ds2): def test_semantic_check_with_versioning(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy() weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({ weights.update({
"_internal": { "_internal": {
"ignore_spec_version": False, "ignore_spec_version": False,
@ -981,7 +981,7 @@ def test_semantic_check_with_versioning(ds, ds2):
def test_list_semantic_check(ds, ds2): def test_list_semantic_check(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy() weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({ weights.update({
"_internal": { "_internal": {
"ignore_spec_version": False, "ignore_spec_version": False,
@ -1027,39 +1027,28 @@ def test_list_semantic_check(ds, ds2):
def test_graph_similarity_raises_value_error(ds): def test_graph_similarity_raises_value_error(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": -1,
},
}
with pytest.raises(ValueError): with pytest.raises(ValueError):
prop_scores1 = {} prop_scores1 = {}
stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights) stix2.Environment().graph_similarity(ds, ds2, prop_scores1, max_depth=-1)
def test_graph_similarity_with_filesystem_source(ds, fs): def test_graph_similarity_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {} prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights) env1 = stix2.Environment().graph_similarity(
fs, ds, prop_scores1,
ignore_spec_version=True,
versioning_checks=False,
max_depth=1,
)
# Switching parameters # Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {} prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights) env2 = stix2.Environment().graph_similarity(
ds, fs, prop_scores2,
ignore_spec_version=True,
versioning_checks=False,
max_depth=1,
)
assert round(env1) == 23 assert round(env1) == 23
assert round(prop_scores1["matching_score"]) == 411 assert round(prop_scores1["matching_score"]) == 411
@ -1154,14 +1143,11 @@ def test_depth_limiting():
"some2_ref": (33, stix2.equivalence.object.reference_check), "some2_ref": (33, stix2.equivalence.object.reference_check),
"name": (34, stix2.equivalence.object.partial_string_based), "name": (34, stix2.equivalence.object.partial_string_based),
}, },
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
} }
prop_scores1 = {} prop_scores1 = {}
env1 = stix2.equivalence.graph.graph_similarity(mem_store1, mem_store2, prop_scores1, **custom_weights) env1 = stix2.equivalence.graph.graph_similarity(
mem_store1, mem_store2, prop_scores1, **custom_weights
)
assert round(env1) == 38 assert round(env1) == 38
assert round(prop_scores1["matching_score"]) == 300 assert round(prop_scores1["matching_score"]) == 300
@ -1185,44 +1171,23 @@ def test_depth_limiting():
def test_graph_similarity_with_duplicate_graph(ds): def test_graph_similarity_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {} prop_scores = {}
env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights) env = stix2.Environment().graph_similarity(ds, ds, prop_scores)
assert round(env) == 100 assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800 assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8 assert round(prop_scores["len_pairs"]) == 8
def test_graph_similarity_with_versioning_check_on(ds2, ds): def test_graph_similarity_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores1 = {} prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights) env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, versioning_checks=True)
assert round(env1) == 88 assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789 assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9 assert round(prop_scores1["len_pairs"]) == 9
# Switching parameters # Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {} prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights) env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, versioning_checks=True)
assert round(env2) == 88 assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789 assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9 assert round(prop_scores2["len_pairs"]) == 9
@ -1233,29 +1198,15 @@ def test_graph_similarity_with_versioning_check_on(ds2, ds):
def test_graph_similarity_with_versioning_check_off(ds2, ds): def test_graph_similarity_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {} prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights) env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1)
assert round(env1) == 88 assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789 assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9 assert round(prop_scores1["len_pairs"]) == 9
# Switching parameters # Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {} prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights) env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2)
assert round(env2) == 88 assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789 assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9 assert round(prop_scores2["len_pairs"]) == 9
@ -1266,26 +1217,12 @@ def test_graph_similarity_with_versioning_check_off(ds2, ds):
def test_graph_equivalence_with_filesystem_source(ds, fs): def test_graph_equivalence_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {} prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights) env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, ignore_spec_version=True)
# Switching parameters # Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {} prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights) env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, ignore_spec_version=True)
assert env1 is False assert env1 is False
assert round(prop_scores1["matching_score"]) == 411 assert round(prop_scores1["matching_score"]) == 411
@ -1301,41 +1238,20 @@ def test_graph_equivalence_with_filesystem_source(ds, fs):
def test_graph_equivalence_with_duplicate_graph(ds): def test_graph_equivalence_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {} prop_scores = {}
env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights) env = stix2.Environment().graph_equivalence(ds, ds, prop_scores)
assert env is True assert env is True
assert round(prop_scores["matching_score"]) == 800 assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8 assert round(prop_scores["len_pairs"]) == 8
def test_graph_equivalence_with_versioning_check_on(ds2, ds): def test_graph_equivalence_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores1 = {} prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights) env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, versioning_checks=True)
# Switching parameters # Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores2 = {} prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights) env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, versioning_checks=True)
assert env1 is True assert env1 is True
assert round(prop_scores1["matching_score"]) == 789 assert round(prop_scores1["matching_score"]) == 789
@ -1351,26 +1267,12 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds):
def test_graph_equivalence_with_versioning_check_off(ds2, ds): def test_graph_equivalence_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {} prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights) env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1)
# Switching parameters # Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {} prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights) env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2)
assert env1 is True assert env1 is True
assert round(prop_scores1["matching_score"]) == 789 assert round(prop_scores1["matching_score"]) == 789
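The tests above also exercise the new validation path: a non-positive max_depth now raises ValueError from the keyword argument alone, with no _internal dictionary required. A hedged pytest-style sketch; the test name and empty stores are hypothetical:

import pytest
from stix2 import Environment, MemoryStore

def test_negative_max_depth_rejected():
    # Store contents do not matter; the check runs before any objects are queried.
    with pytest.raises(ValueError):
        Environment().graph_similarity(MemoryStore(), MemoryStore(), {}, max_depth=-1)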