diff --git a/setup.py b/setup.py index 07de2a4..185c76c 100644 --- a/setup.py +++ b/setup.py @@ -63,5 +63,6 @@ setup( }, extras_require={ 'taxii': ['taxii2-client'], + 'semantic': ['pyjarowinkler'], }, ) diff --git a/stix2/environment.py b/stix2/environment.py index 104fdb2..8049c0d 100644 --- a/stix2/environment.py +++ b/stix2/environment.py @@ -186,3 +186,127 @@ class Environment(DataStoreMixin): return self.get(creator_id) else: return None + + def semantically_equivalent(self, obj1, obj2): + """This method is meant to verify if two objects of the same type are + semantically equivalent. + + Args: + obj1: A stix2 object instance + obj2: A stix2 object instance + + Returns: + float: A number between 0.0 and 1.0 as a measurement of equivalence. + + Warnings: + Not all objects are supported. + + Notes: + This implementation follows the Committee Note on semantic equivalence. + see `the Committee Note `__. + + """ + equivalence_score = 0.0 + type1, type2 = obj1["type"], obj2["type"] + + if type1 != type2: + raise ValueError('The objects to compare must be of the same type!') + + if obj1.get("spec_version", "") != obj2.get("spec_version", ""): + raise ValueError('The objects to compare must be of the same spec version!') + + if type1 == "attack-pattern": + if _check_property_present("name", obj1, obj2): + _partial_string_based(obj1["name"], obj2["name"]) + if _check_property_present("external_references", obj1, obj2): + _partial_external_reference_based(obj1["external_references"], obj2["external_references"]) + elif type1 == "campaign": + if _check_property_present("name", obj1, obj2): + _partial_string_based(obj1["name"], obj2["name"]) + if _check_property_present("aliases", obj1, obj2): + _partial_list_based(obj1["aliases"], obj2["aliases"]) + elif type1 == "course-of-action": + pass + elif type1 == "identity": + if _check_property_present("name", obj1, obj2): + _exact_match(obj1["name"], obj2["name"]) + if _check_property_present("identity_class", obj1, obj2): + _exact_match(obj1["identity_class"], obj2["identity_class"]) + if _check_property_present("sectors", obj1, obj2): + _partial_list_based(obj1["sectors"], obj2["sectors"]) + elif type1 == "indicator": + if _check_property_present("indicator_types", obj1, obj2): + _partial_list_based(obj1["indicator_types"], obj2["indicator_types"]) + if _check_property_present("pattern", obj1, obj2): + pass # TODO: needs to be done + if _check_property_present("valid_from", obj1, obj2): + _partial_timestamp_based(obj1["valid_from"], obj2["valid_from"]) + elif type1 == "instrusion-set": + pass + elif type1 == "location": + pass + elif type1 == "malware": + if _check_property_present("malware_types", obj1, obj2): + _partial_list_based(obj1["malware_types"], obj2["malware_types"]) + if _check_property_present("name", obj1, obj2): + _partial_string_based(obj1["name"], obj2["name"]) + elif type1 == "observed-data": + pass + elif type1 == "report": + pass + elif type1 == "threat-actor": + if _check_property_present("name", obj1, obj2): + _partial_string_based(obj1["name"], obj2["name"]) + if _check_property_present("threat_actor_types", obj1, obj2): + _partial_list_based(obj1["threat_actor_types"], obj2["threat_actor_types"]) + if _check_property_present("aliases", obj1, obj2): + _partial_list_based(obj1["aliases"], obj2["aliases"]) + elif type1 == "tool": + if _check_property_present("tool_types", obj1, obj2): + _partial_list_based(obj1["tool_types"], obj2["tool_types"]) + if _check_property_present("name", obj1, obj2): + _partial_string_based(obj1["name"], obj2["name"]) + elif type1 == "vulnerability": + if _check_property_present("name", obj1, obj2): + _partial_string_based(obj1["name"], obj2["name"]) + if _check_property_present("external_references", obj1, obj2): + _partial_external_reference_based(obj1["external_references"], obj2["external_references"]) + # TODO: need to actually calculate the value + return equivalence_score + + +def _check_property_present(prop, obj1, obj2): + if prop in obj1 and prop in obj2: + return True + return False + + +def _partial_timestamp_based(t1, t2): + from .utils import parse_into_datetime + tdelta = 1 # One day... + stix_t1, stix_t2 = parse_into_datetime(t1), parse_into_datetime(t2) + return 1 - min(abs(stix_t1.timestamp() - stix_t2.timestamp()) / (86400 * tdelta), 1) + + +def _partial_list_based(l1, l2): + l1_set, l2_set = set(l1), set(l2) + return len(l1_set.intersection(l2_set)) / max(len(l1_set), len(l2_set)) + + +def _exact_match(val1, val2): + if val1 == val2: + return 1.0 + return 0.0 + + +def _partial_string_based(str1, str2): + from pyjarowinkler import distance + return distance.get_jaro_distance(str1, str2) + + +def _partial_external_reference_based(refs1, refs2): + pass # TODO: needs to be done + + +def _partial_location_distance(loc1, loc2): + pass # TODO: needs to be done