Merge pull request #491 from emmanvg/graph-eq-changes

Graph Equivalence Changes
pull/1/head
Chris Lenk 2021-02-18 23:44:39 -05:00 committed by GitHub
commit 64608e7bea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 992 additions and 237 deletions

View File

@ -2,18 +2,10 @@
import copy
from .datastore import CompositeDataSource, DataStoreMixin
from .equivalence.graph import graphically_equivalent
from .equivalence.object import ( # noqa: F401
WEIGHTS, check_property_present, custom_pattern_based, exact_match,
list_reference_check, partial_external_reference_based, partial_list_based,
partial_location_distance, partial_string_based, partial_timestamp_based,
reference_check, semantically_equivalent,
)
from .equivalence.graph import graph_equivalence, graph_similarity
from .equivalence.object import object_equivalence, object_similarity
from .parsing import parse as _parse
# TODO: Remove all unused imports that now belong to the equivalence module in the next major release.
# Kept for backwards compatibility.
class ObjectFactory(object):
"""Easily create STIX objects with default values for certain properties.
@ -197,9 +189,8 @@ class Environment(DataStoreMixin):
return None
@staticmethod
def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
"""This method verifies if two objects of the same type are
semantically equivalent.
def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
"""This method returns a measure of how similar the two objects are.
Args:
obj1: A stix2 object instance
@ -207,13 +198,13 @@ class Environment(DataStoreMixin):
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
in the similarity process
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
float: A number between 0.0 and 100.0 as a measurement of similarity.
Warning:
Object types need to have property weights defined for the equivalence process.
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.object` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
@ -229,14 +220,54 @@ class Environment(DataStoreMixin):
see `the Committee Note <link here>`__.
"""
return semantically_equivalent(obj1, obj2, prop_scores, **weight_dict)
return object_similarity(obj1, obj2, prop_scores, **weight_dict)
@staticmethod
def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
"""This method verifies if two graphs are semantically equivalent.
def object_equivalence(obj1, obj2, prop_scores=None, threshold=70, **weight_dict):
    """This method returns a true/false value if two objects are semantically equivalent.
    Internally, it calls the object_similarity function and compares it against the given
    threshold value.

    Args:
        obj1: A stix2 object instance
        obj2: A stix2 object instance
        prop_scores: A dictionary that can hold individual property scores,
            weights, contributing score, matching score and sum of weights.
            A fresh dictionary is created per call when omitted.
        threshold: A numerical value between 0 and 100 to determine the minimum
            score to result in successfully calling both objects equivalent. This
            value can be tuned.
        weight_dict: A dictionary that can be used to override settings
            in the similarity process

    Returns:
        bool: True if the result of the object similarity is greater than or equal to
        the threshold value. False otherwise.

    Warning:
        Object types need to have property weights defined for the similarity process.
        Otherwise, those objects will not influence the final score. The WEIGHTS
        dictionary under `stix2.equivalence.object` can give you an idea on how to add
        new entries and pass them via the `weight_dict` argument. Similarly, the values
        or methods can be fine tuned for a particular use case.

    Note:
        Default weight_dict:

        .. include:: ../object_default_sem_eq_weights.rst

    Note:
        This implementation follows the Semantic Equivalence Committee Note.
        see `the Committee Note <link here>`__.
    """
    # None sentinel instead of a mutable {} default: a shared default dict
    # would be mutated downstream and leak scores across unrelated calls.
    if prop_scores is None:
        prop_scores = {}
    return object_equivalence(obj1, obj2, prop_scores, threshold, **weight_dict)
@staticmethod
def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
"""This method returns a similarity score for two given graphs.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the amount of objects we managed to compare.
This approach builds on top of the object-based semantic equivalence process
This approach builds on top of the object-based similarity process
and each comparison can return a value between 0 and 100.
Args:
@ -245,13 +276,13 @@ class Environment(DataStoreMixin):
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
in the similarity process
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
float: A number between 0.0 and 100.0 as a measurement of similarity.
Warning:
Object types need to have property weights defined for the equivalence process.
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.graph` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
@ -267,4 +298,44 @@ class Environment(DataStoreMixin):
see `the Committee Note <link here>`__.
"""
return graphically_equivalent(ds1, ds2, prop_scores, **weight_dict)
return graph_similarity(ds1, ds2, prop_scores, **weight_dict)
@staticmethod
def graph_equivalence(ds1, ds2, prop_scores=None, threshold=70, **weight_dict):
    """This method returns a true/false value if two graphs are semantically equivalent.
    Internally, it calls the graph_similarity function and compares it against the given
    threshold value.

    Args:
        ds1: A DataStore object instance representing your graph
        ds2: A DataStore object instance representing your graph
        prop_scores: A dictionary that can hold individual property scores,
            weights, contributing score, matching score and sum of weights.
            A fresh dictionary is created per call when omitted.
        threshold: A numerical value between 0 and 100 to determine the minimum
            score to result in successfully calling both graphs equivalent. This
            value can be tuned.
        weight_dict: A dictionary that can be used to override settings
            in the similarity process

    Returns:
        bool: True if the result of the graph similarity is greater than or equal to
        the threshold value. False otherwise.

    Warning:
        Object types need to have property weights defined for the similarity process.
        Otherwise, those objects will not influence the final score. The WEIGHTS
        dictionary under `stix2.equivalence.graph` can give you an idea on how to add
        new entries and pass them via the `weight_dict` argument. Similarly, the values
        or methods can be fine tuned for a particular use case.

    Note:
        Default weight_dict:

        .. include:: ../graph_default_sem_eq_weights.rst

    Note:
        This implementation follows the Semantic Equivalence Committee Note.
        see `the Committee Note <link here>`__.
    """
    # None sentinel instead of a mutable {} default: a shared default dict
    # would be mutated downstream and leak scores across unrelated calls.
    if prop_scores is None:
        prop_scores = {}
    return graph_equivalence(ds1, ds2, prop_scores, threshold, **weight_dict)

View File

@ -1,4 +1,4 @@
"""Python APIs for STIX 2 Semantic Equivalence.
"""Python APIs for STIX 2 Semantic Equivalence and Similarity.
.. autosummary::
:toctree: equivalence

View File

@ -1,41 +1,44 @@
"""Python APIs for STIX 2 Graph-based Semantic Equivalence."""
"""Python APIs for STIX 2 Graph-based Semantic Equivalence and Similarity."""
import logging
from ..object import (
WEIGHTS, exact_match, list_reference_check, partial_string_based,
partial_timestamp_based, reference_check, semantically_equivalent,
WEIGHTS, _bucket_per_type, _object_pairs, exact_match,
list_reference_check, object_similarity, partial_string_based,
partial_timestamp_based, reference_check,
)
logger = logging.getLogger(__name__)
def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
"""This method verifies if two graphs are semantically equivalent.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the amount of objects we managed to compare.
This approach builds on top of the object-based semantic equivalence process
and each comparison can return a value between 0 and 100.
def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
"""This method returns a true/false value if two graphs are semantically equivalent.
Internally, it calls the graph_similarity function and compares it against the given
threshold value.
Args:
ds1: A DataStore object instance representing your graph
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both graphs equivalent. This
value can be tuned.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
in the similarity process
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
bool: True if the result of the graph similarity is greater than or equal to
the threshold value. False otherwise.
Warning:
Object types need to have property weights defined for the equivalence process.
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.graph` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weights_dict:
Default weight_dict:
.. include:: ../../graph_default_sem_eq_weights.rst
@ -44,63 +47,103 @@ def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
see `the Committee Note <link here>`__.
"""
similarity_result = graph_similarity(ds1, ds2, prop_scores, **weight_dict)
if similarity_result >= threshold:
return True
return False
def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
"""This method returns a similarity score for two given graphs.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the amount of objects we managed to compare.
This approach builds on top of the object-based similarity process
and each comparison can return a value between 0 and 100.
Args:
ds1: A DataStore object instance representing your graph
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the similarity process
Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity.
Warning:
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.graph` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weight_dict:
.. include:: ../../graph_default_sem_eq_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
results = {}
similarity_score = 0
weights = GRAPH_WEIGHTS.copy()
if weight_dict:
weights.update(weight_dict)
results = {}
depth = weights["_internal"]["max_depth"]
if weights["_internal"]["max_depth"] <= 0:
raise ValueError("weight_dict['_internal']['max_depth'] must be greater than 0")
graph1 = ds1.query([])
graph2 = ds2.query([])
pairs = _object_pairs(
_bucket_per_type(ds1.query([])),
_bucket_per_type(ds2.query([])),
weights,
)
graph1.sort(key=lambda x: x["type"])
graph2.sort(key=lambda x: x["type"])
weights["_internal"]["ds1"] = ds1
weights["_internal"]["ds2"] = ds2
if len(graph1) < len(graph2):
weights["_internal"]["ds1"] = ds1
weights["_internal"]["ds2"] = ds2
g1 = graph1
g2 = graph2
else:
weights["_internal"]["ds1"] = ds2
weights["_internal"]["ds2"] = ds1
g1 = graph2
g2 = graph1
logger.debug("Starting graph similarity process between DataStores: '%s' and '%s'", ds1.id, ds2.id)
for object1, object2 in pairs:
iprop_score = {}
object1_id = object1["id"]
object2_id = object2["id"]
for object1 in g1:
for object2 in g2:
if object1["type"] == object2["type"] and object1["type"] in weights:
iprop_score = {}
result = semantically_equivalent(object1, object2, iprop_score, **weights)
objects1_id = object1["id"]
weights["_internal"]["max_depth"] = depth
result = object_similarity(object1, object2, iprop_score, **weights)
if objects1_id not in results:
results[objects1_id] = {"matched": object2["id"], "prop_score": iprop_score, "value": result}
elif result > results[objects1_id]["value"]:
results[objects1_id] = {"matched": object2["id"], "prop_score": iprop_score, "value": result}
if object1_id not in results:
results[object1_id] = {"lhs": object1_id, "rhs": object2_id, "prop_score": iprop_score, "value": result}
elif result > results[object1_id]["value"]:
results[object1_id] = {"lhs": object1_id, "rhs": object2_id, "prop_score": iprop_score, "value": result}
if object2_id not in results:
results[object2_id] = {"lhs": object2_id, "rhs": object1_id, "prop_score": iprop_score, "value": result}
elif result > results[object2_id]["value"]:
results[object2_id] = {"lhs": object2_id, "rhs": object1_id, "prop_score": iprop_score, "value": result}
equivalence_score = 0
matching_score = sum(x["value"] for x in results.values())
sum_weights = len(results) * 100.0
if sum_weights > 0:
equivalence_score = (matching_score / sum_weights) * 100
len_pairs = len(results)
if len_pairs > 0:
similarity_score = matching_score / len_pairs
prop_scores["matching_score"] = matching_score
prop_scores["sum_weights"] = sum_weights
prop_scores["len_pairs"] = len_pairs
prop_scores["summary"] = results
logger.debug(
"DONE\t\tSUM_WEIGHT: %.2f\tMATCHING_SCORE: %.2f\t SCORE: %.2f",
sum_weights,
"DONE\t\tLEN_PAIRS: %.2f\tMATCHING_SCORE: %.2f\t SIMILARITY_SCORE: %.2f",
len_pairs,
matching_score,
equivalence_score,
similarity_score,
)
return equivalence_score
return similarity_score
# default weights used for the graph semantic equivalence process
# default weights used for the graph similarity process
GRAPH_WEIGHTS = WEIGHTS.copy()
GRAPH_WEIGHTS.update({
"grouping": {

View File

@ -1,4 +1,6 @@
"""Python APIs for STIX 2 Object-based Semantic Equivalence."""
"""Python APIs for STIX 2 Object-based Semantic Equivalence and Similarity."""
import collections
import itertools
import logging
import time
@ -9,9 +11,52 @@ from ..pattern import equivalent_patterns
logger = logging.getLogger(__name__)
def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
"""This method verifies if two objects of the same type are
semantically equivalent.
def object_equivalence(obj1, obj2, prop_scores=None, threshold=70, **weight_dict):
    """This method returns a true/false value if two objects are semantically equivalent.
    Internally, it calls the object_similarity function and compares it against the given
    threshold value.

    Args:
        obj1: A stix2 object instance
        obj2: A stix2 object instance
        prop_scores: A dictionary that can hold individual property scores,
            weights, contributing score, matching score and sum of weights.
            A fresh dictionary is created per call when omitted.
        threshold: A numerical value between 0 and 100 to determine the minimum
            score to result in successfully calling both objects equivalent. This
            value can be tuned.
        weight_dict: A dictionary that can be used to override settings
            in the similarity process

    Returns:
        bool: True if the result of the object similarity is greater than or equal to
        the threshold value. False otherwise.

    Warning:
        Object types need to have property weights defined for the similarity process.
        Otherwise, those objects will not influence the final score. The WEIGHTS
        dictionary under `stix2.equivalence.object` can give you an idea on how to add
        new entries and pass them via the `weight_dict` argument. Similarly, the values
        or methods can be fine tuned for a particular use case.

    Note:
        Default weight_dict:

        .. include:: ../../object_default_sem_eq_weights.rst

    Note:
        This implementation follows the Semantic Equivalence Committee Note.
        see `the Committee Note <link here>`__.
    """
    # None sentinel instead of a mutable {} default: object_similarity mutates
    # prop_scores, so a shared default dict would leak state across calls.
    if prop_scores is None:
        prop_scores = {}
    similarity_result = object_similarity(obj1, obj2, prop_scores, **weight_dict)
    return similarity_result >= threshold
def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
"""This method returns a measure of similarity depending on how
similar the two objects are.
Args:
obj1: A stix2 object instance
@ -19,20 +64,20 @@ def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
in the similarity process
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
float: A number between 0.0 and 100.0 as a measurement of similarity.
Warning:
Object types need to have property weights defined for the equivalence process.
Object types need to have property weights defined for the similarity process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.object` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weights_dict:
Default weight_dict:
.. include:: ../../object_default_sem_eq_weights.rst
@ -58,13 +103,13 @@ def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
try:
weights[type1]
except KeyError:
logger.warning("'%s' type has no 'weights' dict specified & thus no semantic equivalence method to call!", type1)
logger.warning("'%s' type has no 'weights' dict specified & thus no object similarity method to call!", type1)
sum_weights = matching_score = 0
else:
try:
method = weights[type1]["method"]
except KeyError:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
logger.debug("Starting object similarity process between: '%s' and '%s'", obj1["id"], obj2["id"])
matching_score = 0.0
sum_weights = 0.0
@ -80,12 +125,13 @@ def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold)
elif comp_funct == reference_check or comp_funct == list_reference_check:
max_depth = weights["_internal"]["max_depth"]
if max_depth < 0:
continue # prevent excessive recursion
if max_depth > 0:
weights["_internal"]["max_depth"] = max_depth - 1
ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"]
contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
else:
weights["_internal"]["max_depth"] -= 1
ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"]
contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
continue # prevent excessive recursion
weights["_internal"]["max_depth"] = max_depth
else:
contributing_score = w * comp_funct(obj1[prop], obj2[prop])
@ -102,7 +148,7 @@ def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
prop_scores["sum_weights"] = sum_weights
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
else:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
logger.debug("Starting object similarity process between: '%s' and '%s'", obj1["id"], obj2["id"])
try:
matching_score, sum_weights = method(obj1, obj2, prop_scores, **weights[type1])
except TypeError:
@ -304,19 +350,24 @@ def partial_location_distance(lat1, long1, lat2, long2, threshold):
def _versioned_checks(ref1, ref2, ds1, ds2, **weights):
"""Checks multiple object versions if present in graph.
Maximizes for the semantic equivalence score of a particular version."""
Maximizes for the similarity score of a particular version."""
results = {}
objects1 = ds1.query([Filter("id", "=", ref1)])
objects2 = ds2.query([Filter("id", "=", ref2)])
if len(objects1) > 0 and len(objects2) > 0:
for o1 in objects1:
for o2 in objects2:
result = semantically_equivalent(o1, o2, **weights)
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": result}
elif result > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": result}
pairs = _object_pairs(
_bucket_per_type(objects1),
_bucket_per_type(objects2),
weights,
)
for object1, object2 in pairs:
result = object_similarity(object1, object2, **weights)
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": result}
elif result > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": result}
result = results.get(ref1, {}).get("value", 0.0)
logger.debug(
"--\t\t_versioned_checks '%s' '%s'\tresult: '%s'",
@ -326,18 +377,18 @@ def _versioned_checks(ref1, ref2, ds1, ds2, **weights):
def reference_check(ref1, ref2, ds1, ds2, **weights):
"""For two references, de-reference the object and perform object-based
semantic equivalence. The score influences the result of an edge check."""
"""For two references, de-reference the object and perform object_similarity.
The score influences the result of an edge check."""
type1, type2 = ref1.split("--")[0], ref2.split("--")[0]
result = 0.0
if type1 == type2:
if type1 == type2 and type1 in weights:
if weights["_internal"]["versioning_checks"]:
result = _versioned_checks(ref1, ref2, ds1, ds2, **weights) / 100.0
else:
o1, o2 = ds1.get(ref1), ds2.get(ref2)
if o1 and o2:
result = semantically_equivalent(o1, o2, **weights) / 100.0
result = object_similarity(o1, o2, **weights) / 100.0
logger.debug(
"--\t\treference_check '%s' '%s'\tresult: '%s'",
@ -348,38 +399,35 @@ def reference_check(ref1, ref2, ds1, ds2, **weights):
def list_reference_check(refs1, refs2, ds1, ds2, **weights):
"""For objects that contain multiple references (i.e., object_refs) perform
the same de-reference procedure and perform object-based semantic equivalence.
the same de-reference procedure and perform object_similarity.
The score influences the objects containing these references. The result is
weighted on the amount of unique objects that could 1) be de-referenced 2) """
results = {}
if len(refs1) >= len(refs2):
l1 = refs1
l2 = refs2
b1 = ds1
b2 = ds2
else:
l1 = refs2
l2 = refs1
b1 = ds2
b2 = ds1
l1.sort()
l2.sort()
pairs = _object_pairs(
_bucket_per_type(refs1, "id-split"),
_bucket_per_type(refs2, "id-split"),
weights,
)
for ref1 in l1:
for ref2 in l2:
type1, type2 = ref1.split("--")[0], ref2.split("--")[0]
if type1 == type2:
score = reference_check(ref1, ref2, b1, b2, **weights) * 100.0
for ref1, ref2 in pairs:
type1, type2 = ref1.split("--")[0], ref2.split("--")[0]
if type1 == type2:
score = reference_check(ref1, ref2, ds1, ds2, **weights)
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": score}
elif score > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": score}
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": score}
elif score > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": score}
if ref2 not in results:
results[ref2] = {"matched": ref1, "value": score}
elif score > results[ref2]["value"]:
results[ref2] = {"matched": ref1, "value": score}
result = 0.0
total_sum = sum(x["value"] for x in results.values())
max_score = len(results) * 100.0
max_score = len(results)
if max_score > 0:
result = total_sum / max_score
@ -391,7 +439,34 @@ def list_reference_check(refs1, refs2, ds1, ds2, **weights):
return result
# default weights used for the semantic equivalence process
def _bucket_per_type(graph, mode="type"):
"""Given a list of objects or references, bucket them by type.
Depending on the list type: extract from 'type' property or using
the 'id'.
"""
buckets = collections.defaultdict(list)
if mode == "type":
[buckets[obj["type"]].append(obj) for obj in graph]
elif mode == "id-split":
[buckets[obj.split("--")[0]].append(obj) for obj in graph]
return buckets
def _object_pairs(graph1, graph2, weights):
"""Returns a generator with the product of the comparable
objects for the graph similarity process. It determines
objects in common between graphs and objects with weights.
"""
types_in_common = set(graph1.keys()).intersection(graph2.keys())
testable_types = types_in_common.intersection(weights.keys())
return itertools.chain.from_iterable(
itertools.product(graph1[stix_type], graph2[stix_type])
for stix_type in testable_types
)
# default weights used for the similarity process
WEIGHTS = {
"attack-pattern": {
"name": (30, partial_string_based),

View File

@ -1,3 +1,4 @@
import json
import os
import pytest
@ -67,6 +68,11 @@ def ds2():
yield stix2.MemoryStore(stix_objs)
@pytest.fixture
def fs():
    # FileSystemSource backed by the on-disk STIX test data at FS_PATH.
    yield stix2.FileSystemSource(FS_PATH)
def test_object_factory_created_by_ref_str():
factory = stix2.ObjectFactory(created_by_ref=IDENTITY_ID)
ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
@ -497,7 +503,20 @@ def test_list_semantic_check(ds, ds2):
assert round(score) == 1
def test_graph_equivalence_with_filesystem_source(ds):
def test_graph_similarity_raises_value_error(ds, ds2):
    """graph_similarity must reject a non-positive max_depth with ValueError."""
    # NOTE(review): the original referenced the module-level fixture function
    # `ds2` without declaring it as a parameter, so the raw fixture function
    # object (not a MemoryStore) was passed -- masked only because the
    # ValueError fires before the datastores are queried. Request the fixture.
    weights = {
        "_internal": {
            "ignore_spec_version": False,
            "versioning_checks": False,
            "max_depth": -1,
        },
    }
    with pytest.raises(ValueError):
        prop_scores1 = {}
        stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
def test_graph_similarity_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
@ -505,12 +524,151 @@ def test_graph_equivalence_with_filesystem_source(ds):
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights)
assert round(env1) == 25
assert round(prop_scores1["matching_score"]) == 451
assert round(prop_scores1["len_pairs"]) == 18
assert round(env2) == 25
assert round(prop_scores2["matching_score"]) == 451
assert round(prop_scores2["len_pairs"]) == 18
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_similarity_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
fs = stix2.FileSystemSource(FS_PATH)
env = stix2.Environment().graphically_equivalent(fs, ds, prop_scores, **weights)
assert round(env) == 28
assert round(prop_scores["matching_score"]) == 139
assert round(prop_scores["sum_weights"]) == 500
env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_similarity_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_similarity_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights)
assert env1 is False
assert round(prop_scores1["matching_score"]) == 451
assert round(prop_scores1["len_pairs"]) == 18
assert env2 is False
assert round(prop_scores2["matching_score"]) == 451
assert round(prop_scores2["len_pairs"]) == 18
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_duplicate_graph(ds):
@ -522,10 +680,10 @@ def test_graph_equivalence_with_duplicate_graph(ds):
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds, prop_scores, **weights)
assert round(env) == 100
env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights)
assert env is True
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["sum_weights"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
@ -536,11 +694,31 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds):
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert env2 is True
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
@ -551,8 +729,28 @@ def test_graph_equivalence_with_versioning_check_off(ds2, ds):
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert env2 is True
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)

View File

@ -1,3 +1,4 @@
import json
import os
import pytest
@ -37,7 +38,7 @@ def ds():
@pytest.fixture
def ds2():
def ds2_objects():
cam = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
idy = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
ind = stix2.v21.Indicator(id=INDICATOR_ID, created_by_ref=idy.id, **INDICATOR_KWARGS)
@ -68,7 +69,17 @@ def ds2():
published="2021-04-09T08:22:22Z", object_refs=stix_objs,
)
stix_objs.append(reprt)
yield stix2.MemoryStore(stix_objs)
yield stix_objs
@pytest.fixture
def ds2(ds2_objects):
yield stix2.MemoryStore(ds2_objects)
@pytest.fixture
def fs():
yield stix2.FileSystemSource(FS_PATH)
def test_object_factory_created_by_ref_str():
@ -426,14 +437,14 @@ def test_related_to_by_target(ds):
assert any(x['id'] == INDICATOR_ID for x in resp)
def test_semantic_equivalence_on_same_attack_pattern1():
def test_object_similarity_on_same_attack_pattern1():
ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
env = stix2.Environment().semantically_equivalent(ap1, ap2)
env = stix2.Environment().object_similarity(ap1, ap2)
assert round(env) == 100
def test_semantic_equivalence_on_same_attack_pattern2():
def test_object_similarity_on_same_attack_pattern2():
ATTACK_KWARGS = dict(
name="Phishing",
external_references=[
@ -445,18 +456,18 @@ def test_semantic_equivalence_on_same_attack_pattern2():
)
ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS)
ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS)
env = stix2.Environment().semantically_equivalent(ap1, ap2)
env = stix2.Environment().object_similarity(ap1, ap2)
assert round(env) == 100
def test_semantic_equivalence_on_same_campaign1():
def test_object_similarity_on_same_campaign1():
camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
env = stix2.Environment().semantically_equivalent(camp1, camp2)
env = stix2.Environment().object_similarity(camp1, camp2)
assert round(env) == 100
def test_semantic_equivalence_on_same_campaign2():
def test_object_similarity_on_same_campaign2():
CAMP_KWARGS = dict(
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector.",
@ -464,18 +475,18 @@ def test_semantic_equivalence_on_same_campaign2():
)
camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS)
camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS)
env = stix2.Environment().semantically_equivalent(camp1, camp2)
env = stix2.Environment().object_similarity(camp1, camp2)
assert round(env) == 100
def test_semantic_equivalence_on_same_identity1():
def test_object_similarity_on_same_identity1():
iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
env = stix2.Environment().semantically_equivalent(iden1, iden2)
env = stix2.Environment().object_similarity(iden1, iden2)
assert round(env) == 100
def test_semantic_equivalence_on_same_identity2():
def test_object_similarity_on_same_identity2():
IDEN_KWARGS = dict(
name="John Smith",
identity_class="individual",
@ -483,26 +494,26 @@ def test_semantic_equivalence_on_same_identity2():
)
iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS)
iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS)
env = stix2.Environment().semantically_equivalent(iden1, iden2)
env = stix2.Environment().object_similarity(iden1, iden2)
assert round(env) == 100
def test_semantic_equivalence_on_same_indicator():
def test_object_similarity_on_same_indicator():
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
env = stix2.Environment().semantically_equivalent(ind1, ind2)
env = stix2.Environment().object_similarity(ind1, ind2)
assert round(env) == 100
def test_semantic_equivalence_on_same_location1():
def test_object_similarity_on_same_location1():
location_kwargs = dict(latitude=45, longitude=179)
loc1 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
loc2 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
env = stix2.Environment().semantically_equivalent(loc1, loc2)
env = stix2.Environment().object_similarity(loc1, loc2)
assert round(env) == 100
def test_semantic_equivalence_on_same_location2():
def test_object_similarity_on_same_location2():
location_kwargs = dict(
latitude=38.889,
longitude=-77.023,
@ -511,33 +522,33 @@ def test_semantic_equivalence_on_same_location2():
)
loc1 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
loc2 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
env = stix2.Environment().semantically_equivalent(loc1, loc2)
env = stix2.Environment().object_similarity(loc1, loc2)
assert round(env) == 100
def test_semantic_equivalence_location_with_no_latlong():
def test_object_similarity_location_with_no_latlong():
loc_kwargs = dict(country="US", administrative_area="US-DC")
loc1 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS)
loc2 = stix2.v21.Location(id=LOCATION_ID, **loc_kwargs)
env = stix2.Environment().semantically_equivalent(loc1, loc2)
env = stix2.Environment().object_similarity(loc1, loc2)
assert round(env) != 100
def test_semantic_equivalence_on_same_malware():
def test_object_similarity_on_same_malware():
malw1 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
malw2 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
env = stix2.Environment().semantically_equivalent(malw1, malw2)
env = stix2.Environment().object_similarity(malw1, malw2)
assert round(env) == 100
def test_semantic_equivalence_on_same_threat_actor1():
def test_object_similarity_on_same_threat_actor1():
ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
env = stix2.Environment().semantically_equivalent(ta1, ta2)
env = stix2.Environment().object_similarity(ta1, ta2)
assert round(env) == 100
def test_semantic_equivalence_on_same_threat_actor2():
def test_object_similarity_on_same_threat_actor2():
THREAT_KWARGS = dict(
threat_actor_types=["crime-syndicate"],
aliases=["super-evil"],
@ -545,25 +556,38 @@ def test_semantic_equivalence_on_same_threat_actor2():
)
ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_KWARGS)
ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_KWARGS)
env = stix2.Environment().semantically_equivalent(ta1, ta2)
env = stix2.Environment().object_similarity(ta1, ta2)
assert round(env) == 100
def test_semantic_equivalence_on_same_tool():
def test_object_similarity_on_same_tool():
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
env = stix2.Environment().semantically_equivalent(tool1, tool2)
env = stix2.Environment().object_similarity(tool1, tool2)
assert round(env) == 100
def test_semantic_equivalence_on_same_vulnerability1():
def test_object_similarity_on_same_vulnerability1():
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
env = stix2.Environment().semantically_equivalent(vul1, vul2)
prop_scores = {}
env = stix2.Environment().object_similarity(vul1, vul2, prop_scores)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 30
assert round(prop_scores["sum_weights"]) == 30
def test_semantic_equivalence_on_same_vulnerability2():
def test_object_equivalence_on_same_vulnerability1():
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
prop_scores = {}
env = stix2.Environment().object_equivalence(vul1, vul2, prop_scores)
assert env is True
assert round(prop_scores["matching_score"]) == 30
assert round(prop_scores["sum_weights"]) == 30
def test_object_similarity_on_same_vulnerability2():
VULN_KWARGS1 = dict(
name="Heartbleed",
external_references=[
@ -584,11 +608,42 @@ def test_semantic_equivalence_on_same_vulnerability2():
)
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS1)
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS2)
env = stix2.Environment().semantically_equivalent(vul1, vul2)
prop_scores = {}
env = stix2.Environment().object_similarity(vul1, vul2, prop_scores)
assert round(env) == 0.0
assert round(prop_scores["matching_score"]) == 0
assert round(prop_scores["sum_weights"]) == 100
def test_semantic_equivalence_on_unknown_object():
def test_object_equivalence_on_same_vulnerability2():
VULN_KWARGS1 = dict(
name="Heartbleed",
external_references=[
{
"url": "https://example",
"source_name": "some-source",
},
],
)
VULN_KWARGS2 = dict(
name="Foo",
external_references=[
{
"url": "https://example2",
"source_name": "some-source2",
},
],
)
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS1)
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS2)
prop_scores = {}
env = stix2.Environment().object_equivalence(vul1, vul2, prop_scores)
assert env is False
assert round(prop_scores["matching_score"]) == 0
assert round(prop_scores["sum_weights"]) == 100
def test_object_similarity_on_unknown_object():
CUSTOM_KWARGS1 = dict(
type="x-foobar",
id="x-foobar--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061",
@ -615,17 +670,17 @@ def test_semantic_equivalence_on_unknown_object():
def _x_foobar_checks(obj1, obj2, **weights):
matching_score = 0.0
sum_weights = 0.0
if stix2.environment.check_property_present("external_references", obj1, obj2):
if stix2.equivalence.object.check_property_present("external_references", obj1, obj2):
w = weights["external_references"]
sum_weights += w
matching_score += w * stix2.environment.partial_external_reference_based(
matching_score += w * stix2.equivalence.object.partial_external_reference_based(
obj1["external_references"],
obj2["external_references"],
)
if stix2.environment.check_property_present("name", obj1, obj2):
if stix2.equivalence.object.check_property_present("name", obj1, obj2):
w = weights["name"]
sum_weights += w
matching_score += w * stix2.environment.partial_string_based(obj1["name"], obj2["name"])
matching_score += w * stix2.equivalence.object.partial_string_based(obj1["name"], obj2["name"])
return matching_score, sum_weights
weights = {
@ -640,20 +695,20 @@ def test_semantic_equivalence_on_unknown_object():
}
cust1 = stix2.parse(CUSTOM_KWARGS1, allow_custom=True)
cust2 = stix2.parse(CUSTOM_KWARGS2, allow_custom=True)
env = stix2.Environment().semantically_equivalent(cust1, cust2, **weights)
env = stix2.Environment().object_similarity(cust1, cust2, **weights)
assert round(env) == 0
def test_semantic_equivalence_different_type_raises():
def test_object_similarity_different_type_raises():
with pytest.raises(ValueError) as excinfo:
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
stix2.Environment().semantically_equivalent(vul1, ind1)
stix2.Environment().object_similarity(vul1, ind1)
assert str(excinfo.value) == "The objects to compare must be of the same type!"
def test_semantic_equivalence_different_spec_version_raises():
def test_object_similarity_different_spec_version_raises():
with pytest.raises(ValueError) as excinfo:
V20_KWARGS = dict(
labels=['malicious-activity'],
@ -661,23 +716,24 @@ def test_semantic_equivalence_different_spec_version_raises():
)
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **V20_KWARGS)
stix2.Environment().semantically_equivalent(ind1, ind2)
stix2.Environment().object_similarity(ind1, ind2)
assert str(excinfo.value) == "The objects to compare must be of the same spec version!"
def test_semantic_equivalence_zero_match():
def test_object_similarity_zero_match():
IND_KWARGS = dict(
indicator_types=["APTX"],
indicator_types=["malicious-activity", "bar"],
pattern="[ipv4-addr:value = '192.168.1.1']",
pattern_type="stix",
valid_from="2019-01-01T12:34:56Z",
labels=["APTX", "foo"],
)
weights = {
"indicator": {
"indicator_types": (15, stix2.environment.partial_list_based),
"pattern": (80, stix2.environment.custom_pattern_based),
"valid_from": (5, stix2.environment.partial_timestamp_based),
"indicator_types": (15, stix2.equivalence.object.partial_list_based),
"pattern": (80, stix2.equivalence.object.custom_pattern_based),
"valid_from": (5, stix2.equivalence.object.partial_timestamp_based),
"tdelta": 1, # One day interval
},
"_internal": {
@ -686,20 +742,22 @@ def test_semantic_equivalence_zero_match():
}
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **IND_KWARGS)
env = stix2.Environment().semantically_equivalent(ind1, ind2, **weights)
assert round(env) == 0
env = stix2.Environment().object_similarity(ind1, ind2, **weights)
assert round(env) == 8
env = stix2.Environment().object_similarity(ind2, ind1, **weights)
assert round(env) == 8
def test_semantic_equivalence_different_spec_version():
def test_object_similarity_different_spec_version():
IND_KWARGS = dict(
labels=["APTX"],
pattern="[ipv4-addr:value = '192.168.1.1']",
)
weights = {
"indicator": {
"indicator_types": (15, stix2.environment.partial_list_based),
"pattern": (80, stix2.environment.custom_pattern_based),
"valid_from": (5, stix2.environment.partial_timestamp_based),
"indicator_types": (15, stix2.equivalence.object.partial_list_based),
"pattern": (80, stix2.equivalence.object.custom_pattern_based),
"valid_from": (5, stix2.equivalence.object.partial_timestamp_based),
"tdelta": 1, # One day interval
},
"_internal": {
@ -708,7 +766,10 @@ def test_semantic_equivalence_different_spec_version():
}
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS)
env = stix2.Environment().semantically_equivalent(ind1, ind2, **weights)
env = stix2.Environment().object_similarity(ind1, ind2, **weights)
assert round(env) == 0
env = stix2.Environment().object_similarity(ind2, ind1, **weights)
assert round(env) == 0
@ -780,34 +841,34 @@ def test_semantic_equivalence_different_spec_version():
),
],
)
def test_semantic_equivalence_external_references(refs1, refs2, ret_val):
value = stix2.environment.partial_external_reference_based(refs1, refs2)
def test_object_similarity_external_references(refs1, refs2, ret_val):
value = stix2.equivalence.object.partial_external_reference_based(refs1, refs2)
assert value == ret_val
def test_semantic_equivalence_timestamp():
def test_object_similarity_timestamp():
t1 = "2018-10-17T00:14:20.652Z"
t2 = "2018-10-17T12:14:20.652Z"
assert stix2.environment.partial_timestamp_based(t1, t2, 1) == 0.5
assert stix2.equivalence.object.partial_timestamp_based(t1, t2, 1) == 0.5
def test_semantic_equivalence_exact_match():
def test_object_similarity_exact_match():
t1 = "2018-10-17T00:14:20.652Z"
t2 = "2018-10-17T12:14:20.652Z"
assert stix2.environment.exact_match(t1, t2) == 0.0
assert stix2.equivalence.object.exact_match(t1, t2) == 0.0
def test_non_existent_config_for_object():
r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
assert stix2.Environment().semantically_equivalent(r1, r2) == 0.0
assert stix2.Environment().object_similarity(r1, r2) == 0.0
def custom_semantic_equivalence_method(obj1, obj2, **weights):
return 96.0, 100.0
def test_semantic_equivalence_method_provided():
def test_object_similarity_method_provided():
# Because `method` is provided, `partial_list_based` will be ignored
TOOL2_KWARGS = dict(
name="Random Software",
@ -816,19 +877,19 @@ def test_semantic_equivalence_method_provided():
weights = {
"tool": {
"tool_types": (20, stix2.environment.partial_list_based),
"name": (80, stix2.environment.partial_string_based),
"tool_types": (20, stix2.equivalence.object.partial_list_based),
"name": (80, stix2.equivalence.object.partial_string_based),
"method": custom_semantic_equivalence_method,
},
}
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
env = stix2.Environment().semantically_equivalent(tool1, tool2, **weights)
env = stix2.Environment().object_similarity(tool1, tool2, **weights)
assert round(env) == 96
def test_semantic_equivalence_prop_scores():
def test_object_similarity_prop_scores():
TOOL2_KWARGS = dict(
name="Random Software",
tool_types=["information-gathering"],
@ -838,7 +899,7 @@ def test_semantic_equivalence_prop_scores():
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
stix2.Environment().semantically_equivalent(tool1, tool2, prop_scores)
stix2.Environment().object_similarity(tool1, tool2, prop_scores)
assert len(prop_scores) == 4
assert round(prop_scores["matching_score"], 1) == 8.9
assert round(prop_scores["sum_weights"], 1) == 100.0
@ -850,7 +911,7 @@ def custom_semantic_equivalence_method_prop_scores(obj1, obj2, prop_scores, **we
return 96.0, 100.0
def test_semantic_equivalence_prop_scores_method_provided():
def test_object_similarity_prop_scores_method_provided():
TOOL2_KWARGS = dict(
name="Random Software",
tool_types=["information-gathering"],
@ -868,7 +929,7 @@ def test_semantic_equivalence_prop_scores_method_provided():
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
env = stix2.Environment().semantically_equivalent(tool1, tool2, prop_scores, **weights)
env = stix2.Environment().object_similarity(tool1, tool2, prop_scores, **weights)
assert round(env) == 96
assert len(prop_scores) == 2
assert prop_scores["matching_score"] == 96.0
@ -955,8 +1016,30 @@ def test_list_semantic_check(ds, ds2):
)
assert round(score) == 1
score = stix2.equivalence.object.list_reference_check(
object_refs2,
object_refs1,
ds2,
ds,
**weights,
)
assert round(score) == 1
def test_graph_equivalence_with_filesystem_source(ds):
def test_graph_similarity_raises_value_error(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": -1,
},
}
with pytest.raises(ValueError):
prop_scores1 = {}
stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
def test_graph_similarity_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
@ -964,12 +1047,257 @@ def test_graph_equivalence_with_filesystem_source(ds):
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights)
assert round(env1) == 23
assert round(prop_scores1["matching_score"]) == 411
assert round(prop_scores1["len_pairs"]) == 18
assert round(env2) == 23
assert round(prop_scores2["matching_score"]) == 411
assert round(prop_scores2["len_pairs"]) == 18
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_depth_limiting():
g1 = [
{
"type": "foo",
"id": "foo--07f9dd2a-1cce-45bb-8cbe-dba3f007aafd",
"spec_version": "2.1",
"created": "1986-02-08T00:20:17Z",
"modified": "1989-12-11T06:54:29Z",
"some1_ref": "foo--700a8a3c-9936-412f-b4eb-ede466476180",
"some2_ref": "foo--f4a999a3-df94-499d-9cac-6c02e21775ee",
},
{
"type": "foo",
"id": "foo--700a8a3c-9936-412f-b4eb-ede466476180",
"spec_version": "2.1",
"created": "1989-01-06T10:31:54Z",
"modified": "1995-06-18T10:25:01Z",
"some1_ref": "foo--705afd45-eb56-43fc-a214-313d63d199a3",
},
{
"type": "foo",
"id": "foo--705afd45-eb56-43fc-a214-313d63d199a3",
"spec_version": "2.1",
"created": "1977-11-06T21:19:29Z",
"modified": "1997-12-02T20:33:34Z",
},
{
"type": "foo",
"id": "foo--f4a999a3-df94-499d-9cac-6c02e21775ee",
"spec_version": "2.1",
"created": "1991-09-17T00:40:52Z",
"modified": "1992-12-06T11:02:47Z",
"name": "alice",
},
]
g2 = [
{
"type": "foo",
"id": "foo--71570479-3e6e-48d2-81fb-897454dec55d",
"spec_version": "2.1",
"created": "1975-12-22T05:20:38Z",
"modified": "1980-11-11T01:09:03Z",
"some1_ref": "foo--4aeda39b-31fa-4ffb-a847-d8edc175a579",
"some2_ref": "foo--941e48d6-3100-4419-9e8c-cf1eb59e71b2",
},
{
"type": "foo",
"id": "foo--4aeda39b-31fa-4ffb-a847-d8edc175a579",
"spec_version": "2.1",
"created": "1976-01-05T08:32:03Z",
"modified": "1980-11-09T05:41:02Z",
"some1_ref": "foo--689252c3-5d20-43ff-bbf7-c8e45d713768",
},
{
"type": "foo",
"id": "foo--689252c3-5d20-43ff-bbf7-c8e45d713768",
"spec_version": "2.1",
"created": "1974-09-11T18:56:30Z",
"modified": "1976-10-31T11:59:43Z",
},
{
"type": "foo",
"id": "foo--941e48d6-3100-4419-9e8c-cf1eb59e71b2",
"spec_version": "2.1",
"created": "1985-01-03T01:07:03Z",
"modified": "1992-07-20T21:32:31Z",
"name": "alice",
},
]
mem_store1 = stix2.MemorySource(g1)
mem_store2 = stix2.MemorySource(g2)
custom_weights = {
"foo": {
"some1_ref": (33, stix2.equivalence.object.reference_check),
"some2_ref": (33, stix2.equivalence.object.reference_check),
"name": (34, stix2.equivalence.object.partial_string_based),
},
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.equivalence.graph.graph_similarity(mem_store1, mem_store2, prop_scores1, **custom_weights)
assert round(env1) == 38
assert round(prop_scores1["matching_score"]) == 300
assert round(prop_scores1["len_pairs"]) == 8
# from 'alice' check in de-reference
assert prop_scores1['summary']['foo--71570479-3e6e-48d2-81fb-897454dec55d']['prop_score']['some2_ref']['weight'] == 33
assert prop_scores1['summary']['foo--07f9dd2a-1cce-45bb-8cbe-dba3f007aafd']['prop_score']['some2_ref']['weight'] == 33
# Switching parameters
prop_scores2 = {}
env2 = stix2.equivalence.graph.graph_similarity(
mem_store2, mem_store1, prop_scores2, **custom_weights
)
assert round(env2) == 38
assert round(prop_scores2["matching_score"]) == 300
assert round(prop_scores2["len_pairs"]) == 8
# from 'alice' check in de-reference
assert prop_scores2['summary']['foo--71570479-3e6e-48d2-81fb-897454dec55d']['prop_score']['some2_ref']['weight'] == 33
assert prop_scores2['summary']['foo--07f9dd2a-1cce-45bb-8cbe-dba3f007aafd']['prop_score']['some2_ref']['weight'] == 33
def test_graph_similarity_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
fs = stix2.FileSystemSource(FS_PATH)
env = stix2.Environment().graphically_equivalent(fs, ds, prop_scores, **weights)
assert round(env) == 24
assert round(prop_scores["matching_score"]) == 122
assert round(prop_scores["sum_weights"]) == 500
env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_similarity_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_similarity_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights)
assert env1 is False
assert round(prop_scores1["matching_score"]) == 411
assert round(prop_scores1["len_pairs"]) == 18
assert env2 is False
assert round(prop_scores2["matching_score"]) == 411
assert round(prop_scores2["len_pairs"]) == 18
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_duplicate_graph(ds):
@ -981,10 +1309,10 @@ def test_graph_equivalence_with_duplicate_graph(ds):
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds, prop_scores, **weights)
assert round(env) == 100
env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights)
assert env is True
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["sum_weights"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
@ -995,11 +1323,31 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds):
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert env2 is True
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
@ -1010,8 +1358,28 @@ def test_graph_equivalence_with_versioning_check_off(ds2, ds):
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
assert env2 is True
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
prop_scores1["matching_score"] = round(prop_scores1["matching_score"], 3)
prop_scores2["matching_score"] = round(prop_scores2["matching_score"], 3)
assert json.dumps(prop_scores1, sort_keys=True, indent=4) == json.dumps(prop_scores2, sort_keys=True, indent=4)