expose configuration options, combine weight dictionary, update tests

pull/1/head
Emmanuelle Vargas-Gonzalez 2021-03-01 12:27:52 -05:00
parent f9a52eeed3
commit ff5014c606
6 changed files with 284 additions and 378 deletions

View File

@@ -66,16 +66,9 @@ object_default_sem_eq_weights = json.dumps(WEIGHTS, indent=4, default=lambda o:
 object_default_sem_eq_weights = object_default_sem_eq_weights.replace('\n', '\n ')
 object_default_sem_eq_weights = object_default_sem_eq_weights.replace(' "', ' ')
 object_default_sem_eq_weights = object_default_sem_eq_weights.replace('"\n', '\n')
-with open('object_default_sem_eq_weights.rst', 'w') as f:
+with open('similarity_weights.rst', 'w') as f:
     f.write(".. code-block:: python\n\n {}\n\n".format(object_default_sem_eq_weights))
-graph_default_sem_eq_weights = json.dumps(GRAPH_WEIGHTS, indent=4, default=lambda o: o.__name__)
-graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('\n', '\n ')
-graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace(' "', ' ')
-graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('"\n', '\n')
-with open('graph_default_sem_eq_weights.rst', 'w') as f:
-    f.write(".. code-block:: python\n\n {}\n\n".format(graph_default_sem_eq_weights))
 def get_property_type(prop):
     """Convert property classname into pretty string name of property.

View File

@@ -189,8 +189,11 @@ class Environment(DataStoreMixin):
         return None
     @staticmethod
-    def object_similarity(obj1, obj2, prop_scores={}, ignore_spec_version=False,
-                          versioning_checks=False, max_depth=1, **weight_dict):
+    def object_similarity(
+        obj1, obj2, prop_scores={}, ds1=None, ds2=None,
+        ignore_spec_version=False, versioning_checks=False,
+        max_depth=1, **weight_dict
+    ):
         """This method returns a measure of how similar the two objects are.
         Args:
@@ -198,8 +201,19 @@ class Environment(DataStoreMixin):
             obj2: A stix2 object instance
             prop_scores: A dictionary that can hold individual property scores,
                 weights, contributing score, matching score and sum of weights.
-            weight_dict: A dictionary that can be used to override settings
-                in the similarity process
+            ds1: A DataStore object instance representing your graph
+            ds2: A DataStore object instance representing your graph
+            ignore_spec_version: A boolean indicating whether to test object types
+                that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+                If set to True this check will be skipped.
+            versioning_checks: A boolean indicating whether to test multiple revisions
+                of the same object (when present) to maximize similarity against a
+                particular version. If set to True the algorithm will perform this step.
+            max_depth: A positive integer indicating the maximum recursion depth the
+                algorithm can reach when de-referencing objects and performing the
+                object_similarity algorithm.
+            weight_dict: A dictionary that can be used to override what checks are done
+                to objects in the similarity process.
         Returns:
             float: A number between 0.0 and 100.0 as a measurement of similarity.
@@ -221,12 +235,17 @@ class Environment(DataStoreMixin):
             see `the Committee Note <link here>`__.
         """
-        return object_similarity(obj1, obj2, prop_scores, ignore_spec_version,
-                                 versioning_checks, max_depth, **weight_dict)
+        return object_similarity(
+            obj1, obj2, prop_scores, ds1, ds2, ignore_spec_version,
+            versioning_checks, max_depth, **weight_dict
+        )
     @staticmethod
-    def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, ignore_spec_version=False,
-                           versioning_checks=False, max_depth=1, **weight_dict):
+    def object_equivalence(
+        obj1, obj2, prop_scores={}, threshold=70, ds1=None, ds2=None,
+        ignore_spec_version=False, versioning_checks=False,
+        max_depth=1, **weight_dict
+    ):
         """This method returns a true/false value if two objects are semantically equivalent.
         Internally, it calls the object_similarity function and compares it against the given
         threshold value.
@@ -239,8 +258,19 @@ class Environment(DataStoreMixin):
             threshold: A numerical value between 0 and 100 to determine the minimum
                 score to result in successfully calling both objects equivalent. This
                 value can be tuned.
-            weight_dict: A dictionary that can be used to override settings
-                in the similarity process
+            ds1: A DataStore object instance representing your graph
+            ds2: A DataStore object instance representing your graph
+            ignore_spec_version: A boolean indicating whether to test object types
+                that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+                If set to True this check will be skipped.
+            versioning_checks: A boolean indicating whether to test multiple revisions
+                of the same object (when present) to maximize similarity against a
+                particular version. If set to True the algorithm will perform this step.
+            max_depth: A positive integer indicating the maximum recursion depth the
+                algorithm can reach when de-referencing objects and performing the
+                object_similarity algorithm.
+            weight_dict: A dictionary that can be used to override what checks are done
+                to objects in the similarity process.
         Returns:
             bool: True if the result of the object similarity is greater than or equal to
@@ -263,11 +293,16 @@ class Environment(DataStoreMixin):
             see `the Committee Note <link here>`__.
         """
-        return object_equivalence(obj1, obj2, prop_scores, threshold, **weight_dict)
+        return object_equivalence(
+            obj1, obj2, prop_scores, threshold, ds1, ds2,
+            ignore_spec_version, versioning_checks, max_depth, **weight_dict
+        )
     @staticmethod
-    def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False,
-                         versioning_checks=False, max_depth=1, **weight_dict):
+    def graph_similarity(
+        ds1, ds2, prop_scores={}, ignore_spec_version=False,
+        versioning_checks=False, max_depth=1, **weight_dict
+    ):
         """This method returns a similarity score for two given graphs.
         Each DataStore can contain a connected or disconnected graph and the
         final result is weighted over the amount of objects we managed to compare.
@@ -279,8 +314,17 @@ class Environment(DataStoreMixin):
             ds2: A DataStore object instance representing your graph
            prop_scores: A dictionary that can hold individual property scores,
                 weights, contributing score, matching score and sum of weights.
-            weight_dict: A dictionary that can be used to override settings
-                in the similarity process
+            ignore_spec_version: A boolean indicating whether to test object types
+                that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+                If set to True this check will be skipped.
+            versioning_checks: A boolean indicating whether to test multiple revisions
+                of the same object (when present) to maximize similarity against a
+                particular version. If set to True the algorithm will perform this step.
+            max_depth: A positive integer indicating the maximum recursion depth the
+                algorithm can reach when de-referencing objects and performing the
+                object_similarity algorithm.
+            weight_dict: A dictionary that can be used to override what checks are done
+                to objects in the similarity process.
         Returns:
             float: A number between 0.0 and 100.0 as a measurement of similarity.
@@ -295,19 +339,24 @@ class Environment(DataStoreMixin):
         Note:
             Default weight_dict:
-            .. include:: ../graph_default_sem_eq_weights.rst
+            .. include:: ../similarity_weights.rst
         Note:
             This implementation follows the Semantic Equivalence Committee Note.
             see `the Committee Note <link here>`__.
         """
-        return graph_similarity(ds1, ds2, prop_scores, ignore_spec_version,
-                                versioning_checks, max_depth, **weight_dict)
+        return graph_similarity(
+            ds1, ds2, prop_scores, ignore_spec_version,
+            versioning_checks, max_depth, **weight_dict
+        )
     @staticmethod
-    def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, ignore_spec_version=False,
-                          versioning_checks=False, max_depth=1, **weight_dict):
+    def graph_equivalence(
+        ds1, ds2, prop_scores={}, threshold=70,
+        ignore_spec_version=False, versioning_checks=False,
+        max_depth=1, **weight_dict
+    ):
         """This method returns a true/false value if two graphs are semantically equivalent.
         Internally, it calls the graph_similarity function and compares it against the given
         threshold value.
@@ -320,8 +369,17 @@ class Environment(DataStoreMixin):
             threshold: A numerical value between 0 and 100 to determine the minimum
                 score to result in successfully calling both graphs equivalent. This
                 value can be tuned.
-            weight_dict: A dictionary that can be used to override settings
-                in the similarity process
+            ignore_spec_version: A boolean indicating whether to test object types
+                that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+                If set to True this check will be skipped.
+            versioning_checks: A boolean indicating whether to test multiple revisions
+                of the same object (when present) to maximize similarity against a
+                particular version. If set to True the algorithm will perform this step.
+            max_depth: A positive integer indicating the maximum recursion depth the
+                algorithm can reach when de-referencing objects and performing the
+                object_similarity algorithm.
+            weight_dict: A dictionary that can be used to override what checks are done
+                to objects in the similarity process.
         Returns:
             bool: True if the result of the graph similarity is greater than or equal to
@@ -337,11 +395,14 @@ class Environment(DataStoreMixin):
         Note:
             Default weight_dict:
-            .. include:: ../graph_default_sem_eq_weights.rst
+            .. include:: ../similarity_weights.rst
         Note:
             This implementation follows the Semantic Equivalence Committee Note.
             see `the Committee Note <link here>`__.
         """
-        return graph_equivalence(ds1, ds2, prop_scores, threshold, **weight_dict)
+        return graph_equivalence(
+            ds1, ds2, prop_scores, threshold, ignore_spec_version,
+            versioning_checks, max_depth, **weight_dict
+        )
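
For reviewers, a minimal sketch of the updated Environment call site with the newly exposed keyword arguments (the two Indicator objects below are made up; the keyword names come from the signatures in this file):

    import stix2
    from stix2 import Environment

    # Hypothetical objects to compare; ignore_spec_version lets a 2.0 and a
    # 2.1 object of the same type be scored against each other.
    ind21 = stix2.v21.Indicator(
        name="Malicious URL",
        pattern="[url:value = 'http://example.com/malicious']",
        pattern_type="stix",
        valid_from="2021-03-01T00:00:00Z",
    )
    ind20 = stix2.v20.Indicator(
        name="Malicious URL",
        labels=["malicious-activity"],
        pattern="[url:value = 'http://example.com/malicious']",
        valid_from="2021-03-01T00:00:00Z",
    )

    prop_scores = {}
    score = Environment().object_similarity(
        ind21, ind20, prop_scores,
        ignore_spec_version=True,   # skip the spec-version check
        versioning_checks=False,
        max_depth=1,
    )
    print(score, prop_scores)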

View File

@@ -10,7 +10,11 @@ from ..object import (
 logger = logging.getLogger(__name__)
-def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
+def graph_equivalence(
+    ds1, ds2, prop_scores={}, threshold=70,
+    ignore_spec_version=False, versioning_checks=False,
+    max_depth=1, **weight_dict
+):
     """This method returns a true/false value if two graphs are semantically equivalent.
     Internally, it calls the graph_similarity function and compares it against the given
     threshold value.
@@ -23,8 +27,17 @@ def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
         threshold: A numerical value between 0 and 100 to determine the minimum
             score to result in successfully calling both graphs equivalent. This
             value can be tuned.
-        weight_dict: A dictionary that can be used to override settings
-            in the similarity process
+        ignore_spec_version: A boolean indicating whether to test object types
+            that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+            If set to True this check will be skipped.
+        versioning_checks: A boolean indicating whether to test multiple revisions
+            of the same object (when present) to maximize similarity against a
+            particular version. If set to True the algorithm will perform this step.
+        max_depth: A positive integer indicating the maximum recursion depth the
+            algorithm can reach when de-referencing objects and performing the
+            object_similarity algorithm.
+        weight_dict: A dictionary that can be used to override what checks are done
+            to objects in the similarity process.
     Returns:
         bool: True if the result of the graph similarity is greater than or equal to
@@ -40,21 +53,26 @@ def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
     Note:
         Default weight_dict:
-        .. include:: ../../graph_default_sem_eq_weights.rst
+        .. include:: ../../similarity_weights.rst
     Note:
         This implementation follows the Semantic Equivalence Committee Note.
         see `the Committee Note <link here>`__.
     """
-    similarity_result = graph_similarity(ds1, ds2, prop_scores, **weight_dict)
+    similarity_result = graph_similarity(
+        ds1, ds2, prop_scores, ignore_spec_version,
+        versioning_checks, max_depth, **weight_dict
+    )
     if similarity_result >= threshold:
         return True
     return False
-def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False,
-                     versioning_checks=False, max_depth=1, **weight_dict):
+def graph_similarity(
+    ds1, ds2, prop_scores={}, ignore_spec_version=False,
+    versioning_checks=False, max_depth=1, **weight_dict
+):
     """This method returns a similarity score for two given graphs.
     Each DataStore can contain a connected or disconnected graph and the
     final result is weighted over the amount of objects we managed to compare.
@@ -66,11 +84,17 @@ def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False,
         ds2: A DataStore object instance representing your graph
         prop_scores: A dictionary that can hold individual property scores,
             weights, contributing score, matching score and sum of weights.
-        ignore_spec_version: As
-        versioning_checks: As
-        max_depth: As
-        weight_dict: A dictionary that can be used to override settings
-            in the similarity process
+        ignore_spec_version: A boolean indicating whether to test object types
+            that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+            If set to True this check will be skipped.
+        versioning_checks: A boolean indicating whether to test multiple revisions
+            of the same object (when present) to maximize similarity against a
+            particular version. If set to True the algorithm will perform this step.
+        max_depth: A positive integer indicating the maximum recursion depth the
+            algorithm can reach when de-referencing objects and performing the
+            object_similarity algorithm.
+        weight_dict: A dictionary that can be used to override what checks are done
+            to objects in the similarity process.
     Returns:
         float: A number between 0.0 and 100.0 as a measurement of similarity.
@@ -85,7 +109,7 @@ def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False,
     Note:
         Default weight_dict:
-        .. include:: ../../graph_default_sem_eq_weights.rst
+        .. include:: ../../similarity_weights.rst
     Note:
         This implementation follows the Semantic Equivalence Committee Note.
@@ -107,7 +131,7 @@ def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False,
         "max_depth": max_depth,
     }
-    if weights["_internal"]["max_depth"] <= 0:
+    if max_depth <= 0:
         raise ValueError("'max_depth' must be greater than 0")
     pairs = _object_pairs(
@@ -122,9 +146,11 @@ def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False,
         object1_id = object1["id"]
         object2_id = object2["id"]
-        result = object_similarity(object1, object2, iprop_score, ds1, ds2,
-                                   ignore_spec_version, versioning_checks,
-                                   max_depth, **weights)
+        result = object_similarity(
+            object1, object2, iprop_score, ds1, ds2,
+            ignore_spec_version, versioning_checks,
+            max_depth, **weights
+        )
         if object1_id not in results:
             results[object1_id] = {"lhs": object1_id, "rhs": object2_id, "prop_score": iprop_score, "value": result}
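
The same options flow through the graph-level API. A small self-contained sketch, assuming two throwaway MemoryStore instances stand in for real DataStores:

    import stix2
    from stix2 import Environment, MemoryStore

    # Two hypothetical "graphs", each backed by an in-memory DataStore.
    malware = stix2.v21.Malware(name="Cryptolocker", is_family=False)
    identity = stix2.v21.Identity(name="ACME Corp", identity_class="organization")
    store1 = MemoryStore([malware, identity])
    store2 = MemoryStore([malware, identity])

    prop_scores = {}
    score = Environment().graph_similarity(
        store1, store2, prop_scores,
        ignore_spec_version=False,
        versioning_checks=False,
        max_depth=1,
    )
    equivalent = Environment().graph_equivalence(store1, store2, threshold=70)
    print(score, equivalent, prop_scores["len_pairs"])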

View File

@@ -4,14 +4,18 @@ import itertools
 import logging
 import time
-from ...datastore import Filter, DataStoreMixin, DataSink, DataSource
+from ...datastore import DataSink, DataSource, DataStoreMixin, Filter
 from ...utils import STIXdatetime, parse_into_datetime
 from ..pattern import equivalent_patterns
 logger = logging.getLogger(__name__)
-def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
+def object_equivalence(
+    obj1, obj2, prop_scores={}, threshold=70, ds1=None,
+    ds2=None, ignore_spec_version=False,
+    versioning_checks=False, max_depth=1, **weight_dict
+):
     """This method returns a true/false value if two objects are semantically equivalent.
     Internally, it calls the object_similarity function and compares it against the given
     threshold value.
@@ -24,8 +28,19 @@ def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
         threshold: A numerical value between 0 and 100 to determine the minimum
             score to result in successfully calling both objects equivalent. This
             value can be tuned.
-        weight_dict: A dictionary that can be used to override settings
-            in the similarity process
+        ds1: A DataStore object instance representing your graph
+        ds2: A DataStore object instance representing your graph
+        ignore_spec_version: A boolean indicating whether to test object types
+            that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+            If set to True this check will be skipped.
+        versioning_checks: A boolean indicating whether to test multiple revisions
+            of the same object (when present) to maximize similarity against a
+            particular version. If set to True the algorithm will perform this step.
+        max_depth: A positive integer indicating the maximum recursion depth the
+            algorithm can reach when de-referencing objects and performing the
+            object_similarity algorithm.
+        weight_dict: A dictionary that can be used to override what checks are done
+            to objects in the similarity process.
     Returns:
         bool: True if the result of the object similarity is greater than or equal to
@@ -41,22 +56,27 @@ def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
     Note:
         Default weight_dict:
-        .. include:: ../../object_default_sem_eq_weights.rst
+        .. include:: ../../similarity_weights.rst
     Note:
         This implementation follows the Semantic Equivalence Committee Note.
         see `the Committee Note <link here>`__.
     """
-    similarity_result = object_similarity(obj1, obj2, prop_scores, **weight_dict)
+    similarity_result = object_similarity(
+        obj1, obj2, prop_scores, ds1, ds2, ignore_spec_version,
+        versioning_checks, max_depth, **weight_dict
+    )
     if similarity_result >= threshold:
         return True
     return False
-def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None,
-                      ignore_spec_version=False, versioning_checks=False,
-                      max_depth=1, **weight_dict):
+def object_similarity(
+    obj1, obj2, prop_scores={}, ds1=None, ds2=None,
+    ignore_spec_version=False, versioning_checks=False,
+    max_depth=1, **weight_dict
+):
     """This method returns a measure of similarity depending on how
     similar the two objects are.
@@ -65,13 +85,19 @@ def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None,
         obj2: A stix2 object instance
         prop_scores: A dictionary that can hold individual property scores,
             weights, contributing score, matching score and sum of weights.
-        ds1: As
-        ds2: As
-        ignore_spec_version: As
-        versioning_checks: As
-        max_depth: As
-        weight_dict: A dictionary that can be used to override settings
-            in the similarity process
+        ds1: A DataStore object instance representing your graph
+        ds2: A DataStore object instance representing your graph
+        ignore_spec_version: A boolean indicating whether to test object types
+            that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
+            If set to True this check will be skipped.
+        versioning_checks: A boolean indicating whether to test multiple revisions
+            of the same object (when present) to maximize similarity against a
+            particular version. If set to True the algorithm will perform this step.
+        max_depth: A positive integer indicating the maximum recursion depth the
+            algorithm can reach when de-referencing objects and performing the
+            object_similarity algorithm.
+        weight_dict: A dictionary that can be used to override what checks are done
+            to objects in the similarity process.
     Returns:
         float: A number between 0.0 and 100.0 as a measurement of similarity.
@@ -86,7 +112,7 @@ def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None,
     Note:
         Default weight_dict:
-        .. include:: ../../object_default_sem_eq_weights.rst
+        .. include:: ../../similarity_weights.rst
     Note:
         This implementation follows the Semantic Equivalence Committee Note.
@@ -107,7 +133,6 @@ def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None,
     }
     type1, type2 = obj1["type"], obj2["type"]
-    ignore_spec_version = weights["_internal"]["ignore_spec_version"]
     if type1 != type2:
         raise ValueError('The objects to compare must be of the same type!')
@@ -140,9 +165,8 @@ def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None,
                 threshold = weights[type1]["threshold"]
                 contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold)
             elif comp_funct == reference_check or comp_funct == list_reference_check:
-                max_depth_i = weights["_internal"]["max_depth"]
-                if max_depth_i > 0:
-                    weights["_internal"]["max_depth"] = max_depth_i - 1
+                if max_depth > 0:
+                    weights["_internal"]["max_depth"] = max_depth - 1
                     ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"]
                     if _datastore_check(ds1, ds2):
                         contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
@@ -155,7 +179,7 @@ def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None,
                     prop_scores[prop]["method"] = comp_funct.__name__
                 else:
                     continue  # prevent excessive recursion
-                weights["_internal"]["max_depth"] = max_depth_i
+                weights["_internal"]["max_depth"] = max_depth
             else:
                 contributing_score = w * comp_funct(obj1[prop], obj2[prop])
@@ -187,7 +211,7 @@ def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None,
 def check_property_present(prop, obj1, obj2):
     """Helper method checks if a property is present on both objects."""
     if prop == "longitude_latitude":
-        if all(x in obj1 and x in obj2 for x in ['latitude', 'longitude']):
+        if all(x in obj1 and x in obj2 for x in ('latitude', 'longitude')):
             return True
     elif prop in obj1 and prop in obj2:
         return True
@@ -286,12 +310,12 @@ def custom_pattern_based(pattern1, pattern2):
     return equivalent_patterns(pattern1, pattern2)
-def partial_external_reference_based(refs1, refs2):
+def partial_external_reference_based(ext_refs1, ext_refs2):
     """Performs a matching on External References.
     Args:
-        refs1: A list of external references.
-        refs2: A list of external references.
+        ext_refs1: A list of external references.
+        ext_refs2: A list of external references.
     Returns:
         float: Number between 0.0 and 1.0 depending on matches.
@@ -300,8 +324,11 @@ def partial_external_reference_based(refs1, refs2):
     allowed = {"veris", "cve", "capec", "mitre-attack"}
     matches = 0
-    for ext_ref1 in refs1:
-        for ext_ref2 in refs2:
+    ref_pairs = itertools.chain(
+        itertools.product(ext_refs1, ext_refs2),
+    )
+    for ext_ref1, ext_ref2 in ref_pairs:
         sn_match = False
         ei_match = False
         url_match = False
@@ -325,7 +352,7 @@ def partial_external_reference_based(refs1, refs2):
             result = 1.0
             logger.debug(
                 "--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
-                refs1, refs2, result,
+                ext_refs1, ext_refs2, result,
             )
             return result
@@ -334,10 +361,10 @@ def partial_external_reference_based(refs1, refs2):
         if (sn_match or ei_match or url_match) and source_name not in allowed:
             matches += 1
-    result = matches / max(len(refs1), len(refs2))
+    result = matches / max(len(ext_refs1), len(ext_refs2))
     logger.debug(
         "--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
-        refs1, refs2, result,
+        ext_refs1, ext_refs2, result,
     )
     return result
@@ -381,10 +408,11 @@ def _versioned_checks(ref1, ref2, ds1, ds2, **weights):
     max_depth = weights["_internal"]["max_depth"]
     for object1, object2 in pairs:
-        result = object_similarity(object1, object2, ds1=ds1, ds2=ds2,
-                                   ignore_spec_version=ignore_spec_version,
-                                   versioning_checks=versioning_checks,
-                                   max_depth=max_depth, **weights)
+        result = object_similarity(
+            object1, object2, ds1, ds2,
+            ignore_spec_version, versioning_checks,
+            max_depth, **weights
+        )
         if ref1 not in results:
             results[ref1] = {"matched": ref2, "value": result}
         elif result > results[ref1]["value"]:
@@ -413,10 +441,11 @@ def reference_check(ref1, ref2, ds1, ds2, **weights):
     else:
         o1, o2 = ds1.get(ref1), ds2.get(ref2)
         if o1 and o2:
-            result = object_similarity(o1, o2, ds1=ds1, ds2=ds2,
-                                       ignore_spec_version=ignore_spec_version,
-                                       versioning_checks=versioning_checks,
-                                       max_depth=max_depth, **weights) / 100.0
+            result = object_similarity(
+                o1, o2, ds1, ds2,
+                ignore_spec_version, versioning_checks,
+                max_depth, **weights
+            ) / 100.0
     logger.debug(
         "--\t\treference_check '%s' '%s'\tresult: '%s'",
@@ -468,8 +497,10 @@ def list_reference_check(refs1, refs2, ds1, ds2, **weights):
 def _datastore_check(ds1, ds2):
-    if (issubclass(ds1.__class__, (DataStoreMixin, DataSink, DataSource)) or
-            issubclass(ds2.__class__, (DataStoreMixin, DataSink, DataSource))):
+    if (
+        issubclass(ds1.__class__, (DataStoreMixin, DataSink, DataSource)) or
+        issubclass(ds2.__class__, (DataStoreMixin, DataSink, DataSource))
+    ):
         return True
     return False
@@ -586,5 +617,5 @@ WEIGHTS = {
     "vulnerability": {
         "name": (30, partial_string_based),
         "external_references": (70, partial_external_reference_based),
-    }
+    },
 }  # :autodoc-skip:
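
Because weight_dict now only overrides which checks run (the "_internal" options moved to keyword arguments), a per-type override passed through **weight_dict looks roughly like this; the objects and weights are illustrative, and the entry format mirrors the WEIGHTS table above:

    import stix2
    from stix2.equivalence.object import (
        object_similarity, partial_string_based, partial_external_reference_based,
    )

    # Hypothetical objects to compare.
    vuln1 = stix2.v21.Vulnerability(name="Improper Input Validation")
    vuln2 = stix2.v21.Vulnerability(name="Improper input validation")

    prop_scores = {}
    score = object_similarity(
        vuln1, vuln2, prop_scores,
        # Keyword arguments that are not recognized options land in weight_dict;
        # each entry maps a property to a (weight, comparison_function) tuple.
        vulnerability={
            "name": (50, partial_string_based),
            "external_references": (50, partial_external_reference_based),
        },
    )
    print(score, prop_scores)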

View File

@@ -424,7 +424,7 @@ def test_related_to_by_target(ds):
 def test_versioned_checks(ds, ds2):
-    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
+    weights = stix2.equivalence.graph.WEIGHTS.copy()
     weights.update({
         "_internal": {
             "ignore_spec_version": True,
@@ -437,7 +437,7 @@ def test_versioned_checks(ds, ds2):
 def test_semantic_check_with_versioning(ds, ds2):
-    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
+    weights = stix2.equivalence.graph.WEIGHTS.copy()
     weights.update({
         "_internal": {
             "ignore_spec_version": False,
@@ -467,13 +467,11 @@ def test_semantic_check_with_versioning(ds, ds2):
 def test_list_semantic_check(ds, ds2):
-    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
+    weights = stix2.equivalence.graph.WEIGHTS.copy()
     weights.update({
         "_internal": {
             "ignore_spec_version": False,
             "versioning_checks": False,
-            "ds1": ds,
-            "ds2": ds2,
             "max_depth": 1,
         },
     })
@@ -504,39 +502,18 @@ def test_list_semantic_check(ds, ds2):
 def test_graph_similarity_raises_value_error(ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": -1,
-        },
-    }
     with pytest.raises(ValueError):
         prop_scores1 = {}
-        stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
+        stix2.Environment().graph_similarity(ds, ds2, prop_scores1, max_depth=-1)
 def test_graph_similarity_with_filesystem_source(ds, fs):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": True,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores1 = {}
-    env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights)
+    env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, ignore_spec_version=True)
     # Switching parameters
-    weights = {
-        "_internal": {
-            "ignore_spec_version": True,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores2 = {}
-    env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights)
+    env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, ignore_spec_version=True)
     assert round(env1) == 25
     assert round(prop_scores1["matching_score"]) == 451
@@ -552,41 +529,20 @@ def test_graph_similarity_with_filesystem_source(ds, fs):
 def test_graph_similarity_with_duplicate_graph(ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores = {}
-    env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights)
+    env = stix2.Environment().graph_similarity(ds, ds, prop_scores)
     assert round(env) == 100
     assert round(prop_scores["matching_score"]) == 800
     assert round(prop_scores["len_pairs"]) == 8
 def test_graph_similarity_with_versioning_check_on(ds2, ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": True,
-            "max_depth": 1,
-        },
-    }
     prop_scores1 = {}
-    env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
+    env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, versioning_checks=True)
     # Switching parameters
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": True,
-            "max_depth": 1,
-        },
-    }
     prop_scores2 = {}
-    env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
+    env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, versioning_checks=True)
     assert round(env1) == 88
     assert round(prop_scores1["matching_score"]) == 789
@@ -602,26 +558,12 @@ def test_graph_similarity_with_versioning_check_on(ds2, ds):
 def test_graph_similarity_with_versioning_check_off(ds2, ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores1 = {}
-    env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
+    env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1)
     # Switching parameters
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores2 = {}
-    env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
+    env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2)
     assert round(env1) == 88
     assert round(prop_scores1["matching_score"]) == 789
@@ -637,26 +579,12 @@ def test_graph_similarity_with_versioning_check_off(ds2, ds):
 def test_graph_equivalence_with_filesystem_source(ds, fs):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": True,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores1 = {}
-    env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights)
+    env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, ignore_spec_version=True)
     # Switching parameters
-    weights = {
-        "_internal": {
-            "ignore_spec_version": True,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores2 = {}
-    env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights)
+    env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, ignore_spec_version=True)
     assert env1 is False
     assert round(prop_scores1["matching_score"]) == 451
@@ -672,41 +600,20 @@ def test_graph_equivalence_with_filesystem_source(ds, fs):
 def test_graph_equivalence_with_duplicate_graph(ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores = {}
-    env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights)
+    env = stix2.Environment().graph_equivalence(ds, ds, prop_scores)
     assert env is True
     assert round(prop_scores["matching_score"]) == 800
     assert round(prop_scores["len_pairs"]) == 8
 def test_graph_equivalence_with_versioning_check_on(ds2, ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": True,
-            "max_depth": 1,
-        },
-    }
     prop_scores1 = {}
-    env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
+    env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, versioning_checks=True)
     # Switching parameters
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": True,
-            "max_depth": 1,
-        },
-    }
     prop_scores2 = {}
-    env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
+    env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, versioning_checks=True)
     assert env1 is True
     assert round(prop_scores1["matching_score"]) == 789
@@ -722,26 +629,12 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds):
 def test_graph_equivalence_with_versioning_check_off(ds2, ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores1 = {}
-    env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
+    env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1)
     # Switching parameters
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores2 = {}
-    env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
+    env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2)
     assert env1 is True
     assert round(prop_scores1["matching_score"]) == 789
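
The test churn above is mechanical: every "_internal" block that only tunneled options into the call is deleted, and the options become keyword arguments. A before/after sketch of the pattern, with the ds/fs fixtures replaced by throwaway MemoryStore objects:

    import stix2
    from stix2 import Environment, MemoryStore

    ds = MemoryStore([stix2.v21.Malware(name="Cryptolocker", is_family=False)])
    fs = MemoryStore([stix2.v21.Malware(name="Cryptolocker", is_family=False)])
    prop_scores = {}

    # Old style (before this change): options rode along inside **weights.
    #
    #     weights = {
    #         "_internal": {
    #             "ignore_spec_version": True,
    #             "versioning_checks": False,
    #             "max_depth": 1,
    #         },
    #     }
    #     env = Environment().graph_similarity(fs, ds, prop_scores, **weights)

    # New style: the same options are ordinary keyword arguments.
    env = Environment().graph_similarity(
        fs, ds, prop_scores,
        ignore_spec_version=True,
        versioning_checks=False,
        max_depth=1,
    )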

View File

@@ -760,16 +760,13 @@ def test_object_similarity_different_spec_version():
             "valid_from": (5, stix2.equivalence.object.partial_timestamp_based),
             "tdelta": 1,  # One day interval
         },
-        "_internal": {
-            "ignore_spec_version": True,  # Disables spec_version check.
-        },
     }
     ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
     ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS)
-    env = stix2.Environment().object_similarity(ind1, ind2, **weights)
+    env = stix2.Environment().object_similarity(ind1, ind2, ignore_spec_version=True, **weights)
     assert round(env) == 0
-    env = stix2.Environment().object_similarity(ind2, ind1, **weights)
+    env = stix2.Environment().object_similarity(ind2, ind1, ignore_spec_version=True, **weights)
     assert round(env) == 0
@@ -861,7 +858,9 @@ def test_object_similarity_exact_match():
 def test_non_existent_config_for_object():
     r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
     r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
-    assert stix2.Environment().object_similarity(r1, r2) == 0.0
+    prop_scores = {}
+    assert stix2.Environment().object_similarity(r1, r2, prop_scores) == 100.0
+    assert prop_scores["object_refs"]["method"] == "partial_list_based"
 def custom_semantic_equivalence_method(obj1, obj2, **weights):
@@ -937,7 +936,8 @@ def test_object_similarity_prop_scores_method_provided():
 def test_versioned_checks(ds, ds2):
-    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
+    # Testing internal method
+    weights = stix2.equivalence.graph.WEIGHTS.copy()
     weights.update({
         "_internal": {
             "ignore_spec_version": True,
@@ -950,7 +950,7 @@ def test_versioned_checks(ds, ds2):
 def test_semantic_check_with_versioning(ds, ds2):
-    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
+    weights = stix2.equivalence.graph.WEIGHTS.copy()
     weights.update({
         "_internal": {
             "ignore_spec_version": False,
@@ -981,7 +981,7 @@ def test_semantic_check_with_versioning(ds, ds2):
 def test_list_semantic_check(ds, ds2):
-    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
+    weights = stix2.equivalence.graph.WEIGHTS.copy()
     weights.update({
         "_internal": {
             "ignore_spec_version": False,
@@ -1027,39 +1027,28 @@ def test_list_semantic_check(ds, ds2):
 def test_graph_similarity_raises_value_error(ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": -1,
-        },
-    }
     with pytest.raises(ValueError):
         prop_scores1 = {}
-        stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
+        stix2.Environment().graph_similarity(ds, ds2, prop_scores1, max_depth=-1)
 def test_graph_similarity_with_filesystem_source(ds, fs):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": True,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores1 = {}
-    env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights)
+    env1 = stix2.Environment().graph_similarity(
+        fs, ds, prop_scores1,
+        ignore_spec_version=True,
+        versioning_checks=False,
+        max_depth=1,
+    )
     # Switching parameters
-    weights = {
-        "_internal": {
-            "ignore_spec_version": True,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores2 = {}
-    env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights)
+    env2 = stix2.Environment().graph_similarity(
+        ds, fs, prop_scores2,
+        ignore_spec_version=True,
+        versioning_checks=False,
+        max_depth=1,
+    )
     assert round(env1) == 23
     assert round(prop_scores1["matching_score"]) == 411
@@ -1154,14 +1143,11 @@ def test_depth_limiting():
         "some2_ref": (33, stix2.equivalence.object.reference_check),
         "name": (34, stix2.equivalence.object.partial_string_based),
         },
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
     }
     prop_scores1 = {}
-    env1 = stix2.equivalence.graph.graph_similarity(mem_store1, mem_store2, prop_scores1, **custom_weights)
+    env1 = stix2.equivalence.graph.graph_similarity(
+        mem_store1, mem_store2, prop_scores1, **custom_weights
+    )
     assert round(env1) == 38
     assert round(prop_scores1["matching_score"]) == 300
@@ -1185,44 +1171,23 @@ def test_depth_limiting():
 def test_graph_similarity_with_duplicate_graph(ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores = {}
-    env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights)
+    env = stix2.Environment().graph_similarity(ds, ds, prop_scores)
     assert round(env) == 100
     assert round(prop_scores["matching_score"]) == 800
     assert round(prop_scores["len_pairs"]) == 8
 def test_graph_similarity_with_versioning_check_on(ds2, ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": True,
-            "max_depth": 1,
-        },
-    }
     prop_scores1 = {}
-    env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
+    env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, versioning_checks=True)
     assert round(env1) == 88
     assert round(prop_scores1["matching_score"]) == 789
     assert round(prop_scores1["len_pairs"]) == 9
     # Switching parameters
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores2 = {}
-    env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
+    env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, versioning_checks=True)
     assert round(env2) == 88
     assert round(prop_scores2["matching_score"]) == 789
     assert round(prop_scores2["len_pairs"]) == 9
@@ -1233,29 +1198,15 @@ def test_graph_similarity_with_versioning_check_on(ds2, ds):
 def test_graph_similarity_with_versioning_check_off(ds2, ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores1 = {}
-    env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
+    env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1)
     assert round(env1) == 88
     assert round(prop_scores1["matching_score"]) == 789
     assert round(prop_scores1["len_pairs"]) == 9
     # Switching parameters
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores2 = {}
-    env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
+    env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2)
     assert round(env2) == 88
     assert round(prop_scores2["matching_score"]) == 789
     assert round(prop_scores2["len_pairs"]) == 9
@@ -1266,26 +1217,12 @@ def test_graph_similarity_with_versioning_check_off(ds2, ds):
 def test_graph_equivalence_with_filesystem_source(ds, fs):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": True,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores1 = {}
-    env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights)
+    env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, ignore_spec_version=True)
    # Switching parameters
-    weights = {
-        "_internal": {
-            "ignore_spec_version": True,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores2 = {}
-    env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights)
+    env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, ignore_spec_version=True)
     assert env1 is False
     assert round(prop_scores1["matching_score"]) == 411
@@ -1301,41 +1238,20 @@ def test_graph_equivalence_with_filesystem_source(ds, fs):
 def test_graph_equivalence_with_duplicate_graph(ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores = {}
-    env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights)
+    env = stix2.Environment().graph_equivalence(ds, ds, prop_scores)
     assert env is True
     assert round(prop_scores["matching_score"]) == 800
     assert round(prop_scores["len_pairs"]) == 8
 def test_graph_equivalence_with_versioning_check_on(ds2, ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": True,
-            "max_depth": 1,
-        },
-    }
     prop_scores1 = {}
-    env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
+    env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, versioning_checks=True)
     # Switching parameters
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": True,
-            "max_depth": 1,
-        },
-    }
     prop_scores2 = {}
-    env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
+    env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, versioning_checks=True)
     assert env1 is True
     assert round(prop_scores1["matching_score"]) == 789
@@ -1351,26 +1267,12 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds):
 def test_graph_equivalence_with_versioning_check_off(ds2, ds):
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores1 = {}
-    env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
+    env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1)
     # Switching parameters
-    weights = {
-        "_internal": {
-            "ignore_spec_version": False,
-            "versioning_checks": False,
-            "max_depth": 1,
-        },
-    }
     prop_scores2 = {}
-    env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
+    env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2)
     assert env1 is True
     assert round(prop_scores1["matching_score"]) == 789