commit 74eeabab77
@@ -55,6 +55,7 @@ coverage.xml
 # Sphinx documentation
 docs/_build/
 .ipynb_checkpoints
+default_sem_eq_weights.rst
 
 # PyBuilder
 target/
docs/conf.py

@@ -1,4 +1,5 @@
 import datetime
+import json
 import os
 import re
 import sys

@@ -7,6 +8,7 @@ from six import class_types
 from sphinx.ext.autodoc import ClassDocumenter
 
 from stix2.base import _STIXBase
+from stix2.environment import WEIGHTS
 from stix2.version import __version__
 
 sys.path.insert(0, os.path.abspath('..'))

@@ -59,6 +61,14 @@ latex_documents = [
     (master_doc, 'stix2.tex', 'stix2 Documentation', 'OASIS', 'manual'),
 ]
 
+# Add a formatted version of environment.WEIGHTS
+default_sem_eq_weights = json.dumps(WEIGHTS, indent=4, default=lambda o: o.__name__)
+default_sem_eq_weights = default_sem_eq_weights.replace('\n', '\n    ')
+default_sem_eq_weights = default_sem_eq_weights.replace('    "', '    ')
+default_sem_eq_weights = default_sem_eq_weights.replace('"\n', '\n')
+with open('default_sem_eq_weights.rst', 'w') as f:
+    f.write(".. code-block:: py\n\n    {}\n\n".format(default_sem_eq_weights))
+
 
 def get_property_type(prop):
     """Convert property classname into pretty string name of property.
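The replace chain above only reshapes json.dumps output so the generated reStructuredText snippet reads like Python rather than JSON. A minimal standalone sketch of the transformation, using a made-up miniature weights table in place of the real environment.WEIGHTS:

    import json

    def partial_string_based(str1, str2):
        ...  # stand-in for the real comparison function

    MINI_WEIGHTS = {"attack-pattern": {"name": (30, partial_string_based)}}

    # Functions are not JSON-serializable, so default= renders them by name.
    snippet = json.dumps(MINI_WEIGHTS, indent=4, default=lambda o: o.__name__)
    snippet = snippet.replace('\n', '\n    ')   # re-indent under the directive
    snippet = snippet.replace('    "', '    ')  # strip opening quotes
    snippet = snippet.replace('"\n', '\n')      # strip closing quotes
    print(".. code-block:: py\n\n    {}\n\n".format(snippet))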
(One file's diff suppressed because it is too large.)
setup.py

@@ -64,6 +64,6 @@ setup(
     },
     extras_require={
         'taxii': ['taxii2-client'],
-        'semantic': ['haversine', 'pyjarowinkler'],
+        'semantic': ['haversine', 'fuzzywuzzy'],
     },
 )
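The 'semantic' extra swaps pyjarowinkler for fuzzywuzzy, so environments installed with pip install 'stix2[semantic]' pick up the new matcher. A quick, illustrative sanity check:

    import importlib.util

    # Both comparison dependencies are declared under the 'semantic' extra.
    for pkg in ("haversine", "fuzzywuzzy"):
        assert importlib.util.find_spec(pkg) is not None, pkg + " is missing"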
stix2/environment.py
@@ -193,7 +193,7 @@ class Environment(DataStoreMixin):
         return None
 
     @staticmethod
-    def semantically_equivalent(obj1, obj2, **weight_dict):
+    def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
         """This method is meant to verify if two objects of the same type are
         semantically equivalent.
 
@@ -210,68 +210,17 @@
         Course of Action, Intrusion-Set, Observed-Data, Report are not supported
             by this implementation. Indicator pattern check is also limited.
 
+        Note:
+            Default weights_dict:
+
+            .. include:: ../default_sem_eq_weights.rst
+
         Note:
             This implementation follows the Committee Note on semantic equivalence.
             see `the Committee Note <link here>`__.
 
         """
-        # default weights used for the semantic equivalence process
-        weights = {
-            "attack-pattern": {
-                "name": 30,
-                "external_references": 70,
-                "method": _attack_pattern_checks,
-            },
-            "campaign": {
-                "name": 60,
-                "aliases": 40,
-                "method": _campaign_checks,
-            },
-            "identity": {
-                "name": 60,
-                "identity_class": 20,
-                "sectors": 20,
-                "method": _identity_checks,
-            },
-            "indicator": {
-                "indicator_types": 15,
-                "pattern": 80,
-                "valid_from": 5,
-                "tdelta": 1,  # One day interval
-                "method": _indicator_checks,
-            },
-            "location": {
-                "longitude_latitude": 34,
-                "region": 33,
-                "country": 33,
-                "threshold": 1000.0,
-                "method": _location_checks,
-            },
-            "malware": {
-                "malware_types": 20,
-                "name": 80,
-                "method": _malware_checks,
-            },
-            "threat-actor": {
-                "name": 60,
-                "threat_actor_types": 20,
-                "aliases": 20,
-                "method": _threat_actor_checks,
-            },
-            "tool": {
-                "tool_types": 20,
-                "name": 80,
-                "method": _tool_checks,
-            },
-            "vulnerability": {
-                "name": 30,
-                "external_references": 70,
-                "method": _vulnerability_checks,
-            },
-            "_internal": {
-                "ignore_spec_version": False,
-            },
-        }
+        weights = WEIGHTS.copy()
 
         if weight_dict:
             weights.update(weight_dict)
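Because the defaults now live in the module-level WEIGHTS table, callers still override entries the same way as before, through **weight_dict. A hedged usage sketch (the objects and weight values are invented for illustration):

    import stix2
    from stix2.environment import (
        partial_external_reference_based, partial_string_based,
    )

    vuln1 = stix2.v21.Vulnerability(name="CVE-2020-12345")
    vuln2 = stix2.v21.Vulnerability(name="CVE-2020-12345")

    # Replace only the vulnerability weights; other types keep the defaults.
    custom = {
        "vulnerability": {
            "name": (50, partial_string_based),
            "external_references": (50, partial_external_reference_based),
        },
    }
    score = stix2.Environment().semantically_equivalent(vuln1, vuln2, **custom)
    print(score)  # should be 100.0 here, since the names match exactly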
@@ -286,17 +235,54 @@
             raise ValueError('The objects to compare must be of the same spec version!')
 
         try:
-            method = weights[type1]["method"]
+            weights[type1]
         except KeyError:
-            logger.warning("'%s' type has no semantic equivalence method to call!", type1)
+            logger.warning("'%s' type has no 'weights' dict specified & thus no semantic equivalence method to call!", type1)
             sum_weights = matching_score = 0
         else:
-            logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
-            matching_score, sum_weights = method(obj1, obj2, **weights[type1])
+            try:
+                method = weights[type1]["method"]
+            except KeyError:
+                logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
+                matching_score = 0.0
+                sum_weights = 0.0
+
+                for prop in weights[type1]:
+                    if check_property_present(prop, obj1, obj2) or prop == "longitude_latitude":
+                        w = weights[type1][prop][0]
+                        comp_funct = weights[type1][prop][1]
+
+                        if comp_funct == partial_timestamp_based:
+                            contributing_score = w * comp_funct(obj1[prop], obj2[prop], weights[type1]["tdelta"])
+                        elif comp_funct == partial_location_distance:
+                            threshold = weights[type1]["threshold"]
+                            contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold)
+                        else:
+                            contributing_score = w * comp_funct(obj1[prop], obj2[prop])
+
+                        sum_weights += w
+                        matching_score += contributing_score
+
+                        prop_scores[prop] = {
+                            "weight": w,
+                            "contributing_score": contributing_score,
+                        }
+                        logger.debug("'%s' check -- weight: %s, contributing score: %s", prop, w, contributing_score)
+
+                prop_scores["matching_score"] = matching_score
+                prop_scores["sum_weights"] = sum_weights
+                logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
+            else:
+                logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
+                try:
+                    matching_score, sum_weights = method(obj1, obj2, prop_scores, **weights[type1])
+                except TypeError:
+                    # method doesn't support detailed output with prop_scores
+                    matching_score, sum_weights = method(obj1, obj2, **weights[type1])
+                logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
 
         if sum_weights <= 0:
             return 0
 
         equivalence_score = (matching_score / sum_weights) * 100.0
         return equivalence_score
 
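The generic loop above is what fills prop_scores, so a caller can now get a per-property breakdown instead of only the final percentage. A small sketch, with invented objects and results that follow from the default tool weights:

    import stix2

    tool1 = stix2.v21.Tool(name="VNC", tool_types=["remote-access"])
    tool2 = stix2.v21.Tool(name="VNC", tool_types=["remote-access"])

    prop_scores = {}
    score = stix2.Environment().semantically_equivalent(tool1, tool2, prop_scores)

    print(score)  # 100.0: both weighted checks match fully
    print(prop_scores["name"])        # {'weight': 80, 'contributing_score': 80.0}
    print(prop_scores["tool_types"])  # {'weight': 20, 'contributing_score': 20.0}
    print(prop_scores["matching_score"], prop_scores["sum_weights"])  # 100.0 100.0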
@@ -377,10 +363,10 @@ def partial_string_based(str1, str2):
         float: Number between 0.0 and 1.0 depending on match criteria.
 
     """
-    from pyjarowinkler import distance
-    result = distance.get_jaro_distance(str1, str2)
+    from fuzzywuzzy import fuzz
+    result = fuzz.token_sort_ratio(str1, str2, force_ascii=False)
     logger.debug("--\t\tpartial_string_based '%s' '%s'\tresult: '%s'", str1, str2, result)
-    return result
+    return result / 100.0
 
 
 def custom_pattern_based(pattern1, pattern2):
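token_sort_ratio scores on a 0-100 integer scale and ignores token order, hence the new division by 100.0 to keep partial_string_based within its documented 0.0-1.0 range. Illustrative values:

    from fuzzywuzzy import fuzz

    # Word order no longer matters...
    print(fuzz.token_sort_ratio("Red October", "October Red", force_ascii=False))  # 100
    # ...and near-miss spellings still score high (roughly 83 here).
    print(fuzz.token_sort_ratio("CrySyS", "Crisys", force_ascii=False))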
@@ -485,207 +471,51 @@ def partial_location_distance(lat1, long1, lat2, long2, threshold):
     return result
 
 
-def _attack_pattern_checks(obj1, obj2, **weights):
-    matching_score = 0.0
-    sum_weights = 0.0
-    if check_property_present("name", obj1, obj2):
-        w = weights["name"]
-        contributing_score = w * partial_string_based(obj1["name"], obj2["name"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score)
-    if check_property_present("external_references", obj1, obj2):
-        w = weights["external_references"]
-        contributing_score = (
-            w * partial_external_reference_based(obj1["external_references"], obj2["external_references"])
-        )
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'external_references' check -- weight: %s, contributing score: %s", w, contributing_score)
-    logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
-    return matching_score, sum_weights
-
-
-def _campaign_checks(obj1, obj2, **weights):
-    matching_score = 0.0
-    sum_weights = 0.0
-    if check_property_present("name", obj1, obj2):
-        w = weights["name"]
-        contributing_score = w * partial_string_based(obj1["name"], obj2["name"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score)
-    if check_property_present("aliases", obj1, obj2):
-        w = weights["aliases"]
-        contributing_score = w * partial_list_based(obj1["aliases"], obj2["aliases"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'aliases' check -- weight: %s, contributing score: %s", w, contributing_score)
-    logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
-    return matching_score, sum_weights
-
-
-def _identity_checks(obj1, obj2, **weights):
-    matching_score = 0.0
-    sum_weights = 0.0
-    if check_property_present("name", obj1, obj2):
-        w = weights["name"]
-        contributing_score = w * exact_match(obj1["name"], obj2["name"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score)
-    if check_property_present("identity_class", obj1, obj2):
-        w = weights["identity_class"]
-        contributing_score = w * exact_match(obj1["identity_class"], obj2["identity_class"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'identity_class' check -- weight: %s, contributing score: %s", w, contributing_score)
-    if check_property_present("sectors", obj1, obj2):
-        w = weights["sectors"]
-        contributing_score = w * partial_list_based(obj1["sectors"], obj2["sectors"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'sectors' check -- weight: %s, contributing score: %s", w, contributing_score)
-    logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
-    return matching_score, sum_weights
-
-
-def _indicator_checks(obj1, obj2, **weights):
-    matching_score = 0.0
-    sum_weights = 0.0
-    if check_property_present("indicator_types", obj1, obj2):
-        w = weights["indicator_types"]
-        contributing_score = w * partial_list_based(obj1["indicator_types"], obj2["indicator_types"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'indicator_types' check -- weight: %s, contributing score: %s", w, contributing_score)
-    if check_property_present("pattern", obj1, obj2):
-        w = weights["pattern"]
-        contributing_score = w * custom_pattern_based(obj1["pattern"], obj2["pattern"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'pattern' check -- weight: %s, contributing score: %s", w, contributing_score)
-    if check_property_present("valid_from", obj1, obj2):
-        w = weights["valid_from"]
-        contributing_score = (
-            w *
-            partial_timestamp_based(obj1["valid_from"], obj2["valid_from"], weights["tdelta"])
-        )
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'valid_from' check -- weight: %s, contributing score: %s", w, contributing_score)
-    logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
-    return matching_score, sum_weights
-
-
-def _location_checks(obj1, obj2, **weights):
-    matching_score = 0.0
-    sum_weights = 0.0
-    if check_property_present("latitude", obj1, obj2) and check_property_present("longitude", obj1, obj2):
-        w = weights["longitude_latitude"]
-        contributing_score = (
-            w *
-            partial_location_distance(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], weights["threshold"])
-        )
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'longitude_latitude' check -- weight: %s, contributing score: %s", w, contributing_score)
-    if check_property_present("region", obj1, obj2):
-        w = weights["region"]
-        contributing_score = w * exact_match(obj1["region"], obj2["region"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'region' check -- weight: %s, contributing score: %s", w, contributing_score)
-    if check_property_present("country", obj1, obj2):
-        w = weights["country"]
-        contributing_score = w * exact_match(obj1["country"], obj2["country"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'country' check -- weight: %s, contributing score: %s", w, contributing_score)
-    logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
-    return matching_score, sum_weights
-
-
-def _malware_checks(obj1, obj2, **weights):
-    matching_score = 0.0
-    sum_weights = 0.0
-    if check_property_present("malware_types", obj1, obj2):
-        w = weights["malware_types"]
-        contributing_score = w * partial_list_based(obj1["malware_types"], obj2["malware_types"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'malware_types' check -- weight: %s, contributing score: %s", w, contributing_score)
-    if check_property_present("name", obj1, obj2):
-        w = weights["name"]
-        contributing_score = w * partial_string_based(obj1["name"], obj2["name"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score)
-    logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
-    return matching_score, sum_weights
-
-
-def _threat_actor_checks(obj1, obj2, **weights):
-    matching_score = 0.0
-    sum_weights = 0.0
-    if check_property_present("name", obj1, obj2):
-        w = weights["name"]
-        contributing_score = w * partial_string_based(obj1["name"], obj2["name"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score)
-    if check_property_present("threat_actor_types", obj1, obj2):
-        w = weights["threat_actor_types"]
-        contributing_score = w * partial_list_based(obj1["threat_actor_types"], obj2["threat_actor_types"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'threat_actor_types' check -- weight: %s, contributing score: %s", w, contributing_score)
-    if check_property_present("aliases", obj1, obj2):
-        w = weights["aliases"]
-        contributing_score = w * partial_list_based(obj1["aliases"], obj2["aliases"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'aliases' check -- weight: %s, contributing score: %s", w, contributing_score)
-    logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
-    return matching_score, sum_weights
-
-
-def _tool_checks(obj1, obj2, **weights):
-    matching_score = 0.0
-    sum_weights = 0.0
-    if check_property_present("tool_types", obj1, obj2):
-        w = weights["tool_types"]
-        contributing_score = w * partial_list_based(obj1["tool_types"], obj2["tool_types"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'tool_types' check -- weight: %s, contributing score: %s", w, contributing_score)
-    if check_property_present("name", obj1, obj2):
-        w = weights["name"]
-        contributing_score = w * partial_string_based(obj1["name"], obj2["name"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score)
-    logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
-    return matching_score, sum_weights
-
-
-def _vulnerability_checks(obj1, obj2, **weights):
-    matching_score = 0.0
-    sum_weights = 0.0
-    if check_property_present("name", obj1, obj2):
-        w = weights["name"]
-        contributing_score = w * partial_string_based(obj1["name"], obj2["name"])
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score)
-    if check_property_present("external_references", obj1, obj2):
-        w = weights["external_references"]
-        contributing_score = w * partial_external_reference_based(
-            obj1["external_references"],
-            obj2["external_references"],
-        )
-        sum_weights += w
-        matching_score += contributing_score
-        logger.debug("'external_references' check -- weight: %s, contributing score: %s", w, contributing_score)
-    logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
-    return matching_score, sum_weights
+# default weights used for the semantic equivalence process
+WEIGHTS = {
+    "attack-pattern": {
+        "name": (30, partial_string_based),
+        "external_references": (70, partial_external_reference_based),
+    },
+    "campaign": {
+        "name": (60, partial_string_based),
+        "aliases": (40, partial_list_based),
+    },
+    "identity": {
+        "name": (60, partial_string_based),
+        "identity_class": (20, exact_match),
+        "sectors": (20, partial_list_based),
+    },
+    "indicator": {
+        "indicator_types": (15, partial_list_based),
+        "pattern": (80, custom_pattern_based),
+        "valid_from": (5, partial_timestamp_based),
+        "tdelta": 1,  # One day interval
+    },
+    "location": {
+        "longitude_latitude": (34, partial_location_distance),
+        "region": (33, exact_match),
+        "country": (33, exact_match),
+        "threshold": 1000.0,
+    },
+    "malware": {
+        "malware_types": (20, partial_list_based),
+        "name": (80, partial_string_based),
+    },
+    "threat-actor": {
+        "name": (60, partial_string_based),
+        "threat_actor_types": (20, partial_list_based),
+        "aliases": (20, partial_list_based),
+    },
+    "tool": {
+        "tool_types": (20, partial_list_based),
+        "name": (80, partial_string_based),
+    },
+    "vulnerability": {
+        "name": (30, partial_string_based),
+        "external_references": (70, partial_external_reference_based),
+    },
+    "_internal": {
+        "ignore_spec_version": False,
+    },
+}  #: :autodoc-skip:
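With the per-type dispatch functions gone, each default check is plain data: a (weight, comparison function) pair, while bare entries such as tdelta and threshold are parameters the scoring loop reads separately. For example:

    from stix2.environment import WEIGHTS

    weight, comp_funct = WEIGHTS["malware"]["name"]
    print(weight, comp_funct.__name__)     # 80 partial_string_based
    print(WEIGHTS["indicator"]["tdelta"])  # 1 -- a parameter, not a scored property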
stix2/test/v21/test_environment.py
@@ -521,7 +521,7 @@ def test_semantic_equivalence_on_same_vulnerability2():
     ],
 )
 VULN_KWARGS2 = dict(
-    name="Zot",
+    name="Foo",
     external_references=[
         {
             "url": "https://example2",

@@ -550,7 +550,7 @@ def test_semantic_equivalence_on_unknown_object():
 CUSTOM_KWARGS2 = dict(
     type="x-foobar",
     id="x-foobar--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061",
-    name="Zot",
+    name="Foo",
     external_references=[
         {
             "url": "https://example2",

@@ -622,11 +622,10 @@ def test_semantic_equivalence_zero_match():
     )
     weights = {
         "indicator": {
-            "indicator_types": 15,
-            "pattern": 80,
-            "valid_from": 0,
+            "indicator_types": (15, stix2.environment.partial_list_based),
+            "pattern": (80, stix2.environment.custom_pattern_based),
+            "valid_from": (5, stix2.environment.partial_timestamp_based),
             "tdelta": 1,  # One day interval
-            "method": stix2.environment._indicator_checks,
         },
         "_internal": {
             "ignore_spec_version": False,

@@ -645,11 +644,10 @@ def test_semantic_equivalence_different_spec_version():
     )
     weights = {
         "indicator": {
-            "indicator_types": 15,
-            "pattern": 80,
-            "valid_from": 0,
+            "indicator_types": (15, stix2.environment.partial_list_based),
+            "pattern": (80, stix2.environment.custom_pattern_based),
+            "valid_from": (5, stix2.environment.partial_timestamp_based),
             "tdelta": 1,  # One day interval
-            "method": stix2.environment._indicator_checks,
         },
         "_internal": {
             "ignore_spec_version": True,  # Disables spec_version check.

@@ -750,3 +748,75 @@ def test_non_existent_config_for_object():
     r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
     r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
     assert stix2.Environment().semantically_equivalent(r1, r2) == 0.0
+
+
+def custom_semantic_equivalence_method(obj1, obj2, **weights):
+    return 96.0, 100.0
+
+
+def test_semantic_equivalence_method_provided():
+    # Because `method` is provided, `partial_list_based` will be ignored
+    TOOL2_KWARGS = dict(
+        name="Random Software",
+        tool_types=["information-gathering"],
+    )
+
+    weights = {
+        "tool": {
+            "tool_types": (20, stix2.environment.partial_list_based),
+            "name": (80, stix2.environment.partial_string_based),
+            "method": custom_semantic_equivalence_method,
+        },
+    }
+
+    tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
+    tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
+    env = stix2.Environment().semantically_equivalent(tool1, tool2, **weights)
+    assert round(env) == 96
+
+
+def test_semantic_equivalence_prop_scores():
+    TOOL2_KWARGS = dict(
+        name="Random Software",
+        tool_types=["information-gathering"],
+    )
+
+    prop_scores = {}
+
+    tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
+    tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
+    stix2.Environment().semantically_equivalent(tool1, tool2, prop_scores)
+    assert len(prop_scores) == 4
+    assert round(prop_scores["matching_score"], 1) == 8.8
+    assert round(prop_scores["sum_weights"], 1) == 100.0
+
+
+def custom_semantic_equivalence_method_prop_scores(obj1, obj2, prop_scores, **weights):
+    prop_scores["matching_score"] = 96.0
+    prop_scores["sum_weights"] = 100.0
+    return 96.0, 100.0
+
+
+def test_semantic_equivalence_prop_scores_method_provided():
+    TOOL2_KWARGS = dict(
+        name="Random Software",
+        tool_types=["information-gathering"],
+    )
+
+    weights = {
+        "tool": {
+            "tool_types": 20,
+            "name": 80,
+            "method": custom_semantic_equivalence_method_prop_scores,
+        },
+    }
+
+    prop_scores = {}
+
+    tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
+    tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
+    env = stix2.Environment().semantically_equivalent(tool1, tool2, prop_scores, **weights)
+    assert round(env) == 96
+    assert len(prop_scores) == 2
+    assert prop_scores["matching_score"] == 96.0
+    assert prop_scores["sum_weights"] == 100.0