WIP: changes to graph_similarity
busted main loop, symmetrical properties not present
pull/1/head
parent
03b3423cbb
commit
489970718f
|
@ -2,12 +2,12 @@
|
||||||
import copy
|
import copy
|
||||||
|
|
||||||
from .datastore import CompositeDataSource, DataStoreMixin
|
from .datastore import CompositeDataSource, DataStoreMixin
|
||||||
from .equivalence.graph import graphically_equivalent
|
from .equivalence.graph import graph_similarity
|
||||||
from .equivalence.object import ( # noqa: F401
|
from .equivalence.object import ( # noqa: F401
|
||||||
WEIGHTS, check_property_present, custom_pattern_based, exact_match,
|
WEIGHTS, check_property_present, custom_pattern_based, exact_match,
|
||||||
list_reference_check, partial_external_reference_based, partial_list_based,
|
list_reference_check, partial_external_reference_based, partial_list_based,
|
||||||
partial_location_distance, partial_string_based, partial_timestamp_based,
|
partial_location_distance, partial_string_based, partial_timestamp_based,
|
||||||
reference_check, semantically_equivalent,
|
reference_check, object_similarity,
|
||||||
)
|
)
|
||||||
from .parsing import parse as _parse
|
from .parsing import parse as _parse
|
||||||
|
|
||||||
|
@ -197,7 +197,7 @@ class Environment(DataStoreMixin):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
|
def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
|
||||||
"""This method verifies if two objects of the same type are
|
"""This method verifies if two objects of the same type are
|
||||||
semantically equivalent.
|
semantically equivalent.
|
||||||
|
|
||||||
|
@ -229,10 +229,10 @@ class Environment(DataStoreMixin):
|
||||||
see `the Committee Note <link here>`__.
|
see `the Committee Note <link here>`__.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return semantically_equivalent(obj1, obj2, prop_scores, **weight_dict)
|
return object_similarity(obj1, obj2, prop_scores, **weight_dict)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
|
def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
|
||||||
"""This method verifies if two graphs are semantically equivalent.
|
"""This method verifies if two graphs are semantically equivalent.
|
||||||
Each DataStore can contain a connected or disconnected graph and the
|
Each DataStore can contain a connected or disconnected graph and the
|
||||||
final result is weighted over the amount of objects we managed to compare.
|
final result is weighted over the amount of objects we managed to compare.
|
||||||
|
@ -267,4 +267,4 @@ class Environment(DataStoreMixin):
|
||||||
see `the Committee Note <link here>`__.
|
see `the Committee Note <link here>`__.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return graphically_equivalent(ds1, ds2, prop_scores, **weight_dict)
|
return graph_similarity(ds1, ds2, prop_scores, **weight_dict)
|
||||||
|
|
|
@ -1,15 +1,17 @@
|
||||||
"""Python APIs for STIX 2 Graph-based Semantic Equivalence."""
|
"""Python APIs for STIX 2 Graph-based Semantic Equivalence."""
|
||||||
|
import collections
|
||||||
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from ..object import (
|
from ..object import (
|
||||||
WEIGHTS, exact_match, list_reference_check, partial_string_based,
|
WEIGHTS, exact_match, list_reference_check, partial_string_based,
|
||||||
partial_timestamp_based, reference_check, semantically_equivalent,
|
partial_timestamp_based, reference_check, object_similarity,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
|
def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
|
||||||
"""This method verifies if two graphs are semantically equivalent.
|
"""This method verifies if two graphs are semantically equivalent.
|
||||||
Each DataStore can contain a connected or disconnected graph and the
|
Each DataStore can contain a connected or disconnected graph and the
|
||||||
final result is weighted over the amount of objects we managed to compare.
|
final result is weighted over the amount of objects we managed to compare.
|
||||||
|
@ -44,49 +46,48 @@ def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
|
||||||
see `the Committee Note <link here>`__.
|
see `the Committee Note <link here>`__.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
results = {}
|
||||||
|
equivalence_score = 0
|
||||||
weights = GRAPH_WEIGHTS.copy()
|
weights = GRAPH_WEIGHTS.copy()
|
||||||
|
|
||||||
if weight_dict:
|
if weight_dict:
|
||||||
weights.update(weight_dict)
|
weights.update(weight_dict)
|
||||||
|
|
||||||
results = {}
|
|
||||||
depth = weights["_internal"]["max_depth"]
|
depth = weights["_internal"]["max_depth"]
|
||||||
|
|
||||||
graph1 = ds1.query([])
|
graph1 = bucket_per_type(ds1.query([]))
|
||||||
graph2 = ds2.query([])
|
graph2 = bucket_per_type(ds2.query([]))
|
||||||
|
pairs = object_pairs(graph1, graph2, weights)
|
||||||
|
|
||||||
graph1.sort(key=lambda x: x["type"])
|
for object1, object2 in pairs:
|
||||||
graph2.sort(key=lambda x: x["type"])
|
iprop_score1 = {}
|
||||||
|
iprop_score2 = {}
|
||||||
if len(graph1) < len(graph2):
|
object1_id = object1["id"]
|
||||||
|
object2_id = object2["id"]
|
||||||
|
weights["_internal"]["max_depth"] = depth
|
||||||
weights["_internal"]["ds1"] = ds1
|
weights["_internal"]["ds1"] = ds1
|
||||||
weights["_internal"]["ds2"] = ds2
|
weights["_internal"]["ds2"] = ds2
|
||||||
g1 = graph1
|
result1 = object_similarity(object1, object2, iprop_score1, **weights)
|
||||||
g2 = graph2
|
|
||||||
else:
|
|
||||||
weights["_internal"]["ds1"] = ds2
|
weights["_internal"]["ds1"] = ds2
|
||||||
weights["_internal"]["ds2"] = ds1
|
weights["_internal"]["ds2"] = ds1
|
||||||
g1 = graph2
|
result2 = object_similarity(object2, object1, iprop_score2, **weights)
|
||||||
g2 = graph1
|
|
||||||
|
|
||||||
for object1 in g1:
|
if object1_id not in results:
|
||||||
for object2 in g2:
|
results[object1_id] = {"lhs": object1["id"], "rhs": object2["id"], "prop_score": iprop_score1, "value": result1}
|
||||||
if object1["type"] == object2["type"] and object1["type"] in weights:
|
elif result1 > results[object1_id]["value"]:
|
||||||
iprop_score = {}
|
results[object1_id] = {"lhs": object1["id"], "rhs": object2["id"], "prop_score": iprop_score1, "value": result1}
|
||||||
result = semantically_equivalent(object1, object2, iprop_score, **weights)
|
|
||||||
objects1_id = object1["id"]
|
|
||||||
weights["_internal"]["max_depth"] = depth
|
|
||||||
|
|
||||||
if objects1_id not in results:
|
if object2_id not in results:
|
||||||
results[objects1_id] = {"matched": object2["id"], "prop_score": iprop_score, "value": result}
|
results[object2_id] = {"lhs": object2["id"], "rhs": object1["id"], "prop_score": iprop_score2, "value": result2}
|
||||||
elif result > results[objects1_id]["value"]:
|
elif result1 > results[object2_id]["value"]:
|
||||||
results[objects1_id] = {"matched": object2["id"], "prop_score": iprop_score, "value": result}
|
results[object2_id] = {"lhs": object2["id"], "rhs": object1["id"], "prop_score": iprop_score2, "value": result2}
|
||||||
|
|
||||||
equivalence_score = 0
|
|
||||||
matching_score = sum(x["value"] for x in results.values())
|
matching_score = sum(x["value"] for x in results.values())
|
||||||
sum_weights = len(results) * 100.0
|
sum_weights = len(results)
|
||||||
if sum_weights > 0:
|
if sum_weights > 0:
|
||||||
equivalence_score = (matching_score / sum_weights) * 100
|
equivalence_score = matching_score / sum_weights
|
||||||
|
|
||||||
prop_scores["matching_score"] = matching_score
|
prop_scores["matching_score"] = matching_score
|
||||||
prop_scores["sum_weights"] = sum_weights
|
prop_scores["sum_weights"] = sum_weights
|
||||||
prop_scores["summary"] = results
|
prop_scores["summary"] = results
|
||||||
|
@ -100,6 +101,22 @@ def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
|
||||||
return equivalence_score
|
return equivalence_score
|
||||||
|
|
||||||
|
|
||||||
|
def bucket_per_type(g):
    """Group the objects of a graph by their STIX type.

    Args:
        g: An iterable of objects, each supporting ``obj["type"]`` lookup.

    Returns:
        collections.defaultdict: Maps each type value to the list of objects
        carrying that type, in the order they were encountered in ``g``.
        Looking up a type that was never seen yields an empty list.
    """
    buckets = collections.defaultdict(list)
    # Plain loop rather than a list comprehension: the comprehension was used
    # only for its side effect and built a throwaway list of ``None`` values.
    for obj in g:
        buckets[obj["type"]].append(obj)
    return buckets
|
||||||
|
|
||||||
|
|
||||||
|
def object_pairs(g1, g2, w):
    """Return every same-type pairing of objects from two bucketed graphs.

    Args:
        g1: Mapping of type -> list of objects for the first graph.
        g2: Mapping of type -> list of objects for the second graph.
        w: Weights mapping; a type is only paired when it is one of its keys.

    Returns:
        An iterator of ``(object_from_g1, object_from_g2)`` tuples: for each
        type present in ``g1``, ``g2`` and ``w``, the Cartesian product of the
        two buckets for that type.
    """
    # Sorting makes the pair order reproducible between runs; raw set order
    # for string keys varies with hash randomization.
    # NOTE(review): assumes type keys are mutually orderable (plain strings).
    testable_types = sorted(set(g1).intersection(g2, w))

    return itertools.chain.from_iterable(
        itertools.product(g1[stix_type], g2[stix_type])
        for stix_type in testable_types
    )
|
||||||
|
|
||||||
|
|
||||||
# default weights used for the graph semantic equivalence process
|
# default weights used for the graph semantic equivalence process
|
||||||
GRAPH_WEIGHTS = WEIGHTS.copy()
|
GRAPH_WEIGHTS = WEIGHTS.copy()
|
||||||
GRAPH_WEIGHTS.update({
|
GRAPH_WEIGHTS.update({
|
||||||
|
|
|
@ -9,7 +9,7 @@ from ..pattern import equivalent_patterns
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
|
def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
|
||||||
"""This method verifies if two objects of the same type are
|
"""This method verifies if two objects of the same type are
|
||||||
semantically equivalent.
|
semantically equivalent.
|
||||||
|
|
||||||
|
@ -312,7 +312,7 @@ def _versioned_checks(ref1, ref2, ds1, ds2, **weights):
|
||||||
if len(objects1) > 0 and len(objects2) > 0:
|
if len(objects1) > 0 and len(objects2) > 0:
|
||||||
for o1 in objects1:
|
for o1 in objects1:
|
||||||
for o2 in objects2:
|
for o2 in objects2:
|
||||||
result = semantically_equivalent(o1, o2, **weights)
|
result = object_similarity(o1, o2, **weights)
|
||||||
if ref1 not in results:
|
if ref1 not in results:
|
||||||
results[ref1] = {"matched": ref2, "value": result}
|
results[ref1] = {"matched": ref2, "value": result}
|
||||||
elif result > results[ref1]["value"]:
|
elif result > results[ref1]["value"]:
|
||||||
|
@ -337,7 +337,7 @@ def reference_check(ref1, ref2, ds1, ds2, **weights):
|
||||||
else:
|
else:
|
||||||
o1, o2 = ds1.get(ref1), ds2.get(ref2)
|
o1, o2 = ds1.get(ref1), ds2.get(ref2)
|
||||||
if o1 and o2:
|
if o1 and o2:
|
||||||
result = semantically_equivalent(o1, o2, **weights) / 100.0
|
result = object_similarity(o1, o2, **weights) / 100.0
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"--\t\treference_check '%s' '%s'\tresult: '%s'",
|
"--\t\treference_check '%s' '%s'\tresult: '%s'",
|
||||||
|
|
|
@ -429,7 +429,7 @@ def test_related_to_by_target(ds):
|
||||||
def test_semantic_equivalence_on_same_attack_pattern1():
|
def test_semantic_equivalence_on_same_attack_pattern1():
|
||||||
ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
|
ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
|
||||||
ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
|
ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(ap1, ap2)
|
env = stix2.Environment().object_similarity(ap1, ap2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
|
@ -445,14 +445,14 @@ def test_semantic_equivalence_on_same_attack_pattern2():
|
||||||
)
|
)
|
||||||
ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS)
|
ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS)
|
||||||
ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS)
|
ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(ap1, ap2)
|
env = stix2.Environment().object_similarity(ap1, ap2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
def test_semantic_equivalence_on_same_campaign1():
|
def test_semantic_equivalence_on_same_campaign1():
|
||||||
camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
|
camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
|
||||||
camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
|
camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(camp1, camp2)
|
env = stix2.Environment().object_similarity(camp1, camp2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
|
@ -464,14 +464,14 @@ def test_semantic_equivalence_on_same_campaign2():
|
||||||
)
|
)
|
||||||
camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS)
|
camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS)
|
||||||
camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS)
|
camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(camp1, camp2)
|
env = stix2.Environment().object_similarity(camp1, camp2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
def test_semantic_equivalence_on_same_identity1():
|
def test_semantic_equivalence_on_same_identity1():
|
||||||
iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
|
iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
|
||||||
iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
|
iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(iden1, iden2)
|
env = stix2.Environment().object_similarity(iden1, iden2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
|
@ -483,14 +483,14 @@ def test_semantic_equivalence_on_same_identity2():
|
||||||
)
|
)
|
||||||
iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS)
|
iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS)
|
||||||
iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS)
|
iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(iden1, iden2)
|
env = stix2.Environment().object_similarity(iden1, iden2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
def test_semantic_equivalence_on_same_indicator():
|
def test_semantic_equivalence_on_same_indicator():
|
||||||
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
||||||
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(ind1, ind2)
|
env = stix2.Environment().object_similarity(ind1, ind2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
|
@ -498,7 +498,7 @@ def test_semantic_equivalence_on_same_location1():
|
||||||
location_kwargs = dict(latitude=45, longitude=179)
|
location_kwargs = dict(latitude=45, longitude=179)
|
||||||
loc1 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
|
loc1 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
|
||||||
loc2 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
|
loc2 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
|
||||||
env = stix2.Environment().semantically_equivalent(loc1, loc2)
|
env = stix2.Environment().object_similarity(loc1, loc2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
|
@ -511,7 +511,7 @@ def test_semantic_equivalence_on_same_location2():
|
||||||
)
|
)
|
||||||
loc1 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
|
loc1 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
|
||||||
loc2 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
|
loc2 = stix2.v21.Location(id=LOCATION_ID, **location_kwargs)
|
||||||
env = stix2.Environment().semantically_equivalent(loc1, loc2)
|
env = stix2.Environment().object_similarity(loc1, loc2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
|
@ -519,21 +519,21 @@ def test_semantic_equivalence_location_with_no_latlong():
|
||||||
loc_kwargs = dict(country="US", administrative_area="US-DC")
|
loc_kwargs = dict(country="US", administrative_area="US-DC")
|
||||||
loc1 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS)
|
loc1 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS)
|
||||||
loc2 = stix2.v21.Location(id=LOCATION_ID, **loc_kwargs)
|
loc2 = stix2.v21.Location(id=LOCATION_ID, **loc_kwargs)
|
||||||
env = stix2.Environment().semantically_equivalent(loc1, loc2)
|
env = stix2.Environment().object_similarity(loc1, loc2)
|
||||||
assert round(env) != 100
|
assert round(env) != 100
|
||||||
|
|
||||||
|
|
||||||
def test_semantic_equivalence_on_same_malware():
|
def test_semantic_equivalence_on_same_malware():
|
||||||
malw1 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
|
malw1 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
|
||||||
malw2 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
|
malw2 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(malw1, malw2)
|
env = stix2.Environment().object_similarity(malw1, malw2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
def test_semantic_equivalence_on_same_threat_actor1():
|
def test_semantic_equivalence_on_same_threat_actor1():
|
||||||
ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
|
ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
|
||||||
ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
|
ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(ta1, ta2)
|
env = stix2.Environment().object_similarity(ta1, ta2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
|
@ -545,21 +545,21 @@ def test_semantic_equivalence_on_same_threat_actor2():
|
||||||
)
|
)
|
||||||
ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_KWARGS)
|
ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_KWARGS)
|
||||||
ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_KWARGS)
|
ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(ta1, ta2)
|
env = stix2.Environment().object_similarity(ta1, ta2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
def test_semantic_equivalence_on_same_tool():
|
def test_semantic_equivalence_on_same_tool():
|
||||||
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
|
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
|
||||||
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
|
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(tool1, tool2)
|
env = stix2.Environment().object_similarity(tool1, tool2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
def test_semantic_equivalence_on_same_vulnerability1():
|
def test_semantic_equivalence_on_same_vulnerability1():
|
||||||
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
|
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
|
||||||
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
|
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(vul1, vul2)
|
env = stix2.Environment().object_similarity(vul1, vul2)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
|
|
||||||
|
|
||||||
|
@ -584,7 +584,7 @@ def test_semantic_equivalence_on_same_vulnerability2():
|
||||||
)
|
)
|
||||||
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS1)
|
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS1)
|
||||||
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS2)
|
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS2)
|
||||||
env = stix2.Environment().semantically_equivalent(vul1, vul2)
|
env = stix2.Environment().object_similarity(vul1, vul2)
|
||||||
assert round(env) == 0.0
|
assert round(env) == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
@ -640,7 +640,7 @@ def test_semantic_equivalence_on_unknown_object():
|
||||||
}
|
}
|
||||||
cust1 = stix2.parse(CUSTOM_KWARGS1, allow_custom=True)
|
cust1 = stix2.parse(CUSTOM_KWARGS1, allow_custom=True)
|
||||||
cust2 = stix2.parse(CUSTOM_KWARGS2, allow_custom=True)
|
cust2 = stix2.parse(CUSTOM_KWARGS2, allow_custom=True)
|
||||||
env = stix2.Environment().semantically_equivalent(cust1, cust2, **weights)
|
env = stix2.Environment().object_similarity(cust1, cust2, **weights)
|
||||||
assert round(env) == 0
|
assert round(env) == 0
|
||||||
|
|
||||||
|
|
||||||
|
@ -648,7 +648,7 @@ def test_semantic_equivalence_different_type_raises():
|
||||||
with pytest.raises(ValueError) as excinfo:
|
with pytest.raises(ValueError) as excinfo:
|
||||||
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
|
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
|
||||||
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
||||||
stix2.Environment().semantically_equivalent(vul1, ind1)
|
stix2.Environment().object_similarity(vul1, ind1)
|
||||||
|
|
||||||
assert str(excinfo.value) == "The objects to compare must be of the same type!"
|
assert str(excinfo.value) == "The objects to compare must be of the same type!"
|
||||||
|
|
||||||
|
@ -661,7 +661,7 @@ def test_semantic_equivalence_different_spec_version_raises():
|
||||||
)
|
)
|
||||||
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
||||||
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **V20_KWARGS)
|
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **V20_KWARGS)
|
||||||
stix2.Environment().semantically_equivalent(ind1, ind2)
|
stix2.Environment().object_similarity(ind1, ind2)
|
||||||
|
|
||||||
assert str(excinfo.value) == "The objects to compare must be of the same spec version!"
|
assert str(excinfo.value) == "The objects to compare must be of the same spec version!"
|
||||||
|
|
||||||
|
@ -686,7 +686,7 @@ def test_semantic_equivalence_zero_match():
|
||||||
}
|
}
|
||||||
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
||||||
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **IND_KWARGS)
|
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **IND_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(ind1, ind2, **weights)
|
env = stix2.Environment().object_similarity(ind1, ind2, **weights)
|
||||||
assert round(env) == 0
|
assert round(env) == 0
|
||||||
|
|
||||||
|
|
||||||
|
@ -708,7 +708,7 @@ def test_semantic_equivalence_different_spec_version():
|
||||||
}
|
}
|
||||||
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
||||||
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS)
|
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(ind1, ind2, **weights)
|
env = stix2.Environment().object_similarity(ind1, ind2, **weights)
|
||||||
assert round(env) == 0
|
assert round(env) == 0
|
||||||
|
|
||||||
|
|
||||||
|
@ -800,7 +800,7 @@ def test_semantic_equivalence_exact_match():
|
||||||
def test_non_existent_config_for_object():
|
def test_non_existent_config_for_object():
|
||||||
r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
|
r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
|
||||||
r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
|
r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
|
||||||
assert stix2.Environment().semantically_equivalent(r1, r2) == 0.0
|
assert stix2.Environment().object_similarity(r1, r2) == 0.0
|
||||||
|
|
||||||
|
|
||||||
def custom_semantic_equivalence_method(obj1, obj2, **weights):
|
def custom_semantic_equivalence_method(obj1, obj2, **weights):
|
||||||
|
@ -824,7 +824,7 @@ def test_semantic_equivalence_method_provided():
|
||||||
|
|
||||||
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
|
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
|
||||||
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
|
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(tool1, tool2, **weights)
|
env = stix2.Environment().object_similarity(tool1, tool2, **weights)
|
||||||
assert round(env) == 96
|
assert round(env) == 96
|
||||||
|
|
||||||
|
|
||||||
|
@ -838,7 +838,7 @@ def test_semantic_equivalence_prop_scores():
|
||||||
|
|
||||||
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
|
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
|
||||||
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
|
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
|
||||||
stix2.Environment().semantically_equivalent(tool1, tool2, prop_scores)
|
stix2.Environment().object_similarity(tool1, tool2, prop_scores)
|
||||||
assert len(prop_scores) == 4
|
assert len(prop_scores) == 4
|
||||||
assert round(prop_scores["matching_score"], 1) == 8.9
|
assert round(prop_scores["matching_score"], 1) == 8.9
|
||||||
assert round(prop_scores["sum_weights"], 1) == 100.0
|
assert round(prop_scores["sum_weights"], 1) == 100.0
|
||||||
|
@ -868,7 +868,7 @@ def test_semantic_equivalence_prop_scores_method_provided():
|
||||||
|
|
||||||
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
|
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
|
||||||
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
|
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
|
||||||
env = stix2.Environment().semantically_equivalent(tool1, tool2, prop_scores, **weights)
|
env = stix2.Environment().object_similarity(tool1, tool2, prop_scores, **weights)
|
||||||
assert round(env) == 96
|
assert round(env) == 96
|
||||||
assert len(prop_scores) == 2
|
assert len(prop_scores) == 2
|
||||||
assert prop_scores["matching_score"] == 96.0
|
assert prop_scores["matching_score"] == 96.0
|
||||||
|
@ -964,12 +964,19 @@ def test_graph_equivalence_with_filesystem_source(ds):
|
||||||
"max_depth": 1,
|
"max_depth": 1,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
prop_scores = {}
|
prop_scores1 = {}
|
||||||
|
prop_scores2 = {}
|
||||||
fs = stix2.FileSystemSource(FS_PATH)
|
fs = stix2.FileSystemSource(FS_PATH)
|
||||||
env = stix2.Environment().graphically_equivalent(fs, ds, prop_scores, **weights)
|
env = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights)
|
||||||
assert round(env) == 24
|
assert round(env) == 26
|
||||||
assert round(prop_scores["matching_score"]) == 122
|
assert round(prop_scores1["matching_score"]) == 460
|
||||||
assert round(prop_scores["sum_weights"]) == 500
|
assert round(prop_scores1["sum_weights"]) == 18
|
||||||
|
|
||||||
|
env = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights)
|
||||||
|
assert round(env) == 47
|
||||||
|
assert round(prop_scores2["matching_score"]) == 852
|
||||||
|
assert round(prop_scores2["sum_weights"]) == 18
|
||||||
|
assert prop_scores1 == prop_scores2
|
||||||
|
|
||||||
|
|
||||||
def test_graph_equivalence_with_duplicate_graph(ds):
|
def test_graph_equivalence_with_duplicate_graph(ds):
|
||||||
|
@ -981,10 +988,10 @@ def test_graph_equivalence_with_duplicate_graph(ds):
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
prop_scores = {}
|
prop_scores = {}
|
||||||
env = stix2.Environment().graphically_equivalent(ds, ds, prop_scores, **weights)
|
env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights)
|
||||||
assert round(env) == 100
|
assert round(env) == 100
|
||||||
assert round(prop_scores["matching_score"]) == 800
|
assert round(prop_scores["matching_score"]) == 800
|
||||||
assert round(prop_scores["sum_weights"]) == 800
|
assert round(prop_scores["sum_weights"]) == 8
|
||||||
|
|
||||||
|
|
||||||
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
|
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
|
||||||
|
@ -996,10 +1003,10 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds):
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
prop_scores = {}
|
prop_scores = {}
|
||||||
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
|
env = stix2.Environment().graph_similarity(ds, ds2, prop_scores, **weights)
|
||||||
assert round(env) == 93
|
assert round(env) == 93
|
||||||
assert round(prop_scores["matching_score"]) == 745
|
assert round(prop_scores["matching_score"]) == 745
|
||||||
assert round(prop_scores["sum_weights"]) == 800
|
assert round(prop_scores["sum_weights"]) == 8
|
||||||
|
|
||||||
|
|
||||||
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
|
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
|
||||||
|
@ -1011,7 +1018,7 @@ def test_graph_equivalence_with_versioning_check_off(ds2, ds):
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
prop_scores = {}
|
prop_scores = {}
|
||||||
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
|
env = stix2.Environment().graph_similarity(ds, ds2, prop_scores, **weights)
|
||||||
assert round(env) == 93
|
assert round(env) == 93
|
||||||
assert round(prop_scores["matching_score"]) == 745
|
assert round(prop_scores["matching_score"]) == 745
|
||||||
assert round(prop_scores["sum_weights"]) == 800
|
assert round(prop_scores["sum_weights"]) == 8
|
||||||
|
|
Loading…
Reference in New Issue