From ff5014c606858053ad6eee6a13438a67dffe388f Mon Sep 17 00:00:00 2001 From: Emmanuelle Vargas-Gonzalez Date: Mon, 1 Mar 2021 12:27:52 -0500 Subject: [PATCH] expose configuration options, combine weight dictionary, update tests --- docs/conf.py | 9 +- stix2/environment.py | 109 +++++++++++++---- stix2/equivalence/graph/__init__.py | 60 +++++++--- stix2/equivalence/object/__init__.py | 169 +++++++++++++++----------- stix2/test/v20/test_environment.py | 143 +++------------------- stix2/test/v21/test_environment.py | 172 ++++++--------------------- 6 files changed, 284 insertions(+), 378 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 5d12af3..62e829d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -66,16 +66,9 @@ object_default_sem_eq_weights = json.dumps(WEIGHTS, indent=4, default=lambda o: object_default_sem_eq_weights = object_default_sem_eq_weights.replace('\n', '\n ') object_default_sem_eq_weights = object_default_sem_eq_weights.replace(' "', ' ') object_default_sem_eq_weights = object_default_sem_eq_weights.replace('"\n', '\n') -with open('object_default_sem_eq_weights.rst', 'w') as f: +with open('similarity_weights.rst', 'w') as f: f.write(".. code-block:: python\n\n {}\n\n".format(object_default_sem_eq_weights)) -graph_default_sem_eq_weights = json.dumps(GRAPH_WEIGHTS, indent=4, default=lambda o: o.__name__) -graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('\n', '\n ') -graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace(' "', ' ') -graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('"\n', '\n') -with open('graph_default_sem_eq_weights.rst', 'w') as f: - f.write(".. code-block:: python\n\n {}\n\n".format(graph_default_sem_eq_weights)) - def get_property_type(prop): """Convert property classname into pretty string name of property. 
diff --git a/stix2/environment.py b/stix2/environment.py index 75e5fa5..b37b485 100644 --- a/stix2/environment.py +++ b/stix2/environment.py @@ -189,8 +189,11 @@ class Environment(DataStoreMixin): return None @staticmethod - def object_similarity(obj1, obj2, prop_scores={}, ignore_spec_version=False, - versioning_checks=False, max_depth=1, **weight_dict): + def object_similarity( + obj1, obj2, prop_scores={}, ds1=None, ds2=None, + ignore_spec_version=False, versioning_checks=False, + max_depth=1, **weight_dict + ): """This method returns a measure of how similar the two objects are. Args: @@ -198,8 +201,19 @@ class Environment(DataStoreMixin): obj2: A stix2 object instance prop_scores: A dictionary that can hold individual property scores, weights, contributing score, matching score and sum of weights. - weight_dict: A dictionary that can be used to override settings - in the similarity process + ds1: A DataStore object instance representing your graph + ds2: A DataStore object instance representing your graph + ignore_spec_version: A boolean indicating whether to test object types + that belong to different spec versions (STIX 2.0 and STIX 2.1 for example). + If set to True this check will be skipped. + versioning_checks: A boolean indicating whether to test multiple revisions + of the same object (when present) to maximize similarity against a + particular version. If set to True the algorithm will perform this step. + max_depth: A positive integer indicating the maximum recursion depth the + algorithm can reach when de-referencing objects and performing the + object_similarity algorithm. + weight_dict: A dictionary that can be used to override what checks are done + to objects in the similarity process. Returns: float: A number between 0.0 and 100.0 as a measurement of similarity. @@ -221,12 +235,17 @@ class Environment(DataStoreMixin): see `the Committee Note `__. 
""" - return object_similarity(obj1, obj2, prop_scores, ignore_spec_version, - versioning_checks, max_depth, **weight_dict) + return object_similarity( + obj1, obj2, prop_scores, ds1, ds2, ignore_spec_version, + versioning_checks, max_depth, **weight_dict + ) @staticmethod - def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, ignore_spec_version=False, - versioning_checks=False, max_depth=1, **weight_dict): + def object_equivalence( + obj1, obj2, prop_scores={}, threshold=70, ds1=None, ds2=None, + ignore_spec_version=False, versioning_checks=False, + max_depth=1, **weight_dict + ): """This method returns a true/false value if two objects are semantically equivalent. Internally, it calls the object_similarity function and compares it against the given threshold value. @@ -239,8 +258,19 @@ class Environment(DataStoreMixin): threshold: A numerical value between 0 and 100 to determine the minimum score to result in successfully calling both objects equivalent. This value can be tuned. - weight_dict: A dictionary that can be used to override settings - in the similarity process + ds1: A DataStore object instance representing your graph + ds2: A DataStore object instance representing your graph + ignore_spec_version: A boolean indicating whether to test object types + that belong to different spec versions (STIX 2.0 and STIX 2.1 for example). + If set to True this check will be skipped. + versioning_checks: A boolean indicating whether to test multiple revisions + of the same object (when present) to maximize similarity against a + particular version. If set to True the algorithm will perform this step. + max_depth: A positive integer indicating the maximum recursion depth the + algorithm can reach when de-referencing objects and performing the + object_similarity algorithm. + weight_dict: A dictionary that can be used to override what checks are done + to objects in the similarity process. 
Returns: bool: True if the result of the object similarity is greater than or equal to @@ -263,11 +293,16 @@ class Environment(DataStoreMixin): see `the Committee Note `__. """ - return object_equivalence(obj1, obj2, prop_scores, threshold, **weight_dict) + return object_equivalence( + obj1, obj2, prop_scores, threshold, ds1, ds2, + ignore_spec_version, versioning_checks, max_depth, **weight_dict + ) @staticmethod - def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False, - versioning_checks=False, max_depth=1, **weight_dict): + def graph_similarity( + ds1, ds2, prop_scores={}, ignore_spec_version=False, + versioning_checks=False, max_depth=1, **weight_dict + ): """This method returns a similarity score for two given graphs. Each DataStore can contain a connected or disconnected graph and the final result is weighted over the amount of objects we managed to compare. @@ -279,8 +314,17 @@ class Environment(DataStoreMixin): ds2: A DataStore object instance representing your graph prop_scores: A dictionary that can hold individual property scores, weights, contributing score, matching score and sum of weights. - weight_dict: A dictionary that can be used to override settings - in the similarity process + ignore_spec_version: A boolean indicating whether to test object types + that belong to different spec versions (STIX 2.0 and STIX 2.1 for example). + If set to True this check will be skipped. + versioning_checks: A boolean indicating whether to test multiple revisions + of the same object (when present) to maximize similarity against a + particular version. If set to True the algorithm will perform this step. + max_depth: A positive integer indicating the maximum recursion depth the + algorithm can reach when de-referencing objects and performing the + object_similarity algorithm. + weight_dict: A dictionary that can be used to override what checks are done + to objects in the similarity process. 
Returns: float: A number between 0.0 and 100.0 as a measurement of similarity. @@ -295,19 +339,24 @@ class Environment(DataStoreMixin): Note: Default weight_dict: - .. include:: ../graph_default_sem_eq_weights.rst + .. include:: ../similarity_weights.rst Note: This implementation follows the Semantic Equivalence Committee Note. see `the Committee Note `__. """ - return graph_similarity(ds1, ds2, prop_scores, ignore_spec_version, - versioning_checks, max_depth, **weight_dict) + return graph_similarity( + ds1, ds2, prop_scores, ignore_spec_version, + versioning_checks, max_depth, **weight_dict + ) @staticmethod - def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, ignore_spec_version=False, - versioning_checks=False, max_depth=1, **weight_dict): + def graph_equivalence( + ds1, ds2, prop_scores={}, threshold=70, + ignore_spec_version=False, versioning_checks=False, + max_depth=1, **weight_dict + ): """This method returns a true/false value if two graphs are semantically equivalent. Internally, it calls the graph_similarity function and compares it against the given threshold value. @@ -320,8 +369,17 @@ class Environment(DataStoreMixin): threshold: A numerical value between 0 and 100 to determine the minimum score to result in successfully calling both graphs equivalent. This value can be tuned. - weight_dict: A dictionary that can be used to override settings - in the similarity process + ignore_spec_version: A boolean indicating whether to test object types + that belong to different spec versions (STIX 2.0 and STIX 2.1 for example). + If set to True this check will be skipped. + versioning_checks: A boolean indicating whether to test multiple revisions + of the same object (when present) to maximize similarity against a + particular version. If set to True the algorithm will perform this step. 
+ max_depth: A positive integer indicating the maximum recursion depth the + algorithm can reach when de-referencing objects and performing the + object_similarity algorithm. + weight_dict: A dictionary that can be used to override what checks are done + to objects in the similarity process. Returns: bool: True if the result of the graph similarity is greater than or equal to @@ -337,11 +395,14 @@ class Environment(DataStoreMixin): Note: Default weight_dict: - .. include:: ../graph_default_sem_eq_weights.rst + .. include:: ../similarity_weights.rst Note: This implementation follows the Semantic Equivalence Committee Note. see `the Committee Note `__. """ - return graph_equivalence(ds1, ds2, prop_scores, threshold, **weight_dict) + return graph_equivalence( + ds1, ds2, prop_scores, threshold, ignore_spec_version, + versioning_checks, max_depth, **weight_dict + ) diff --git a/stix2/equivalence/graph/__init__.py b/stix2/equivalence/graph/__init__.py index 1dcccf1..1d43219 100644 --- a/stix2/equivalence/graph/__init__.py +++ b/stix2/equivalence/graph/__init__.py @@ -10,7 +10,11 @@ from ..object import ( logger = logging.getLogger(__name__) -def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict): +def graph_equivalence( + ds1, ds2, prop_scores={}, threshold=70, + ignore_spec_version=False, versioning_checks=False, + max_depth=1, **weight_dict +): """This method returns a true/false value if two graphs are semantically equivalent. Internally, it calls the graph_similarity function and compares it against the given threshold value. @@ -23,8 +27,17 @@ def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict): threshold: A numerical value between 0 and 100 to determine the minimum score to result in successfully calling both graphs equivalent. This value can be tuned. 
- weight_dict: A dictionary that can be used to override settings - in the similarity process + ignore_spec_version: A boolean indicating whether to test object types + that belong to different spec versions (STIX 2.0 and STIX 2.1 for example). + If set to True this check will be skipped. + versioning_checks: A boolean indicating whether to test multiple revisions + of the same object (when present) to maximize similarity against a + particular version. If set to True the algorithm will perform this step. + max_depth: A positive integer indicating the maximum recursion depth the + algorithm can reach when de-referencing objects and performing the + object_similarity algorithm. + weight_dict: A dictionary that can be used to override what checks are done + to objects in the similarity process. Returns: bool: True if the result of the graph similarity is greater than or equal to @@ -40,21 +53,26 @@ def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict): Note: Default weight_dict: - .. include:: ../../graph_default_sem_eq_weights.rst + .. include:: ../../similarity_weights.rst Note: This implementation follows the Semantic Equivalence Committee Note. see `the Committee Note `__. """ - similarity_result = graph_similarity(ds1, ds2, prop_scores, **weight_dict) + similarity_result = graph_similarity( + ds1, ds2, prop_scores, ignore_spec_version, + versioning_checks, max_depth, **weight_dict + ) if similarity_result >= threshold: return True return False -def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False, - versioning_checks=False, max_depth=1, **weight_dict): +def graph_similarity( + ds1, ds2, prop_scores={}, ignore_spec_version=False, + versioning_checks=False, max_depth=1, **weight_dict +): """This method returns a similarity score for two given graphs. Each DataStore can contain a connected or disconnected graph and the final result is weighted over the amount of objects we managed to compare. 
@@ -66,11 +84,17 @@ def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False, ds2: A DataStore object instance representing your graph prop_scores: A dictionary that can hold individual property scores, weights, contributing score, matching score and sum of weights. - ignore_spec_version: As - versioning_checks: As - max_depth: As - weight_dict: A dictionary that can be used to override settings - in the similarity process + ignore_spec_version: A boolean indicating whether to test object types + that belong to different spec versions (STIX 2.0 and STIX 2.1 for example). + If set to True this check will be skipped. + versioning_checks: A boolean indicating whether to test multiple revisions + of the same object (when present) to maximize similarity against a + particular version. If set to True the algorithm will perform this step. + max_depth: A positive integer indicating the maximum recursion depth the + algorithm can reach when de-referencing objects and performing the + object_similarity algorithm. + weight_dict: A dictionary that can be used to override what checks are done + to objects in the similarity process. Returns: float: A number between 0.0 and 100.0 as a measurement of similarity. @@ -85,7 +109,7 @@ def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False, Note: Default weight_dict: - .. include:: ../../graph_default_sem_eq_weights.rst + .. include:: ../../similarity_weights.rst Note: This implementation follows the Semantic Equivalence Committee Note. 
@@ -107,7 +131,7 @@ def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False, "max_depth": max_depth, } - if weights["_internal"]["max_depth"] <= 0: + if max_depth <= 0: raise ValueError("'max_depth' must be greater than 0") pairs = _object_pairs( @@ -122,9 +146,11 @@ def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False, object1_id = object1["id"] object2_id = object2["id"] - result = object_similarity(object1, object2, iprop_score, ds1, ds2, - ignore_spec_version, versioning_checks, - max_depth, **weights) + result = object_similarity( + object1, object2, iprop_score, ds1, ds2, + ignore_spec_version, versioning_checks, + max_depth, **weights + ) if object1_id not in results: results[object1_id] = {"lhs": object1_id, "rhs": object2_id, "prop_score": iprop_score, "value": result} diff --git a/stix2/equivalence/object/__init__.py b/stix2/equivalence/object/__init__.py index 8bae111..71a263c 100644 --- a/stix2/equivalence/object/__init__.py +++ b/stix2/equivalence/object/__init__.py @@ -4,14 +4,18 @@ import itertools import logging import time -from ...datastore import Filter, DataStoreMixin, DataSink, DataSource +from ...datastore import DataSink, DataSource, DataStoreMixin, Filter from ...utils import STIXdatetime, parse_into_datetime from ..pattern import equivalent_patterns logger = logging.getLogger(__name__) -def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict): +def object_equivalence( + obj1, obj2, prop_scores={}, threshold=70, ds1=None, + ds2=None, ignore_spec_version=False, + versioning_checks=False, max_depth=1, **weight_dict +): """This method returns a true/false value if two objects are semantically equivalent. Internally, it calls the object_similarity function and compares it against the given threshold value. 
@@ -24,8 +28,19 @@ def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict): threshold: A numerical value between 0 and 100 to determine the minimum score to result in successfully calling both objects equivalent. This value can be tuned. - weight_dict: A dictionary that can be used to override settings - in the similarity process + ds1: A DataStore object instance representing your graph + ds2: A DataStore object instance representing your graph + ignore_spec_version: A boolean indicating whether to test object types + that belong to different spec versions (STIX 2.0 and STIX 2.1 for example). + If set to True this check will be skipped. + versioning_checks: A boolean indicating whether to test multiple revisions + of the same object (when present) to maximize similarity against a + particular version. If set to True the algorithm will perform this step. + max_depth: A positive integer indicating the maximum recursion depth the + algorithm can reach when de-referencing objects and performing the + object_similarity algorithm. + weight_dict: A dictionary that can be used to override what checks are done + to objects in the similarity process. Returns: bool: True if the result of the object similarity is greater than or equal to @@ -41,22 +56,27 @@ def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict): Note: Default weight_dict: - .. include:: ../../object_default_sem_eq_weights.rst + .. include:: ../../similarity_weights.rst Note: This implementation follows the Semantic Equivalence Committee Note. see `the Committee Note `__. 
""" - similarity_result = object_similarity(obj1, obj2, prop_scores, **weight_dict) + similarity_result = object_similarity( + obj1, obj2, prop_scores, ds1, ds2, ignore_spec_version, + versioning_checks, max_depth, **weight_dict + ) if similarity_result >= threshold: return True return False -def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None, - ignore_spec_version=False, versioning_checks=False, - max_depth=1, **weight_dict): +def object_similarity( + obj1, obj2, prop_scores={}, ds1=None, ds2=None, + ignore_spec_version=False, versioning_checks=False, + max_depth=1, **weight_dict +): """This method returns a measure of similarity depending on how similar the two objects are. @@ -65,13 +85,19 @@ def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None, obj2: A stix2 object instance prop_scores: A dictionary that can hold individual property scores, weights, contributing score, matching score and sum of weights. - ds1: As - ds2: As - ignore_spec_version: As - versioning_checks: As - max_depth: As - weight_dict: A dictionary that can be used to override settings - in the similarity process + ds1: A DataStore object instance representing your graph + ds2: A DataStore object instance representing your graph + ignore_spec_version: A boolean indicating whether to test object types + that belong to different spec versions (STIX 2.0 and STIX 2.1 for example). + If set to True this check will be skipped. + versioning_checks: A boolean indicating whether to test multiple revisions + of the same object (when present) to maximize similarity against a + particular version. If set to True the algorithm will perform this step. + max_depth: A positive integer indicating the maximum recursion depth the + algorithm can reach when de-referencing objects and performing the + object_similarity algorithm. + weight_dict: A dictionary that can be used to override what checks are done + to objects in the similarity process. 
Returns: float: A number between 0.0 and 100.0 as a measurement of similarity. @@ -86,7 +112,7 @@ def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None, Note: Default weight_dict: - .. include:: ../../object_default_sem_eq_weights.rst + .. include:: ../../similarity_weights.rst Note: This implementation follows the Semantic Equivalence Committee Note. @@ -107,7 +133,6 @@ def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None, } type1, type2 = obj1["type"], obj2["type"] - ignore_spec_version = weights["_internal"]["ignore_spec_version"] if type1 != type2: raise ValueError('The objects to compare must be of the same type!') @@ -140,9 +165,8 @@ def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None, threshold = weights[type1]["threshold"] contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold) elif comp_funct == reference_check or comp_funct == list_reference_check: - max_depth_i = weights["_internal"]["max_depth"] - if max_depth_i > 0: - weights["_internal"]["max_depth"] = max_depth_i - 1 + if max_depth > 0: + weights["_internal"]["max_depth"] = max_depth - 1 ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"] if _datastore_check(ds1, ds2): contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights) @@ -155,7 +179,7 @@ def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None, prop_scores[prop]["method"] = comp_funct.__name__ else: continue # prevent excessive recursion - weights["_internal"]["max_depth"] = max_depth_i + weights["_internal"]["max_depth"] = max_depth else: contributing_score = w * comp_funct(obj1[prop], obj2[prop]) @@ -187,7 +211,7 @@ def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None, def check_property_present(prop, obj1, obj2): """Helper method checks if a property is present on both objects.""" if prop == "longitude_latitude": - if all(x in obj1 and x in obj2 for x in 
['latitude', 'longitude']): + if all(x in obj1 and x in obj2 for x in ('latitude', 'longitude')): return True elif prop in obj1 and prop in obj2: return True @@ -286,12 +310,12 @@ def custom_pattern_based(pattern1, pattern2): return equivalent_patterns(pattern1, pattern2) -def partial_external_reference_based(refs1, refs2): +def partial_external_reference_based(ext_refs1, ext_refs2): """Performs a matching on External References. Args: - refs1: A list of external references. - refs2: A list of external references. + ext_refs1: A list of external references. + ext_refs2: A list of external references. Returns: float: Number between 0.0 and 1.0 depending on matches. @@ -300,44 +324,47 @@ def partial_external_reference_based(refs1, refs2): allowed = {"veris", "cve", "capec", "mitre-attack"} matches = 0 - for ext_ref1 in refs1: - for ext_ref2 in refs2: - sn_match = False - ei_match = False - url_match = False - source_name = None + ref_pairs = itertools.chain( + itertools.product(ext_refs1, ext_refs2), + ) - if check_property_present("source_name", ext_ref1, ext_ref2): - if ext_ref1["source_name"] == ext_ref2["source_name"]: - source_name = ext_ref1["source_name"] - sn_match = True - if check_property_present("external_id", ext_ref1, ext_ref2): - if ext_ref1["external_id"] == ext_ref2["external_id"]: - ei_match = True - if check_property_present("url", ext_ref1, ext_ref2): - if ext_ref1["url"] == ext_ref2["url"]: - url_match = True + for ext_ref1, ext_ref2 in ref_pairs: + sn_match = False + ei_match = False + url_match = False + source_name = None - # Special case: if source_name is a STIX defined name and either - # external_id or url match then its a perfect match and other entries - # can be ignored. 
- if sn_match and (ei_match or url_match) and source_name in allowed: - result = 1.0 - logger.debug( - "--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'", - refs1, refs2, result, - ) - return result + if check_property_present("source_name", ext_ref1, ext_ref2): + if ext_ref1["source_name"] == ext_ref2["source_name"]: + source_name = ext_ref1["source_name"] + sn_match = True + if check_property_present("external_id", ext_ref1, ext_ref2): + if ext_ref1["external_id"] == ext_ref2["external_id"]: + ei_match = True + if check_property_present("url", ext_ref1, ext_ref2): + if ext_ref1["url"] == ext_ref2["url"]: + url_match = True - # Regular check. If the source_name (not STIX-defined) or external_id or - # url matches then we consider the entry a match. - if (sn_match or ei_match or url_match) and source_name not in allowed: - matches += 1 + # Special case: if source_name is a STIX defined name and either + # external_id or url match then its a perfect match and other entries + # can be ignored. + if sn_match and (ei_match or url_match) and source_name in allowed: + result = 1.0 + logger.debug( + "--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'", + ext_refs1, ext_refs2, result, + ) + return result - result = matches / max(len(refs1), len(refs2)) + # Regular check. If the source_name (not STIX-defined) or external_id or + # url matches then we consider the entry a match. 
+ if (sn_match or ei_match or url_match) and source_name not in allowed: + matches += 1 + + result = matches / max(len(ext_refs1), len(ext_refs2)) logger.debug( "--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'", - refs1, refs2, result, + ext_refs1, ext_refs2, result, ) return result @@ -381,10 +408,11 @@ def _versioned_checks(ref1, ref2, ds1, ds2, **weights): max_depth = weights["_internal"]["max_depth"] for object1, object2 in pairs: - result = object_similarity(object1, object2, ds1=ds1, ds2=ds2, - ignore_spec_version=ignore_spec_version, - versioning_checks=versioning_checks, - max_depth=max_depth, **weights) + result = object_similarity( + object1, object2, ds1=ds1, ds2=ds2, + ignore_spec_version=ignore_spec_version, versioning_checks=versioning_checks, + max_depth=max_depth, **weights + ) if ref1 not in results: results[ref1] = {"matched": ref2, "value": result} elif result > results[ref1]["value"]: @@ -413,10 +441,11 @@ def reference_check(ref1, ref2, ds1, ds2, **weights): else: o1, o2 = ds1.get(ref1), ds2.get(ref2) if o1 and o2: - result = object_similarity(o1, o2, ds1=ds1, ds2=ds2, - ignore_spec_version=ignore_spec_version, - versioning_checks=versioning_checks, - max_depth=max_depth, **weights) / 100.0 + result = object_similarity( + o1, o2, ds1=ds1, ds2=ds2, + ignore_spec_version=ignore_spec_version, versioning_checks=versioning_checks, + max_depth=max_depth, **weights + ) / 100.0 logger.debug( "--\t\treference_check '%s' '%s'\tresult: '%s'", @@ -468,8 +497,10 @@ def list_reference_check(refs1, refs2, ds1, ds2, **weights): def _datastore_check(ds1, ds2): - if (issubclass(ds1.__class__, (DataStoreMixin, DataSink, DataSource)) or - issubclass(ds2.__class__, (DataStoreMixin, DataSink, DataSource))): + if ( + issubclass(ds1.__class__, (DataStoreMixin, DataSink, DataSource)) or + issubclass(ds2.__class__, (DataStoreMixin, DataSink, DataSource)) + ): return True return False @@ -586,5 +617,5 @@ WEIGHTS = { "vulnerability": { "name": (30, partial_string_based), "external_references": (70, partial_external_reference_based), - } + }, } # :autodoc-skip: diff
--git a/stix2/test/v20/test_environment.py b/stix2/test/v20/test_environment.py index 33e0985..c8867b0 100644 --- a/stix2/test/v20/test_environment.py +++ b/stix2/test/v20/test_environment.py @@ -424,7 +424,7 @@ def test_related_to_by_target(ds): def test_versioned_checks(ds, ds2): - weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy() + weights = stix2.equivalence.graph.WEIGHTS.copy() weights.update({ "_internal": { "ignore_spec_version": True, @@ -437,7 +437,7 @@ def test_versioned_checks(ds, ds2): def test_semantic_check_with_versioning(ds, ds2): - weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy() + weights = stix2.equivalence.graph.WEIGHTS.copy() weights.update({ "_internal": { "ignore_spec_version": False, @@ -467,13 +467,11 @@ def test_semantic_check_with_versioning(ds, ds2): def test_list_semantic_check(ds, ds2): - weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy() + weights = stix2.equivalence.graph.WEIGHTS.copy() weights.update({ "_internal": { "ignore_spec_version": False, "versioning_checks": False, - "ds1": ds, - "ds2": ds2, "max_depth": 1, }, }) @@ -504,39 +502,18 @@ def test_list_semantic_check(ds, ds2): def test_graph_similarity_raises_value_error(ds): - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": -1, - }, - } with pytest.raises(ValueError): prop_scores1 = {} - stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights) + stix2.Environment().graph_similarity(ds, ds2, prop_scores1, max_depth=-1) def test_graph_similarity_with_filesystem_source(ds, fs): - weights = { - "_internal": { - "ignore_spec_version": True, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores1 = {} - env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights) + env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, ignore_spec_version=True) # Switching parameters - weights = { - "_internal": { - "ignore_spec_version": True, - "versioning_checks": False, 
- "max_depth": 1, - }, - } prop_scores2 = {} - env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights) + env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, ignore_spec_version=True) assert round(env1) == 25 assert round(prop_scores1["matching_score"]) == 451 @@ -552,41 +529,20 @@ def test_graph_similarity_with_filesystem_source(ds, fs): def test_graph_similarity_with_duplicate_graph(ds): - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores = {} - env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights) + env = stix2.Environment().graph_similarity(ds, ds, prop_scores) assert round(env) == 100 assert round(prop_scores["matching_score"]) == 800 assert round(prop_scores["len_pairs"]) == 8 def test_graph_similarity_with_versioning_check_on(ds2, ds): - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": True, - "max_depth": 1, - }, - } prop_scores1 = {} - env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights) + env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, versioning_checks=True) # Switching parameters - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": True, - "max_depth": 1, - }, - } prop_scores2 = {} - env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights) + env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, versioning_checks=True) assert round(env1) == 88 assert round(prop_scores1["matching_score"]) == 789 @@ -602,26 +558,12 @@ def test_graph_similarity_with_versioning_check_on(ds2, ds): def test_graph_similarity_with_versioning_check_off(ds2, ds): - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores1 = {} - env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights) + env1 = 
stix2.Environment().graph_similarity(ds, ds2, prop_scores1) # Switching parameters - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores2 = {} - env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights) + env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2) assert round(env1) == 88 assert round(prop_scores1["matching_score"]) == 789 @@ -637,26 +579,12 @@ def test_graph_similarity_with_versioning_check_off(ds2, ds): def test_graph_equivalence_with_filesystem_source(ds, fs): - weights = { - "_internal": { - "ignore_spec_version": True, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores1 = {} - env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights) + env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, ignore_spec_version=True) # Switching parameters - weights = { - "_internal": { - "ignore_spec_version": True, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores2 = {} - env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights) + env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, ignore_spec_version=True) assert env1 is False assert round(prop_scores1["matching_score"]) == 451 @@ -672,41 +600,20 @@ def test_graph_equivalence_with_filesystem_source(ds, fs): def test_graph_equivalence_with_duplicate_graph(ds): - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores = {} - env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights) + env = stix2.Environment().graph_equivalence(ds, ds, prop_scores) assert env is True assert round(prop_scores["matching_score"]) == 800 assert round(prop_scores["len_pairs"]) == 8 def test_graph_equivalence_with_versioning_check_on(ds2, ds): - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": True, - 
"max_depth": 1, - }, - } prop_scores1 = {} - env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights) + env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, versioning_checks=True) # Switching parameters - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": True, - "max_depth": 1, - }, - } prop_scores2 = {} - env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights) + env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, versioning_checks=True) assert env1 is True assert round(prop_scores1["matching_score"]) == 789 @@ -722,26 +629,12 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds): def test_graph_equivalence_with_versioning_check_off(ds2, ds): - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores1 = {} - env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights) + env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1) # Switching parameters - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores2 = {} - env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights) + env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2) assert env1 is True assert round(prop_scores1["matching_score"]) == 789 diff --git a/stix2/test/v21/test_environment.py b/stix2/test/v21/test_environment.py index e7bf4da..6a14bf3 100644 --- a/stix2/test/v21/test_environment.py +++ b/stix2/test/v21/test_environment.py @@ -760,16 +760,13 @@ def test_object_similarity_different_spec_version(): "valid_from": (5, stix2.equivalence.object.partial_timestamp_based), "tdelta": 1, # One day interval }, - "_internal": { - "ignore_spec_version": True, # Disables spec_version check. 
- }, } ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS) - env = stix2.Environment().object_similarity(ind1, ind2, **weights) + env = stix2.Environment().object_similarity(ind1, ind2, ignore_spec_version=True, **weights) assert round(env) == 0 - env = stix2.Environment().object_similarity(ind2, ind1, **weights) + env = stix2.Environment().object_similarity(ind2, ind1, ignore_spec_version=True, **weights) assert round(env) == 0 @@ -861,7 +858,9 @@ def test_object_similarity_exact_match(): def test_non_existent_config_for_object(): r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS) r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS) - assert stix2.Environment().object_similarity(r1, r2) == 0.0 + prop_scores = {} + assert stix2.Environment().object_similarity(r1, r2, prop_scores) == 100.0 + assert prop_scores["object_refs"]["method"] == "partial_list_based" def custom_semantic_equivalence_method(obj1, obj2, **weights): @@ -937,7 +936,8 @@ def test_object_similarity_prop_scores_method_provided(): def test_versioned_checks(ds, ds2): - weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy() + # Testing internal method + weights = stix2.equivalence.graph.WEIGHTS.copy() weights.update({ "_internal": { "ignore_spec_version": True, @@ -950,7 +950,7 @@ def test_versioned_checks(ds, ds2): def test_semantic_check_with_versioning(ds, ds2): - weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy() + weights = stix2.equivalence.graph.WEIGHTS.copy() weights.update({ "_internal": { "ignore_spec_version": False, @@ -981,7 +981,7 @@ def test_semantic_check_with_versioning(ds, ds2): def test_list_semantic_check(ds, ds2): - weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy() + weights = stix2.equivalence.graph.WEIGHTS.copy() weights.update({ "_internal": { "ignore_spec_version": False, @@ -1027,39 +1027,28 @@ def test_list_semantic_check(ds, ds2): def test_graph_similarity_raises_value_error(ds): - weights = { - 
"_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": -1, - }, - } with pytest.raises(ValueError): prop_scores1 = {} - stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights) + stix2.Environment().graph_similarity(ds, ds2, prop_scores1, max_depth=-1) def test_graph_similarity_with_filesystem_source(ds, fs): - weights = { - "_internal": { - "ignore_spec_version": True, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores1 = {} - env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights) + env1 = stix2.Environment().graph_similarity( + fs, ds, prop_scores1, + ignore_spec_version=True, + versioning_checks=False, + max_depth=1, + ) # Switching parameters - weights = { - "_internal": { - "ignore_spec_version": True, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores2 = {} - env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights) + env2 = stix2.Environment().graph_similarity( + ds, fs, prop_scores2, + ignore_spec_version=True, + versioning_checks=False, + max_depth=1, + ) assert round(env1) == 23 assert round(prop_scores1["matching_score"]) == 411 @@ -1154,14 +1143,11 @@ def test_depth_limiting(): "some2_ref": (33, stix2.equivalence.object.reference_check), "name": (34, stix2.equivalence.object.partial_string_based), }, - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, } prop_scores1 = {} - env1 = stix2.equivalence.graph.graph_similarity(mem_store1, mem_store2, prop_scores1, **custom_weights) + env1 = stix2.equivalence.graph.graph_similarity( + mem_store1, mem_store2, prop_scores1, **custom_weights + ) assert round(env1) == 38 assert round(prop_scores1["matching_score"]) == 300 @@ -1185,44 +1171,23 @@ def test_depth_limiting(): def test_graph_similarity_with_duplicate_graph(ds): - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, - } 
prop_scores = {} - env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights) + env = stix2.Environment().graph_similarity(ds, ds, prop_scores) assert round(env) == 100 assert round(prop_scores["matching_score"]) == 800 assert round(prop_scores["len_pairs"]) == 8 def test_graph_similarity_with_versioning_check_on(ds2, ds): - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": True, - "max_depth": 1, - }, - } prop_scores1 = {} - env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights) + env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, versioning_checks=True) assert round(env1) == 88 assert round(prop_scores1["matching_score"]) == 789 assert round(prop_scores1["len_pairs"]) == 9 # Switching parameters - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores2 = {} - env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights) + env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, versioning_checks=True) assert round(env2) == 88 assert round(prop_scores2["matching_score"]) == 789 assert round(prop_scores2["len_pairs"]) == 9 @@ -1233,29 +1198,15 @@ def test_graph_similarity_with_versioning_check_on(ds2, ds): def test_graph_similarity_with_versioning_check_off(ds2, ds): - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores1 = {} - env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights) + env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1) assert round(env1) == 88 assert round(prop_scores1["matching_score"]) == 789 assert round(prop_scores1["len_pairs"]) == 9 # Switching parameters - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores2 = {} - env2 = stix2.Environment().graph_similarity(ds2, 
ds, prop_scores2, **weights) + env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2) assert round(env2) == 88 assert round(prop_scores2["matching_score"]) == 789 assert round(prop_scores2["len_pairs"]) == 9 @@ -1266,26 +1217,12 @@ def test_graph_similarity_with_versioning_check_off(ds2, ds): def test_graph_equivalence_with_filesystem_source(ds, fs): - weights = { - "_internal": { - "ignore_spec_version": True, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores1 = {} - env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights) + env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, ignore_spec_version=True) # Switching parameters - weights = { - "_internal": { - "ignore_spec_version": True, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores2 = {} - env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights) + env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, ignore_spec_version=True) assert env1 is False assert round(prop_scores1["matching_score"]) == 411 @@ -1301,41 +1238,20 @@ def test_graph_equivalence_with_filesystem_source(ds, fs): def test_graph_equivalence_with_duplicate_graph(ds): - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores = {} - env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights) + env = stix2.Environment().graph_equivalence(ds, ds, prop_scores) assert env is True assert round(prop_scores["matching_score"]) == 800 assert round(prop_scores["len_pairs"]) == 8 def test_graph_equivalence_with_versioning_check_on(ds2, ds): - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": True, - "max_depth": 1, - }, - } prop_scores1 = {} - env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights) + env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, versioning_checks=True) # 
Switching parameters - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": True, - "max_depth": 1, - }, - } prop_scores2 = {} - env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights) + env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, versioning_checks=True) assert env1 is True assert round(prop_scores1["matching_score"]) == 789 @@ -1351,26 +1267,12 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds): def test_graph_equivalence_with_versioning_check_off(ds2, ds): - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores1 = {} - env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights) + env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1) # Switching parameters - weights = { - "_internal": { - "ignore_spec_version": False, - "versioning_checks": False, - "max_depth": 1, - }, - } prop_scores2 = {} - env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights) + env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2) assert env1 is True assert round(prop_scores1["matching_score"]) == 789