diff --git a/stix2/environment.py b/stix2/environment.py
index 8049c0d..41d9de1 100644
--- a/stix2/environment.py
+++ b/stix2/environment.py
@@ -1,10 +1,14 @@
 """Python STIX2 Environment API."""
 
 import copy
+import logging
+import math
 
 from .core import parse as _parse
 from .datastore import CompositeDataSource, DataStoreMixin
 
+logger = logging.getLogger(__name__)
+
 
 class ObjectFactory(object):
     """Easily create STIX objects with default values for certain properties.
@@ -187,13 +191,16 @@ class Environment(DataStoreMixin):
         else:
             return None
 
-    def semantically_equivalent(self, obj1, obj2):
+    @staticmethod
+    def semantically_equivalent(obj1, obj2, **weight_dict):
         """This method is meant to verify if two objects of the same type are
         semantically equivalent.
 
         Args:
             obj1: A stix2 object instance
             obj2: A stix2 object instance
+            weight_dict: A dictionary that can be used to override settings
+                in the semantic equivalence process
 
         Returns:
-            float: A number between 0.0 and 1.0 as a measurement of equivalence.
+            float: A number between 0 and 100 as a measurement of equivalence.
@@ -206,7 +213,58 @@ class Environment(DataStoreMixin):
         see `the Committee Note `__.
 
         """
-        equivalence_score = 0.0
+        # default weights used for the semantic equivalence process
+        weights = {
+            "attack-pattern": {
+                "name": 30,
+                "external_references": 70,
+            },
+            "campaign": {
+                "name": 60,
+                "aliases": 40,
+            },
+            "identity": {
+                "name": 60,
+                "identity_class": 20,
+                "sectors": 20,
+            },
+            "indicator": {
+                "indicator_types": 15,
+                "pattern": 80,
+                "valid_from": 5,
+            },
+            "location": {
+                "longitude_latitude": 34,
+                "region": 33,
+                "country": 33,
+            },
+            "malware": {
+                "malware_types": 20,
+                "name": 80,
+            },
+            "threat-actor": {
+                "name": 60,
+                "threat_actor_types": 20,
+                "aliases": 20,
+            },
+            "tool": {
+                "tool_types": 20,
+                "name": 80,
+            },
+            "vulnerability": {
+                "name": 30,
+                "external_references": 70,
+            },
+            "_internal": {
+                "tdelta": 1,
+            },
+        }
+
+        if weight_dict:
+            weights.update(weight_dict)
+
+        matching_score = 0.0
+        sum_weights = 0.0
 
         type1, type2 = obj1["type"], obj2["type"]
         if type1 != type2:
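
A minimal usage sketch of the weight_dict override introduced above (not part of the patch; the indicator values and the override numbers are invented, and plain dicts stand in for parsed STIX objects since the method only performs dict-style access):

    from stix2 import Environment

    ind1 = {
        "type": "indicator",
        "indicator_types": ["malicious-activity"],
        "pattern": "[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
        "valid_from": "2019-01-01T12:34:56Z",
    }
    ind2 = dict(ind1, indicator_types=["malicious-activity", "anomalous-activity"])

    # Default weights: indicator_types 15, pattern 80, valid_from 5.
    default_score = Environment.semantically_equivalent(ind1, ind2)

    # weight_dict entries replace the whole per-type sub-dict (dict.update),
    # so repeat every property for the type being tuned.  Keys that are not
    # valid identifiers ("attack-pattern", "threat-actor") must be passed
    # through the **{...} form.
    custom_score = Environment.semantically_equivalent(
        ind1, ind2,
        **{"indicator": {"indicator_types": 30, "pattern": 65, "valid_from": 5}},
    )
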
obj2["name"]) if _check_property_present("identity_class", obj1, obj2): - _exact_match(obj1["identity_class"], obj2["identity_class"]) + w = weigths["identity"]["identity_class"] + sum_weights += w + matching_score += w * _exact_match(obj1["identity_class"], obj2["identity_class"]) if _check_property_present("sectors", obj1, obj2): - _partial_list_based(obj1["sectors"], obj2["sectors"]) + w = weigths["identity"]["sectors"] + sum_weights += w + matching_score += w * _partial_list_based(obj1["sectors"], obj2["sectors"]) + elif type1 == "indicator": if _check_property_present("indicator_types", obj1, obj2): - _partial_list_based(obj1["indicator_types"], obj2["indicator_types"]) + w = weigths["indicator"]["indicator_types"] + sum_weights += w + matching_score += w * _partial_list_based(obj1["indicator_types"], obj2["indicator_types"]) if _check_property_present("pattern", obj1, obj2): - pass # TODO: needs to be done + w = weigths["indicator"]["pattern"] + sum_weights += w + matching_score += w * _custom_pattern_based(obj1["pattern"], obj2["pattern"]) if _check_property_present("valid_from", obj1, obj2): - _partial_timestamp_based(obj1["valid_from"], obj2["valid_from"]) + w = weigths["indicator"]["valid_from"] + sum_weights += w + matching_score += ( + w * + _partial_timestamp_based(obj1["valid_from"], obj2["valid_from"], weigths["_internal"]["tdelta"]) + ) + elif type1 == "instrusion-set": - pass + logger.warning("%s type is not supported for semantic equivalence", type1) + elif type1 == "location": - pass + if _check_property_present("latitude", obj1, obj2) and _check_property_present("longitude", obj1, obj2): + w = weigths["location"]["longitude_latitude"] + sum_weights += w + matching_score += ( + w * + _partial_location_distance(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"]) + ) + if _check_property_present("region", obj1, obj2): + w = weigths["location"]["region"] + sum_weights += w + matching_score += w * _exact_match(obj1["region"], obj2["region"]) + if _check_property_present("country", obj1, obj2): + w = weigths["location"]["country"] + sum_weights += w + matching_score += w * _exact_match(obj1["country"], obj2["country"]) + elif type1 == "malware": if _check_property_present("malware_types", obj1, obj2): - _partial_list_based(obj1["malware_types"], obj2["malware_types"]) + w = weigths["malware"]["malware_types"] + sum_weights += w + matching_score += w * _partial_list_based(obj1["malware_types"], obj2["malware_types"]) if _check_property_present("name", obj1, obj2): - _partial_string_based(obj1["name"], obj2["name"]) + w = weigths["malware"]["name"] + sum_weights += w + matching_score += w * _partial_string_based(obj1["name"], obj2["name"]) + elif type1 == "observed-data": - pass + logger.warning("%s type is not supported for semantic equivalence", type1) + elif type1 == "report": - pass + logger.warning("%s type is not supported for semantic equivalence", type1) + elif type1 == "threat-actor": if _check_property_present("name", obj1, obj2): - _partial_string_based(obj1["name"], obj2["name"]) + w = weigths["threat-actor"]["name"] + sum_weights += w + matching_score += w * _partial_string_based(obj1["name"], obj2["name"]) if _check_property_present("threat_actor_types", obj1, obj2): - _partial_list_based(obj1["threat_actor_types"], obj2["threat_actor_types"]) + w = weigths["threat-actor"]["threat_actor_types"] + sum_weights += w + matching_score += w * _partial_list_based(obj1["threat_actor_types"], obj2["threat_actor_types"]) if 
_check_property_present("aliases", obj1, obj2): - _partial_list_based(obj1["aliases"], obj2["aliases"]) + w = weigths["threat-actor"]["aliases"] + sum_weights += w + matching_score += w * _partial_list_based(obj1["aliases"], obj2["aliases"]) + elif type1 == "tool": if _check_property_present("tool_types", obj1, obj2): - _partial_list_based(obj1["tool_types"], obj2["tool_types"]) + w = weigths["tool"]["tool_types"] + sum_weights += w + matching_score += w * _partial_list_based(obj1["tool_types"], obj2["tool_types"]) if _check_property_present("name", obj1, obj2): - _partial_string_based(obj1["name"], obj2["name"]) + w = weigths["tool"]["name"] + sum_weights += w + matching_score += w * _partial_string_based(obj1["name"], obj2["name"]) + elif type1 == "vulnerability": if _check_property_present("name", obj1, obj2): - _partial_string_based(obj1["name"], obj2["name"]) + w = weigths["vulnerability"]["name"] + sum_weights += w + matching_score += w * _partial_string_based(obj1["name"], obj2["name"]) if _check_property_present("external_references", obj1, obj2): - _partial_external_reference_based(obj1["external_references"], obj2["external_references"]) - # TODO: need to actually calculate the value + w = weigths["vulnerability"]["external_references"] + sum_weights += w + matching_score += w * _partial_external_reference_based(obj1["external_references"], obj2["external_references"]) + + equivalence_score = (matching_score / sum_weights) * 100.0 return equivalence_score @@ -281,16 +410,15 @@ def _check_property_present(prop, obj1, obj2): return False -def _partial_timestamp_based(t1, t2): +def _partial_timestamp_based(t1, t2, tdelta): from .utils import parse_into_datetime - tdelta = 1 # One day... stix_t1, stix_t2 = parse_into_datetime(t1), parse_into_datetime(t2) return 1 - min(abs(stix_t1.timestamp() - stix_t2.timestamp()) / (86400 * tdelta), 1) def _partial_list_based(l1, l2): l1_set, l2_set = set(l1), set(l2) - return len(l1_set.intersection(l2_set)) / max(len(l1_set), len(l2_set)) + return len(l1_set.intersection(l2_set)) / max(len(l1), len(l2)) def _exact_match(val1, val2): @@ -304,9 +432,53 @@ def _partial_string_based(str1, str2): return distance.get_jaro_distance(str1, str2) +def _custom_pattern_based(pattern1, pattern2): + return 0 # TODO: Needs to be implemented + + def _partial_external_reference_based(refs1, refs2): - pass # TODO: needs to be done + allowed = set(("veris", "cve", "capec", "mitre-attack")) + matches = 0 + + if len(refs1) >= len(refs2): + l1 = refs1 + l2 = refs2 + else: + l1 = refs2 + l2 = refs1 + + for ext_ref1 in l1: + for ext_ref2 in l2: + sn_match = False + ei_match = False + url_match = False + source_name = None + + if _check_property_present("source_name", ext_ref1, ext_ref2): + if ext_ref1["source_name"] == ext_ref2["source_name"]: + source_name = ext_ref1["source_name"] + sn_match = True + if _check_property_present("external_id", ext_ref1, ext_ref2): + if ext_ref1["external_id"] == ext_ref2["external_id"]: + ei_match = True + if _check_property_present("url", ext_ref1, ext_ref2): + if ext_ref1["url"] == ext_ref2["url"]: + url_match = True + + # Special case: if source_name is a STIX defined name and either + # external_id or url match then its a perfect match and other entries + # can be ignored. + if sn_match and (ei_match or url_match) and source_name in allowed: + return 1.0 + + # Regular check. If the source_name (not STIX-defined) or external_id or + # url matches then we consider the entry a match. 
@@ -304,9 +432,53 @@ def _partial_string_based(str1, str2):
     return distance.get_jaro_distance(str1, str2)
 
 
+def _custom_pattern_based(pattern1, pattern2):
+    return 0  # TODO: Needs to be implemented
+
+
 def _partial_external_reference_based(refs1, refs2):
-    pass  # TODO: needs to be done
+    allowed = set(("veris", "cve", "capec", "mitre-attack"))
+    matches = 0
+
+    if len(refs1) >= len(refs2):
+        l1 = refs1
+        l2 = refs2
+    else:
+        l1 = refs2
+        l2 = refs1
+
+    for ext_ref1 in l1:
+        for ext_ref2 in l2:
+            sn_match = False
+            ei_match = False
+            url_match = False
+            source_name = None
+
+            if _check_property_present("source_name", ext_ref1, ext_ref2):
+                if ext_ref1["source_name"] == ext_ref2["source_name"]:
+                    source_name = ext_ref1["source_name"]
+                    sn_match = True
+            if _check_property_present("external_id", ext_ref1, ext_ref2):
+                if ext_ref1["external_id"] == ext_ref2["external_id"]:
+                    ei_match = True
+            if _check_property_present("url", ext_ref1, ext_ref2):
+                if ext_ref1["url"] == ext_ref2["url"]:
+                    url_match = True
+
+            # Special case: if source_name is a STIX-defined name and either
+            # external_id or url match then it's a perfect match and other entries
+            # can be ignored.
+            if sn_match and (ei_match or url_match) and source_name in allowed:
+                return 1.0
+
+            # Regular check. If the source_name (not STIX-defined) or external_id or
+            # url matches then we consider the entry a match.
+            if (sn_match or ei_match or url_match) and source_name not in allowed:
+                matches += 1
+
+    return matches / max(len(refs1), len(refs2))
 
 
-def _partial_location_distance(loc1, loc2):
-    pass  # TODO: needs to be done
+def _partial_location_distance(lat1, long1, lat2, long2):
+    distance = math.sqrt(((lat2 - lat1) ** 2) + ((long2 - long1) ** 2))
+    return 1 - (distance / 1000.0)
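
Finally, a sketch of the two helpers implemented at the bottom of the patch; the reference entries and coordinates are invented:

    from stix2.environment import (
        _partial_external_reference_based, _partial_location_distance,
    )

    refs1 = [{"source_name": "mitre-attack", "external_id": "T1078"}]
    refs2 = [
        {"source_name": "mitre-attack", "external_id": "T1078"},
        {"source_name": "some-vendor", "url": "https://example.com/report"},
    ]
    # A STIX-defined source_name plus a matching external_id short-circuits
    # to a perfect score, regardless of the remaining entries.
    _partial_external_reference_based(refs1, refs2)  # 1.0

    # Plain Euclidean distance on the raw coordinates, scaled by 1000;
    # a rough heuristic rather than a geodesic calculation.
    _partial_location_distance(38.889, -77.023, 39.290, -76.612)  # ~0.9994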