diff --git a/stix2/environment.py b/stix2/environment.py index d2c6d3a..34e0a04 100644 --- a/stix2/environment.py +++ b/stix2/environment.py @@ -6,7 +6,6 @@ import time from .core import parse as _parse from .datastore import CompositeDataSource, DataStoreMixin -from .exceptions import SemanticEquivalenceUnsupportedTypeError from .utils import STIXdatetime, parse_into_datetime logger = logging.getLogger(__name__) @@ -228,9 +227,6 @@ class Environment(DataStoreMixin): "aliases": 40, "method": _campaign_checks, }, - "course-of-action": { - "method": _course_of_action_checks, - }, "identity": { "name": 60, "identity_class": 20, @@ -244,9 +240,6 @@ class Environment(DataStoreMixin): "tdelta": 1, # One day interval "method": _indicator_checks, }, - "intrusion-set": { - "method": _intrusion_set_checks, - }, "location": { "longitude_latitude": 34, "region": 33, @@ -259,12 +252,6 @@ class Environment(DataStoreMixin): "name": 80, "method": _malware_checks, }, - "observed-data": { - "method": _observed_data_checks, - }, - "report": { - "method": _report_checks, - }, "threat-actor": { "name": 60, "threat_actor_types": 20, @@ -298,8 +285,14 @@ class Environment(DataStoreMixin): if ignore_spec_version is False and obj1.get("spec_version", "2.0") != obj2.get("spec_version", "2.0"): raise ValueError('The objects to compare must be of the same spec version!') - method = weights[type1]["method"] - matching_score, sum_weights = method(obj1, obj2, **weights[type1]) + try: + method = weights[type1]["method"] + except KeyError: + logger.warning("'%s' type has no semantic equivalence method to call!", type1) + sum_weights = matching_score = 0 + else: + logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"]) + matching_score, sum_weights = method(obj1, obj2, **weights[type1]) if sum_weights <= 0: return 0 @@ -333,7 +326,9 @@ def partial_timestamp_based(t1, t2, tdelta): if not isinstance(t2, STIXdatetime): t2 = parse_into_datetime(t2) t1, t2 = time.mktime(t1.timetuple()), time.mktime(t2.timetuple()) - return 1 - min(abs(t1 - t2) / (86400 * tdelta), 1) + result = 1 - min(abs(t1 - t2) / (86400 * tdelta), 1) + logger.debug("--\t\tpartial_timestamp_based '%s' '%s' tdelta: '%s'\tresult: '%s'", t1, t2, tdelta, result) + return result def partial_list_based(l1, l2): @@ -348,7 +343,9 @@ def partial_list_based(l1, l2): """ l1_set, l2_set = set(l1), set(l2) - return len(l1_set.intersection(l2_set)) / max(len(l1), len(l2)) + result = len(l1_set.intersection(l2_set)) / max(len(l1), len(l2)) + logger.debug("--\t\tpartial_list_based '%s' '%s'\tresult: '%s'", l1, l2, result) + return result def exact_match(val1, val2): @@ -362,9 +359,11 @@ def exact_match(val1, val2): float: 1.0 if the value matches exactly, 0.0 otherwise. """ + result = 0.0 if val1 == val2: - return 1.0 - return 0.0 + result = 1.0 + logger.debug("--\t\texact_match '%s' '%s'\tresult: '%s'", val1, val2, result) + return result def partial_string_based(str1, str2): @@ -379,7 +378,9 @@ def partial_string_based(str1, str2): """ from pyjarowinkler import distance - return distance.get_jaro_distance(str1, str2) + result = distance.get_jaro_distance(str1, str2) + logger.debug("--\t\tpartial_string_based '%s' '%s'\tresult: '%s'", str1, str2, result) + return result def custom_pattern_based(pattern1, pattern2): @@ -440,14 +441,24 @@ def partial_external_reference_based(refs1, refs2): # external_id or url match then its a perfect match and other entries # can be ignored. if sn_match and (ei_match or url_match) and source_name in allowed: - return 1.0 + result = 1.0 + logger.debug( + "--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'", + refs1, refs2, result, + ) + return result # Regular check. If the source_name (not STIX-defined) or external_id or # url matches then we consider the entry a match. if (sn_match or ei_match or url_match) and source_name not in allowed: matches += 1 - return matches / max(len(refs1), len(refs2)) + result = matches / max(len(refs1), len(refs2)) + logger.debug( + "--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'", + refs1, refs2, result, + ) + return result def partial_location_distance(lat1, long1, lat2, long2, threshold): @@ -466,7 +477,12 @@ def partial_location_distance(lat1, long1, lat2, long2, threshold): """ from haversine import haversine, Unit distance = haversine((lat1, long1), (lat2, long2), unit=Unit.KILOMETERS) - return 1 - (distance / threshold) + result = 1 - (distance / threshold) + logger.debug( + "--\t\tpartial_location_distance '%s' '%s' threshold: '%s'\tresult: '%s'", + (lat1, long1), (lat2, long2), threshold, result, + ) + return result def _attack_pattern_checks(obj1, obj2, **weights): @@ -474,15 +490,19 @@ def _attack_pattern_checks(obj1, obj2, **weights): sum_weights = 0.0 if check_property_present("name", obj1, obj2): w = weights["name"] + contributing_score = w * partial_string_based(obj1["name"], obj2["name"]) sum_weights += w - matching_score += w * partial_string_based(obj1["name"], obj2["name"]) + matching_score += contributing_score + logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score) if check_property_present("external_references", obj1, obj2): w = weights["external_references"] - sum_weights += w - matching_score += ( - w * - partial_external_reference_based(obj1["external_references"], obj2["external_references"]) + contributing_score = ( + w * partial_external_reference_based(obj1["external_references"], obj2["external_references"]) ) + sum_weights += w + matching_score += contributing_score + logger.debug("'external_references' check -- weight: %s, contributing score: %s", w, contributing_score) + logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights) return matching_score, sum_weights @@ -491,12 +511,17 @@ def _campaign_checks(obj1, obj2, **weights): sum_weights = 0.0 if check_property_present("name", obj1, obj2): w = weights["name"] + contributing_score = w * partial_string_based(obj1["name"], obj2["name"]) sum_weights += w - matching_score += w * partial_string_based(obj1["name"], obj2["name"]) + matching_score += contributing_score + logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score) if check_property_present("aliases", obj1, obj2): w = weights["aliases"] + contributing_score = w * partial_list_based(obj1["aliases"], obj2["aliases"]) sum_weights += w - matching_score += w * partial_list_based(obj1["aliases"], obj2["aliases"]) + matching_score += contributing_score + logger.debug("'aliases' check -- weight: %s, contributing score: %s", w, contributing_score) + logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights) return matching_score, sum_weights @@ -505,16 +530,23 @@ def _identity_checks(obj1, obj2, **weights): sum_weights = 0.0 if check_property_present("name", obj1, obj2): w = weights["name"] + contributing_score = w * exact_match(obj1["name"], obj2["name"]) sum_weights += w - matching_score += w * exact_match(obj1["name"], obj2["name"]) + matching_score += contributing_score + logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score) if check_property_present("identity_class", obj1, obj2): w = weights["identity_class"] + contributing_score = w * exact_match(obj1["identity_class"], obj2["identity_class"]) sum_weights += w - matching_score += w * exact_match(obj1["identity_class"], obj2["identity_class"]) + matching_score += contributing_score + logger.debug("'identity_class' check -- weight: %s, contributing score: %s", w, contributing_score) if check_property_present("sectors", obj1, obj2): w = weights["sectors"] + contributing_score = w * partial_list_based(obj1["sectors"], obj2["sectors"]) sum_weights += w - matching_score += w * partial_list_based(obj1["sectors"], obj2["sectors"]) + matching_score += contributing_score + logger.debug("'sectors' check -- weight: %s, contributing score: %s", w, contributing_score) + logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights) return matching_score, sum_weights @@ -523,19 +555,26 @@ def _indicator_checks(obj1, obj2, **weights): sum_weights = 0.0 if check_property_present("indicator_types", obj1, obj2): w = weights["indicator_types"] + contributing_score = w * partial_list_based(obj1["indicator_types"], obj2["indicator_types"]) sum_weights += w - matching_score += w * partial_list_based(obj1["indicator_types"], obj2["indicator_types"]) + matching_score += contributing_score + logger.debug("'indicator_types' check -- weight: %s, contributing score: %s", w, contributing_score) if check_property_present("pattern", obj1, obj2): w = weights["pattern"] + contributing_score = w * custom_pattern_based(obj1["pattern"], obj2["pattern"]) sum_weights += w - matching_score += w * custom_pattern_based(obj1["pattern"], obj2["pattern"]) + matching_score += contributing_score + logger.debug("'pattern' check -- weight: %s, contributing score: %s", w, contributing_score) if check_property_present("valid_from", obj1, obj2): w = weights["valid_from"] - sum_weights += w - matching_score += ( + contributing_score = ( w * partial_timestamp_based(obj1["valid_from"], obj2["valid_from"], weights["tdelta"]) ) + sum_weights += w + matching_score += contributing_score + logger.debug("'valid_from' check -- weight: %s, contributing score: %s", w, contributing_score) + logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights) return matching_score, sum_weights @@ -544,19 +583,26 @@ def _location_checks(obj1, obj2, **weights): sum_weights = 0.0 if check_property_present("latitude", obj1, obj2) and check_property_present("longitude", obj1, obj2): w = weights["longitude_latitude"] - sum_weights += w - matching_score += ( + contributing_score = ( w * partial_location_distance(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], weights["threshold"]) ) + sum_weights += w + matching_score += contributing_score + logger.debug("'longitude_latitude' check -- weight: %s, contributing score: %s", w, contributing_score) if check_property_present("region", obj1, obj2): w = weights["region"] + contributing_score = w * exact_match(obj1["region"], obj2["region"]) sum_weights += w - matching_score += w * exact_match(obj1["region"], obj2["region"]) + matching_score += contributing_score + logger.debug("'region' check -- weight: %s, contributing score: %s", w, contributing_score) if check_property_present("country", obj1, obj2): w = weights["country"] + contributing_score = w * exact_match(obj1["country"], obj2["country"]) sum_weights += w - matching_score += w * exact_match(obj1["country"], obj2["country"]) + matching_score += contributing_score + logger.debug("'country' check -- weight: %s, contributing score: %s", w, contributing_score) + logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights) return matching_score, sum_weights @@ -565,12 +611,17 @@ def _malware_checks(obj1, obj2, **weights): sum_weights = 0.0 if check_property_present("malware_types", obj1, obj2): w = weights["malware_types"] + contributing_score = w * partial_list_based(obj1["malware_types"], obj2["malware_types"]) sum_weights += w - matching_score += w * partial_list_based(obj1["malware_types"], obj2["malware_types"]) + matching_score += contributing_score + logger.debug("'malware_types' check -- weight: %s, contributing score: %s", w, contributing_score) if check_property_present("name", obj1, obj2): w = weights["name"] + contributing_score = w * partial_string_based(obj1["name"], obj2["name"]) sum_weights += w - matching_score += w * partial_string_based(obj1["name"], obj2["name"]) + matching_score += contributing_score + logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score) + logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights) return matching_score, sum_weights @@ -579,16 +630,23 @@ def _threat_actor_checks(obj1, obj2, **weights): sum_weights = 0.0 if check_property_present("name", obj1, obj2): w = weights["name"] + contributing_score = w * partial_string_based(obj1["name"], obj2["name"]) sum_weights += w - matching_score += w * partial_string_based(obj1["name"], obj2["name"]) + matching_score += contributing_score + logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score) if check_property_present("threat_actor_types", obj1, obj2): w = weights["threat_actor_types"] + contributing_score = w * partial_list_based(obj1["threat_actor_types"], obj2["threat_actor_types"]) sum_weights += w - matching_score += w * partial_list_based(obj1["threat_actor_types"], obj2["threat_actor_types"]) + matching_score += contributing_score + logger.debug("'threat_actor_types' check -- weight: %s, contributing score: %s", w, contributing_score) if check_property_present("aliases", obj1, obj2): w = weights["aliases"] + contributing_score = w * partial_list_based(obj1["aliases"], obj2["aliases"]) sum_weights += w - matching_score += w * partial_list_based(obj1["aliases"], obj2["aliases"]) + matching_score += contributing_score + logger.debug("'aliases' check -- weight: %s, contributing score: %s", w, contributing_score) + logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights) return matching_score, sum_weights @@ -597,12 +655,17 @@ def _tool_checks(obj1, obj2, **weights): sum_weights = 0.0 if check_property_present("tool_types", obj1, obj2): w = weights["tool_types"] + contributing_score = w * partial_list_based(obj1["tool_types"], obj2["tool_types"]) sum_weights += w - matching_score += w * partial_list_based(obj1["tool_types"], obj2["tool_types"]) + matching_score += contributing_score + logger.debug("'tool_types' check -- weight: %s, contributing score: %s", w, contributing_score) if check_property_present("name", obj1, obj2): w = weights["name"] + contributing_score = w * partial_string_based(obj1["name"], obj2["name"]) sum_weights += w - matching_score += w * partial_string_based(obj1["name"], obj2["name"]) + matching_score += contributing_score + logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score) + logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights) return matching_score, sum_weights @@ -611,29 +674,18 @@ def _vulnerability_checks(obj1, obj2, **weights): sum_weights = 0.0 if check_property_present("name", obj1, obj2): w = weights["name"] + contributing_score = w * partial_string_based(obj1["name"], obj2["name"]) sum_weights += w - matching_score += w * partial_string_based(obj1["name"], obj2["name"]) + matching_score += contributing_score + logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score) if check_property_present("external_references", obj1, obj2): w = weights["external_references"] - sum_weights += w - matching_score += w * partial_external_reference_based( + contributing_score = w * partial_external_reference_based( obj1["external_references"], obj2["external_references"], ) + sum_weights += w + matching_score += contributing_score + logger.debug("'external_references' check -- weight: %s, contributing score: %s", w, contributing_score) + logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights) return matching_score, sum_weights - - -def _course_of_action_checks(obj1, obj2, **weights): - raise SemanticEquivalenceUnsupportedTypeError("course-of-action type has no semantic equivalence implementation!") - - -def _intrusion_set_checks(obj1, obj2, **weights): - raise SemanticEquivalenceUnsupportedTypeError("intrusion-set type has no semantic equivalence implementation!") - - -def _observed_data_checks(obj1, obj2, **weights): - raise SemanticEquivalenceUnsupportedTypeError("observed-data type has no semantic equivalence implementation!") - - -def _report_checks(obj1, obj2, **weights): - raise SemanticEquivalenceUnsupportedTypeError("report type has no semantic equivalence implementation!") diff --git a/stix2/exceptions.py b/stix2/exceptions.py index 6405c2e..d2ec3fc 100644 --- a/stix2/exceptions.py +++ b/stix2/exceptions.py @@ -233,10 +233,3 @@ class STIXDeprecationWarning(DeprecationWarning): Represents usage of a deprecated component of a STIX specification. """ pass - - -class SemanticEquivalenceUnsupportedTypeError(STIXError, TypeError): - """STIX object type not supported by the semantic equivalence approach.""" - - def __init__(self, msg): - super(SemanticEquivalenceUnsupportedTypeError, self).__init__(msg) diff --git a/stix2/test/v21/test_environment.py b/stix2/test/v21/test_environment.py index 62b0c53..a049b25 100644 --- a/stix2/test/v21/test_environment.py +++ b/stix2/test/v21/test_environment.py @@ -6,12 +6,10 @@ import stix2.exceptions from .constants import ( ATTACK_PATTERN_ID, ATTACK_PATTERN_KWARGS, CAMPAIGN_ID, CAMPAIGN_KWARGS, - COURSE_OF_ACTION_ID, COURSE_OF_ACTION_KWARGS, FAKE_TIME, IDENTITY_ID, - IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS, INTRUSION_SET_ID, - INTRUSION_SET_KWARGS, LOCATION_ID, MALWARE_ID, MALWARE_KWARGS, - OBSERVED_DATA_ID, OBSERVED_DATA_KWARGS, RELATIONSHIP_IDS, REPORT_ID, - REPORT_KWARGS, THREAT_ACTOR_ID, THREAT_ACTOR_KWARGS, TOOL_ID, TOOL_KWARGS, - VULNERABILITY_ID, VULNERABILITY_KWARGS, + FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS, + LOCATION_ID, MALWARE_ID, MALWARE_KWARGS, RELATIONSHIP_IDS, THREAT_ACTOR_ID, + THREAT_ACTOR_KWARGS, TOOL_ID, TOOL_KWARGS, VULNERABILITY_ID, + VULNERABILITY_KWARGS, ) @@ -615,37 +613,6 @@ def test_semantic_equivalence_different_spec_version_raises(): assert str(excinfo.value) == "The objects to compare must be of the same spec version!" -@pytest.mark.parametrize( - "obj1,obj2,ret_val", - [ - ( - stix2.v21.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS), - stix2.v21.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS), - "course-of-action type has no semantic equivalence implementation!", - ), - ( - stix2.v21.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS), - stix2.v21.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS), - "intrusion-set type has no semantic equivalence implementation!", - ), - ( - stix2.v21.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS), - stix2.v21.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS), - "observed-data type has no semantic equivalence implementation!", - ), - ( - stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS), - stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS), - "report type has no semantic equivalence implementation!", - ), - ], -) -def test_semantic_equivalence_on_unsupported_types(obj1, obj2, ret_val): - with pytest.raises(stix2.exceptions.SemanticEquivalenceUnsupportedTypeError) as excinfo: - stix2.Environment().semantically_equivalent(obj1, obj2) - assert ret_val == str(excinfo.value) - - def test_semantic_equivalence_zero_match(): IND_KWARGS = dict( indicator_types=["APTX"], @@ -767,7 +734,7 @@ def test_semantic_equivalence_external_references(refs1, refs2, ret_val): assert value == ret_val -def test_semantic_equivalence_timetamp(): +def test_semantic_equivalence_timestamp(): t1 = "2018-10-17T00:14:20.652Z" t2 = "2018-10-17T12:14:20.652Z" assert stix2.environment.partial_timestamp_based(t1, t2, 1) == 0.5