make changes according to feedback. allow for custom objects to be supplied to method
parent
e138753576
commit
4eaaee89dc
|
@ -7,6 +7,7 @@ import time
|
|||
|
||||
from .core import parse as _parse
|
||||
from .datastore import CompositeDataSource, DataStoreMixin
|
||||
from .exceptions import SemanticEquivalenceUnsupportedTypeError
|
||||
from .utils import STIXdatetime, parse_into_datetime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -207,204 +208,98 @@ class Environment(DataStoreMixin):
|
|||
Returns:
|
||||
float: A number between 0.0 and 100.0 as a measurement of equivalence.
|
||||
|
||||
Warnings:
|
||||
Not all objects are supported.
|
||||
Warning:
|
||||
Course of Action, Intrusion-Set, Observed-Data, Report are not supported
|
||||
by this implementation. Indicator pattern check is also limited.
|
||||
|
||||
Notes:
|
||||
Note:
|
||||
This implementation follows the Committee Note on semantic equivalence.
|
||||
see `the Committee Note <link here>`__.
|
||||
|
||||
"""
|
||||
# default weights used for the semantic equivalence process
|
||||
weigths = {
|
||||
weights = {
|
||||
"attack-pattern": {
|
||||
"name": 30,
|
||||
"external_references": 70,
|
||||
"method": _attack_pattern_checks,
|
||||
},
|
||||
"campaign": {
|
||||
"name": 60,
|
||||
"aliases": 40,
|
||||
"method": _campaign_checks,
|
||||
},
|
||||
"course-of-action": {
|
||||
"method": _course_of_action_checks,
|
||||
},
|
||||
"identity": {
|
||||
"name": 60,
|
||||
"identity_class": 20,
|
||||
"sectors": 20,
|
||||
"method": _identity_checks,
|
||||
},
|
||||
"indicator": {
|
||||
"indicator_types": 15,
|
||||
"pattern": 80,
|
||||
"valid_from": 5,
|
||||
"tdelta": 1, # One day interval
|
||||
"method": _indicator_checks,
|
||||
},
|
||||
"intrusion-set": {
|
||||
"method": _intrusion_set_checks,
|
||||
},
|
||||
"location": {
|
||||
"longitude_latitude": 34,
|
||||
"region": 33,
|
||||
"country": 33,
|
||||
"method": _location_checks,
|
||||
},
|
||||
"malware": {
|
||||
"malware_types": 20,
|
||||
"name": 80,
|
||||
"method": _malware_checks,
|
||||
},
|
||||
"observed-data": {
|
||||
"method": _observed_data_checks,
|
||||
},
|
||||
"report": {
|
||||
"method": _report_checks,
|
||||
},
|
||||
"threat-actor": {
|
||||
"name": 60,
|
||||
"threat_actor_types": 20,
|
||||
"aliases": 20,
|
||||
"method": _threat_actor_checks,
|
||||
},
|
||||
"tool": {
|
||||
"tool_types": 20,
|
||||
"name": 80,
|
||||
"method": _tool_checks,
|
||||
},
|
||||
"vulnerability": {
|
||||
"name": 30,
|
||||
"external_references": 70,
|
||||
"method": _vulnerability_checks,
|
||||
},
|
||||
"_internal": {
|
||||
"tdelta": 1, # One day interval
|
||||
"ignore_spec_version": False,
|
||||
},
|
||||
}
|
||||
|
||||
if weight_dict:
|
||||
weigths.update(weight_dict)
|
||||
weights.update(weight_dict)
|
||||
|
||||
matching_score = 0.0
|
||||
sum_weights = 0.0
|
||||
type1, type2 = obj1["type"], obj2["type"]
|
||||
ignore_spec_version = weights["_internal"]["ignore_spec_version"]
|
||||
|
||||
if type1 != type2:
|
||||
raise ValueError('The objects to compare must be of the same type!')
|
||||
|
||||
if obj1.get("spec_version", "2.0") != obj2.get("spec_version", "2.0"):
|
||||
if ignore_spec_version is False and obj1.get("spec_version", "2.0") != obj2.get("spec_version", "2.0"):
|
||||
raise ValueError('The objects to compare must be of the same spec version!')
|
||||
|
||||
if type1 == "attack-pattern":
|
||||
if _check_property_present("name", obj1, obj2):
|
||||
w = weigths["attack-pattern"]["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_string_based(obj1["name"], obj2["name"])
|
||||
if _check_property_present("external_references", obj1, obj2):
|
||||
w = weigths["attack-pattern"]["external_references"]
|
||||
sum_weights += w
|
||||
matching_score += (
|
||||
w *
|
||||
_partial_external_reference_based(obj1["external_references"], obj2["external_references"])
|
||||
)
|
||||
|
||||
elif type1 == "campaign":
|
||||
if _check_property_present("name", obj1, obj2):
|
||||
w = weigths["campaign"]["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_string_based(obj1["name"], obj2["name"])
|
||||
if _check_property_present("aliases", obj1, obj2):
|
||||
w = weigths["campaign"]["aliases"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_list_based(obj1["aliases"], obj2["aliases"])
|
||||
|
||||
elif type1 == "course-of-action":
|
||||
logger.warning("%s type has no semantic equivalence implementation", type1)
|
||||
return 0
|
||||
|
||||
elif type1 == "identity":
|
||||
if _check_property_present("name", obj1, obj2):
|
||||
w = weigths["identity"]["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * _exact_match(obj1["name"], obj2["name"])
|
||||
if _check_property_present("identity_class", obj1, obj2):
|
||||
w = weigths["identity"]["identity_class"]
|
||||
sum_weights += w
|
||||
matching_score += w * _exact_match(obj1["identity_class"], obj2["identity_class"])
|
||||
if _check_property_present("sectors", obj1, obj2):
|
||||
w = weigths["identity"]["sectors"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_list_based(obj1["sectors"], obj2["sectors"])
|
||||
|
||||
elif type1 == "indicator":
|
||||
if _check_property_present("indicator_types", obj1, obj2):
|
||||
w = weigths["indicator"]["indicator_types"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_list_based(obj1["indicator_types"], obj2["indicator_types"])
|
||||
if _check_property_present("pattern", obj1, obj2):
|
||||
w = weigths["indicator"]["pattern"]
|
||||
sum_weights += w
|
||||
matching_score += w * _custom_pattern_based(obj1["pattern"], obj2["pattern"])
|
||||
if _check_property_present("valid_from", obj1, obj2):
|
||||
w = weigths["indicator"]["valid_from"]
|
||||
sum_weights += w
|
||||
matching_score += (
|
||||
w *
|
||||
_partial_timestamp_based(obj1["valid_from"], obj2["valid_from"], weigths["_internal"]["tdelta"])
|
||||
)
|
||||
|
||||
elif type1 == "intrusion-set":
|
||||
logger.warning("%s type has no semantic equivalence implementation", type1)
|
||||
return 0
|
||||
|
||||
elif type1 == "location":
|
||||
if _check_property_present("latitude", obj1, obj2) and _check_property_present("longitude", obj1, obj2):
|
||||
w = weigths["location"]["longitude_latitude"]
|
||||
sum_weights += w
|
||||
matching_score += (
|
||||
w *
|
||||
_partial_location_distance(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"])
|
||||
)
|
||||
if _check_property_present("region", obj1, obj2):
|
||||
w = weigths["location"]["region"]
|
||||
sum_weights += w
|
||||
matching_score += w * _exact_match(obj1["region"], obj2["region"])
|
||||
if _check_property_present("country", obj1, obj2):
|
||||
w = weigths["location"]["country"]
|
||||
sum_weights += w
|
||||
matching_score += w * _exact_match(obj1["country"], obj2["country"])
|
||||
|
||||
elif type1 == "malware":
|
||||
if _check_property_present("malware_types", obj1, obj2):
|
||||
w = weigths["malware"]["malware_types"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_list_based(obj1["malware_types"], obj2["malware_types"])
|
||||
if _check_property_present("name", obj1, obj2):
|
||||
w = weigths["malware"]["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_string_based(obj1["name"], obj2["name"])
|
||||
|
||||
elif type1 == "observed-data":
|
||||
logger.warning("%s type has no semantic equivalence implementation", type1)
|
||||
return 0
|
||||
|
||||
elif type1 == "report":
|
||||
logger.warning("%s type has no semantic equivalence implementation", type1)
|
||||
return 0
|
||||
|
||||
elif type1 == "threat-actor":
|
||||
if _check_property_present("name", obj1, obj2):
|
||||
w = weigths["threat-actor"]["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_string_based(obj1["name"], obj2["name"])
|
||||
if _check_property_present("threat_actor_types", obj1, obj2):
|
||||
w = weigths["threat-actor"]["threat_actor_types"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_list_based(obj1["threat_actor_types"], obj2["threat_actor_types"])
|
||||
if _check_property_present("aliases", obj1, obj2):
|
||||
w = weigths["threat-actor"]["aliases"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_list_based(obj1["aliases"], obj2["aliases"])
|
||||
|
||||
elif type1 == "tool":
|
||||
if _check_property_present("tool_types", obj1, obj2):
|
||||
w = weigths["tool"]["tool_types"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_list_based(obj1["tool_types"], obj2["tool_types"])
|
||||
if _check_property_present("name", obj1, obj2):
|
||||
w = weigths["tool"]["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_string_based(obj1["name"], obj2["name"])
|
||||
|
||||
elif type1 == "vulnerability":
|
||||
if _check_property_present("name", obj1, obj2):
|
||||
w = weigths["vulnerability"]["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_string_based(obj1["name"], obj2["name"])
|
||||
if _check_property_present("external_references", obj1, obj2):
|
||||
w = weigths["vulnerability"]["external_references"]
|
||||
sum_weights += w
|
||||
matching_score += w * _partial_external_reference_based(obj1["external_references"], obj2["external_references"])
|
||||
method = weights[type1]["method"]
|
||||
matching_score, sum_weights = method(obj1, obj2, **weights[type1])
|
||||
|
||||
if sum_weights <= 0:
|
||||
return 0
|
||||
|
@ -413,13 +308,13 @@ class Environment(DataStoreMixin):
|
|||
return equivalence_score
|
||||
|
||||
|
||||
def _check_property_present(prop, obj1, obj2):
|
||||
def check_property_present(prop, obj1, obj2):
|
||||
if prop in obj1 and prop in obj2:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _partial_timestamp_based(t1, t2, tdelta):
|
||||
def partial_timestamp_based(t1, t2, tdelta):
|
||||
if not isinstance(t1, STIXdatetime):
|
||||
t1 = parse_into_datetime(t1)
|
||||
if not isinstance(t2, STIXdatetime):
|
||||
|
@ -428,28 +323,28 @@ def _partial_timestamp_based(t1, t2, tdelta):
|
|||
return 1 - min(abs(t1 - t2) / (86400 * tdelta), 1)
|
||||
|
||||
|
||||
def _partial_list_based(l1, l2):
|
||||
def partial_list_based(l1, l2):
|
||||
l1_set, l2_set = set(l1), set(l2)
|
||||
return len(l1_set.intersection(l2_set)) / max(len(l1), len(l2))
|
||||
|
||||
|
||||
def _exact_match(val1, val2):
|
||||
def exact_match(val1, val2):
|
||||
if val1 == val2:
|
||||
return 1.0
|
||||
return 0.0
|
||||
|
||||
|
||||
def _partial_string_based(str1, str2):
|
||||
def partial_string_based(str1, str2):
|
||||
from pyjarowinkler import distance
|
||||
return distance.get_jaro_distance(str1, str2)
|
||||
|
||||
|
||||
def _custom_pattern_based(pattern1, pattern2):
|
||||
logger.warning("Checking for Indicator pattern equivalence is currently not implemented!")
|
||||
return 0 # TODO: Needs to be implemented
|
||||
def custom_pattern_based(pattern1, pattern2):
|
||||
logger.warning("Indicator pattern equivalence is not fully defined; will default to zero if not completely identical")
|
||||
return exact_match(pattern1, pattern2) # TODO: Implement pattern based equivalence
|
||||
|
||||
|
||||
def _partial_external_reference_based(refs1, refs2):
|
||||
def partial_external_reference_based(refs1, refs2):
|
||||
allowed = set(("veris", "cve", "capec", "mitre-attack"))
|
||||
matches = 0
|
||||
|
||||
|
@ -467,14 +362,14 @@ def _partial_external_reference_based(refs1, refs2):
|
|||
url_match = False
|
||||
source_name = None
|
||||
|
||||
if _check_property_present("source_name", ext_ref1, ext_ref2):
|
||||
if check_property_present("source_name", ext_ref1, ext_ref2):
|
||||
if ext_ref1["source_name"] == ext_ref2["source_name"]:
|
||||
source_name = ext_ref1["source_name"]
|
||||
sn_match = True
|
||||
if _check_property_present("external_id", ext_ref1, ext_ref2):
|
||||
if check_property_present("external_id", ext_ref1, ext_ref2):
|
||||
if ext_ref1["external_id"] == ext_ref2["external_id"]:
|
||||
ei_match = True
|
||||
if _check_property_present("url", ext_ref1, ext_ref2):
|
||||
if check_property_present("url", ext_ref1, ext_ref2):
|
||||
if ext_ref1["url"] == ext_ref2["url"]:
|
||||
url_match = True
|
||||
|
||||
|
@ -492,6 +387,176 @@ def _partial_external_reference_based(refs1, refs2):
|
|||
return matches / max(len(refs1), len(refs2))
|
||||
|
||||
|
||||
def _partial_location_distance(lat1, long1, lat2, long2):
|
||||
def partial_location_distance(lat1, long1, lat2, long2):
|
||||
distance = math.sqrt(((lat2 - lat1) ** 2) + ((long2 - long1) ** 2))
|
||||
return 1 - (distance / 1000.0)
|
||||
|
||||
|
||||
def _attack_pattern_checks(obj1, obj2, **weights):
|
||||
matching_score = 0.0
|
||||
sum_weights = 0.0
|
||||
if check_property_present("name", obj1, obj2):
|
||||
w = weights["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_string_based(obj1["name"], obj2["name"])
|
||||
if check_property_present("external_references", obj1, obj2):
|
||||
w = weights["external_references"]
|
||||
sum_weights += w
|
||||
matching_score += (
|
||||
w *
|
||||
partial_external_reference_based(obj1["external_references"], obj2["external_references"])
|
||||
)
|
||||
return matching_score, sum_weights
|
||||
|
||||
|
||||
def _campaign_checks(obj1, obj2, **weights):
|
||||
matching_score = 0.0
|
||||
sum_weights = 0.0
|
||||
if check_property_present("name", obj1, obj2):
|
||||
w = weights["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_string_based(obj1["name"], obj2["name"])
|
||||
if check_property_present("aliases", obj1, obj2):
|
||||
w = weights["aliases"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_list_based(obj1["aliases"], obj2["aliases"])
|
||||
return matching_score, sum_weights
|
||||
|
||||
|
||||
def _identity_checks(obj1, obj2, **weights):
|
||||
matching_score = 0.0
|
||||
sum_weights = 0.0
|
||||
if check_property_present("name", obj1, obj2):
|
||||
w = weights["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * exact_match(obj1["name"], obj2["name"])
|
||||
if check_property_present("identity_class", obj1, obj2):
|
||||
w = weights["identity_class"]
|
||||
sum_weights += w
|
||||
matching_score += w * exact_match(obj1["identity_class"], obj2["identity_class"])
|
||||
if check_property_present("sectors", obj1, obj2):
|
||||
w = weights["sectors"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_list_based(obj1["sectors"], obj2["sectors"])
|
||||
return matching_score, sum_weights
|
||||
|
||||
|
||||
def _indicator_checks(obj1, obj2, **weights):
|
||||
matching_score = 0.0
|
||||
sum_weights = 0.0
|
||||
if check_property_present("indicator_types", obj1, obj2):
|
||||
w = weights["indicator_types"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_list_based(obj1["indicator_types"], obj2["indicator_types"])
|
||||
if check_property_present("pattern", obj1, obj2):
|
||||
w = weights["pattern"]
|
||||
sum_weights += w
|
||||
matching_score += w * custom_pattern_based(obj1["pattern"], obj2["pattern"])
|
||||
if check_property_present("valid_from", obj1, obj2):
|
||||
w = weights["valid_from"]
|
||||
sum_weights += w
|
||||
matching_score += (
|
||||
w *
|
||||
partial_timestamp_based(obj1["valid_from"], obj2["valid_from"], weights["tdelta"])
|
||||
)
|
||||
return matching_score, sum_weights
|
||||
|
||||
|
||||
def _location_checks(obj1, obj2, **weights):
|
||||
matching_score = 0.0
|
||||
sum_weights = 0.0
|
||||
if check_property_present("latitude", obj1, obj2) and check_property_present("longitude", obj1, obj2):
|
||||
w = weights["longitude_latitude"]
|
||||
sum_weights += w
|
||||
matching_score += (
|
||||
w *
|
||||
partial_location_distance(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"])
|
||||
)
|
||||
if check_property_present("region", obj1, obj2):
|
||||
w = weights["region"]
|
||||
sum_weights += w
|
||||
matching_score += w * exact_match(obj1["region"], obj2["region"])
|
||||
if check_property_present("country", obj1, obj2):
|
||||
w = weights["country"]
|
||||
sum_weights += w
|
||||
matching_score += w * exact_match(obj1["country"], obj2["country"])
|
||||
return matching_score, sum_weights
|
||||
|
||||
|
||||
def _malware_checks(obj1, obj2, **weights):
|
||||
matching_score = 0.0
|
||||
sum_weights = 0.0
|
||||
if check_property_present("malware_types", obj1, obj2):
|
||||
w = weights["malware_types"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_list_based(obj1["malware_types"], obj2["malware_types"])
|
||||
if check_property_present("name", obj1, obj2):
|
||||
w = weights["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_string_based(obj1["name"], obj2["name"])
|
||||
return matching_score, sum_weights
|
||||
|
||||
|
||||
def _threat_actor_checks(obj1, obj2, **weights):
|
||||
matching_score = 0.0
|
||||
sum_weights = 0.0
|
||||
if check_property_present("name", obj1, obj2):
|
||||
w = weights["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_string_based(obj1["name"], obj2["name"])
|
||||
if check_property_present("threat_actor_types", obj1, obj2):
|
||||
w = weights["threat_actor_types"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_list_based(obj1["threat_actor_types"], obj2["threat_actor_types"])
|
||||
if check_property_present("aliases", obj1, obj2):
|
||||
w = weights["aliases"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_list_based(obj1["aliases"], obj2["aliases"])
|
||||
return matching_score, sum_weights
|
||||
|
||||
|
||||
def _tool_checks(obj1, obj2, **weights):
|
||||
matching_score = 0.0
|
||||
sum_weights = 0.0
|
||||
if check_property_present("tool_types", obj1, obj2):
|
||||
w = weights["tool_types"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_list_based(obj1["tool_types"], obj2["tool_types"])
|
||||
if check_property_present("name", obj1, obj2):
|
||||
w = weights["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_string_based(obj1["name"], obj2["name"])
|
||||
return matching_score, sum_weights
|
||||
|
||||
|
||||
def _vulnerability_checks(obj1, obj2, **weights):
|
||||
matching_score = 0.0
|
||||
sum_weights = 0.0
|
||||
if check_property_present("name", obj1, obj2):
|
||||
w = weights["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_string_based(obj1["name"], obj2["name"])
|
||||
if check_property_present("external_references", obj1, obj2):
|
||||
w = weights["external_references"]
|
||||
sum_weights += w
|
||||
matching_score += w * partial_external_reference_based(
|
||||
obj1["external_references"],
|
||||
obj2["external_references"],
|
||||
)
|
||||
return matching_score, sum_weights
|
||||
|
||||
|
||||
def _course_of_action_checks(obj1, obj2, **weights):
|
||||
raise SemanticEquivalenceUnsupportedTypeError("course-of-action type has no semantic equivalence implementation!")
|
||||
|
||||
|
||||
def _intrusion_set_checks(obj1, obj2, **weights):
|
||||
raise SemanticEquivalenceUnsupportedTypeError("intrusion-set type has no semantic equivalence implementation!")
|
||||
|
||||
|
||||
def _observed_data_checks(obj1, obj2, **weights):
|
||||
raise SemanticEquivalenceUnsupportedTypeError("observed-data type has no semantic equivalence implementation!")
|
||||
|
||||
|
||||
def _report_checks(obj1, obj2, **weights):
|
||||
raise SemanticEquivalenceUnsupportedTypeError("report type has no semantic equivalence implementation!")
|
||||
|
|
|
@ -216,3 +216,10 @@ class TLPMarkingDefinitionError(STIXError, AssertionError):
|
|||
def __str__(self):
|
||||
msg = "Marking {0} does not match spec marking {1}!"
|
||||
return msg.format(self.user_obj, self.spec_obj)
|
||||
|
||||
|
||||
class SemanticEquivalenceUnsupportedTypeError(STIXError, TypeError):
|
||||
"""STIX object type not supported by the semantic equivalence approach."""
|
||||
|
||||
def __init__(self, msg):
|
||||
super(SemanticEquivalenceUnsupportedTypeError, self).__init__(msg)
|
||||
|
|
|
@ -2,6 +2,7 @@ import pytest
|
|||
|
||||
import stix2
|
||||
import stix2.environment
|
||||
import stix2.exceptions
|
||||
|
||||
from .constants import (
|
||||
ATTACK_PATTERN_ID, ATTACK_PATTERN_KWARGS, CAMPAIGN_ID, CAMPAIGN_KWARGS,
|
||||
|
@ -444,7 +445,7 @@ def test_semantic_equivalence_on_same_indicator():
|
|||
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
||||
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
||||
env = stix2.Environment().semantically_equivalent(ind1, ind2)
|
||||
assert round(env) == 20 # No support for pattern, hence the 20
|
||||
assert round(env) == 100
|
||||
|
||||
|
||||
def test_semantic_equivalence_on_same_location1():
|
||||
|
@ -556,9 +557,36 @@ def test_semantic_equivalence_on_unknown_object():
|
|||
},
|
||||
],
|
||||
)
|
||||
|
||||
def _x_foobar_checks(obj1, obj2, **weights):
|
||||
matching_score = 0.0
|
||||
sum_weights = 0.0
|
||||
if stix2.environment.check_property_present("external_references", obj1, obj2):
|
||||
w = weights["external_references"]
|
||||
sum_weights += w
|
||||
matching_score += w * stix2.environment.partial_external_reference_based(
|
||||
obj1["external_references"],
|
||||
obj2["external_references"],
|
||||
)
|
||||
if stix2.environment.check_property_present("name", obj1, obj2):
|
||||
w = weights["name"]
|
||||
sum_weights += w
|
||||
matching_score += w * stix2.environment.partial_string_based(obj1["name"], obj2["name"])
|
||||
return matching_score, sum_weights
|
||||
|
||||
weights = {
|
||||
"x-foobar": {
|
||||
"external_references": 40,
|
||||
"name": 60,
|
||||
"method": _x_foobar_checks,
|
||||
},
|
||||
"_internal": {
|
||||
"ignore_spec_version": False,
|
||||
},
|
||||
}
|
||||
cust1 = stix2.parse(CUSTOM_KWARGS1, allow_custom=True)
|
||||
cust2 = stix2.parse(CUSTOM_KWARGS2, allow_custom=True)
|
||||
env = stix2.Environment().semantically_equivalent(cust1, cust2)
|
||||
env = stix2.Environment().semantically_equivalent(cust1, cust2, **weights)
|
||||
assert round(env) == 0
|
||||
|
||||
|
||||
|
@ -584,22 +612,35 @@ def test_semantic_equivalence_different_spec_version_raises():
|
|||
assert str(excinfo.value) == "The objects to compare must be of the same spec version!"
|
||||
|
||||
|
||||
def test_semantic_equivalence_on_unsupported_types():
|
||||
coa1 = stix2.v21.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS)
|
||||
ints1 = stix2.v21.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS)
|
||||
obs1 = stix2.v21.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS)
|
||||
rep1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
|
||||
|
||||
coa2 = stix2.v21.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS)
|
||||
ints2 = stix2.v21.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS)
|
||||
obs2 = stix2.v21.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS)
|
||||
rep2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
|
||||
|
||||
obj_list = [(coa1, coa2), (ints1, ints2), (obs1, obs2), (rep1, rep2)]
|
||||
|
||||
for obj1, obj2 in obj_list:
|
||||
env = stix2.Environment().semantically_equivalent(obj1, obj2)
|
||||
assert round(env) == 0
|
||||
@pytest.mark.parametrize(
|
||||
"obj1,obj2,ret_val",
|
||||
[
|
||||
(
|
||||
stix2.v21.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS),
|
||||
stix2.v21.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS),
|
||||
"course-of-action type has no semantic equivalence implementation!",
|
||||
),
|
||||
(
|
||||
stix2.v21.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS),
|
||||
stix2.v21.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS),
|
||||
"intrusion-set type has no semantic equivalence implementation!",
|
||||
),
|
||||
(
|
||||
stix2.v21.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS),
|
||||
stix2.v21.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS),
|
||||
"observed-data type has no semantic equivalence implementation!",
|
||||
),
|
||||
(
|
||||
stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS),
|
||||
stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS),
|
||||
"report type has no semantic equivalence implementation!",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_semantic_equivalence_on_unsupported_types(obj1, obj2, ret_val):
|
||||
with pytest.raises(stix2.exceptions.SemanticEquivalenceUnsupportedTypeError) as excinfo:
|
||||
stix2.Environment().semantically_equivalent(obj1, obj2)
|
||||
assert ret_val == str(excinfo.value)
|
||||
|
||||
|
||||
def test_semantic_equivalence_zero_match():
|
||||
|
@ -607,54 +648,21 @@ def test_semantic_equivalence_zero_match():
|
|||
indicator_types=["APTX"],
|
||||
pattern="[ipv4-addr:value = '192.168.1.1']",
|
||||
)
|
||||
weigths = {
|
||||
"attack-pattern": {
|
||||
"name": 30,
|
||||
"external_references": 70,
|
||||
},
|
||||
"campaign": {
|
||||
"name": 60,
|
||||
"aliases": 40,
|
||||
},
|
||||
"identity": {
|
||||
"name": 60,
|
||||
"identity_class": 20,
|
||||
"sectors": 20,
|
||||
},
|
||||
weights = {
|
||||
"indicator": {
|
||||
"indicator_types": 15,
|
||||
"pattern": 85,
|
||||
"pattern": 80,
|
||||
"valid_from": 0,
|
||||
},
|
||||
"location": {
|
||||
"longitude_latitude": 34,
|
||||
"region": 33,
|
||||
"country": 33,
|
||||
},
|
||||
"malware": {
|
||||
"malware_types": 20,
|
||||
"name": 80,
|
||||
},
|
||||
"threat-actor": {
|
||||
"name": 60,
|
||||
"threat_actor_types": 20,
|
||||
"aliases": 20,
|
||||
},
|
||||
"tool": {
|
||||
"tool_types": 20,
|
||||
"name": 80,
|
||||
},
|
||||
"vulnerability": {
|
||||
"name": 30,
|
||||
"external_references": 70,
|
||||
"tdelta": 1, # One day interval
|
||||
"method": stix2.environment._indicator_checks,
|
||||
},
|
||||
"_internal": {
|
||||
"tdelta": 1,
|
||||
"ignore_spec_version": False,
|
||||
},
|
||||
}
|
||||
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
||||
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **IND_KWARGS)
|
||||
env = stix2.Environment().semantically_equivalent(ind1, ind2, **weigths)
|
||||
env = stix2.Environment().semantically_equivalent(ind1, ind2, **weights)
|
||||
assert round(env) == 0
|
||||
|
||||
|
||||
|
@ -727,17 +735,17 @@ def test_semantic_equivalence_zero_match():
|
|||
],
|
||||
)
|
||||
def test_semantic_equivalence_external_references(refs1, refs2, ret_val):
|
||||
value = stix2.environment._partial_external_reference_based(refs1, refs2)
|
||||
value = stix2.environment.partial_external_reference_based(refs1, refs2)
|
||||
assert value == ret_val
|
||||
|
||||
|
||||
def test_semantic_equivalence_timetamp():
|
||||
t1 = "2018-10-17T00:14:20.652Z"
|
||||
t2 = "2018-10-17T12:14:20.652Z"
|
||||
assert stix2.environment._partial_timestamp_based(t1, t2, 1) == 0.5
|
||||
assert stix2.environment.partial_timestamp_based(t1, t2, 1) == 0.5
|
||||
|
||||
|
||||
def test_semantic_equivalence_exact_match():
|
||||
t1 = "2018-10-17T00:14:20.652Z"
|
||||
t2 = "2018-10-17T12:14:20.652Z"
|
||||
assert stix2.environment._exact_match(t1, t2) == 0.0
|
||||
assert stix2.environment.exact_match(t1, t2) == 0.0
|
||||
|
|
Loading…
Reference in New Issue