diff --git a/.isort.cfg b/.isort.cfg index d644f60..db580a5 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -4,7 +4,9 @@ not_skip = __init__.py known_third_party = antlr4, dateutil, + haversine, medallion, + pyjarowinkler, pytest, pytz, requests, diff --git a/.travis.yml b/.travis.yml index 261f125..c05ec72 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,16 +1,13 @@ sudo: false language: python cache: pip +dist: xenial python: - "2.7" - "3.4" - "3.5" - "3.6" -matrix: - include: - - python: 3.7 # https://github.com/travis-ci/travis-ci/issues/9069#issuecomment-425720905 - dist: xenial - sudo: true + - "3.7" install: - pip install -U pip setuptools - pip install tox-travis pre-commit diff --git a/setup.py b/setup.py index 497bf01..ea20795 100644 --- a/setup.py +++ b/setup.py @@ -64,5 +64,6 @@ setup( }, extras_require={ 'taxii': ['taxii2-client'], + 'semantic': ['haversine', 'pyjarowinkler'], }, ) diff --git a/stix2/environment.py b/stix2/environment.py index 104fdb2..d2c6d3a 100644 --- a/stix2/environment.py +++ b/stix2/environment.py @@ -1,9 +1,15 @@ """Python STIX2 Environment API.""" import copy +import logging +import time from .core import parse as _parse from .datastore import CompositeDataSource, DataStoreMixin +from .exceptions import SemanticEquivalenceUnsupportedTypeError +from .utils import STIXdatetime, parse_into_datetime + +logger = logging.getLogger(__name__) class ObjectFactory(object): @@ -186,3 +192,448 @@ class Environment(DataStoreMixin): return self.get(creator_id) else: return None + + @staticmethod + def semantically_equivalent(obj1, obj2, **weight_dict): + """This method is meant to verify if two objects of the same type are + semantically equivalent. + + Args: + obj1: A stix2 object instance + obj2: A stix2 object instance + weight_dict: A dictionary that can be used to override settings + in the semantic equivalence process + + Returns: + float: A number between 0.0 and 100.0 as a measurement of equivalence. 
+ + Warning: + Course of Action, Intrusion-Set, Observed-Data, Report are not supported + by this implementation. Indicator pattern check is also limited. + + Note: + This implementation follows the Committee Note on semantic equivalence. + see `the Committee Note `__. + + """ + # default weights used for the semantic equivalence process + weights = { + "attack-pattern": { + "name": 30, + "external_references": 70, + "method": _attack_pattern_checks, + }, + "campaign": { + "name": 60, + "aliases": 40, + "method": _campaign_checks, + }, + "course-of-action": { + "method": _course_of_action_checks, + }, + "identity": { + "name": 60, + "identity_class": 20, + "sectors": 20, + "method": _identity_checks, + }, + "indicator": { + "indicator_types": 15, + "pattern": 80, + "valid_from": 5, + "tdelta": 1, # One day interval + "method": _indicator_checks, + }, + "intrusion-set": { + "method": _intrusion_set_checks, + }, + "location": { + "longitude_latitude": 34, + "region": 33, + "country": 33, + "threshold": 1000.0, + "method": _location_checks, + }, + "malware": { + "malware_types": 20, + "name": 80, + "method": _malware_checks, + }, + "observed-data": { + "method": _observed_data_checks, + }, + "report": { + "method": _report_checks, + }, + "threat-actor": { + "name": 60, + "threat_actor_types": 20, + "aliases": 20, + "method": _threat_actor_checks, + }, + "tool": { + "tool_types": 20, + "name": 80, + "method": _tool_checks, + }, + "vulnerability": { + "name": 30, + "external_references": 70, + "method": _vulnerability_checks, + }, + "_internal": { + "ignore_spec_version": False, + }, + } + + if weight_dict: + weights.update(weight_dict) + + type1, type2 = obj1["type"], obj2["type"] + ignore_spec_version = weights["_internal"]["ignore_spec_version"] + + if type1 != type2: + raise ValueError('The objects to compare must be of the same type!') + + if ignore_spec_version is False and obj1.get("spec_version", "2.0") != obj2.get("spec_version", "2.0"): + raise ValueError('The 
def partial_list_based(l1, l2):
    """Performs a partial list matching via finding the intersection between common values.

    Both lists are reduced to their distinct values first, so repeated
    entries neither raise nor lower the score.

    Args:
        l1: A list of hashable values.
        l2: A list of hashable values.

    Returns:
        float: Number between 0.0 and 1.0. It is the count of distinct values
            shared by both lists divided by the size of the larger set of
            distinct values; 0.0 when both lists are empty.

    """
    l1_set, l2_set = set(l1), set(l2)
    largest = max(len(l1_set), len(l2_set))
    if largest == 0:
        # Nothing to compare: report "no overlap" instead of dividing by zero.
        return 0.0
    # float() forces true division on Python 2, where int / int truncates
    # every partial match down to 0.
    return float(len(l1_set & l2_set)) / largest
def partial_external_reference_based(refs1, refs2):
    """Performs a matching on External References.

    Every entry of the longer list is compared against every entry of the
    shorter one. An entry of the longer list counts as matched (once, no
    matter how many counterparts it matches) when it shares a
    ``source_name``, ``external_id`` or ``url`` with some entry of the
    shorter list and the shared ``source_name`` is not one of the
    STIX-defined names.

    Args:
        refs1: A list of external references.
        refs2: A list of external references.

    Returns:
        float: Number between 0.0 and 1.0 depending on matches. 1.0 is
            returned immediately when two entries share a STIX-defined
            ``source_name`` together with the same ``external_id`` or
            ``url``; 0.0 when either list is empty.

    """
    allowed = frozenset(("veris", "cve", "capec", "mitre-attack"))

    def same_value(prop, ref_a, ref_b):
        # The property must be present on both entries and hold equal values.
        return prop in ref_a and prop in ref_b and ref_a[prop] == ref_b[prop]

    if len(refs1) >= len(refs2):
        longer, shorter = refs1, refs2
    else:
        longer, shorter = refs2, refs1

    if not longer:
        # Both lists are empty; avoid dividing by zero.
        return 0.0

    matches = 0
    for ext_ref1 in longer:
        entry_matched = False
        # No early break here: a later pair may still hit the special case
        # below, which overrides the partial count.
        for ext_ref2 in shorter:
            sn_match = same_value("source_name", ext_ref1, ext_ref2)
            ei_match = same_value("external_id", ext_ref1, ext_ref2)
            url_match = same_value("url", ext_ref1, ext_ref2)

            if sn_match and ext_ref1["source_name"] in allowed:
                # Special case: if source_name is a STIX defined name and
                # either external_id or url match then its a perfect match
                # and other entries can be ignored.
                if ei_match or url_match:
                    return 1.0
                # Same STIX-defined source but a different id/url: this pair
                # is deliberately not counted as a match.
                continue

            # Regular check. If the source_name (not STIX-defined) or
            # external_id or url matches then we consider the entry a match.
            if sn_match or ei_match or url_match:
                entry_matched = True

        if entry_matched:
            # Count the entry once so the score can never exceed 1.0.
            matches += 1

    # float() forces true division on Python 2.
    return float(matches) / len(longer)
def _identity_checks(obj1, obj2, **weights):
    """Score two Identity objects on name, identity_class and sectors.

    A property takes part in the comparison only when it is present on both
    objects; its weight is then added to the achievable total and its
    weighted similarity to the achieved score.

    Args:
        obj1: An identity object instance.
        obj2: An identity object instance.
        **weights: Must provide a weight for "name", "identity_class" and
            "sectors"; other keys are ignored.

    Returns:
        tuple: ``(matching_score, sum_weights)``.

    """
    # (property, similarity function) pairs, evaluated in this order.
    comparisons = (
        ("name", exact_match),
        ("identity_class", exact_match),
        ("sectors", partial_list_based),
    )
    achieved = 0.0
    achievable = 0.0
    for prop, compare in comparisons:
        if not check_property_present(prop, obj1, obj2):
            continue
        weight = weights[prop]
        achievable += weight
        achieved += weight * compare(obj1[prop], obj2[prop])
    return achieved, achievable
def _location_checks(obj1, obj2, **weights):
    """Score two Location objects on coordinates, region and country.

    A property takes part in the comparison only when it is present on both
    objects. Coordinates are compared only when both latitude and longitude
    are present on both objects.

    Args:
        obj1: A location object instance.
        obj2: A location object instance.
        **weights: Must provide "longitude_latitude", "region" and "country"
            weights plus "threshold", the distance in kilometers passed to
            ``partial_location_distance``.

    Returns:
        tuple: ``(matching_score, sum_weights)``.

    """
    matching_score = 0.0
    sum_weights = 0.0
    if check_property_present("latitude", obj1, obj2) and check_property_present("longitude", obj1, obj2):
        w = weights["longitude_latitude"]
        sum_weights += w
        closeness = partial_location_distance(
            obj1["latitude"], obj1["longitude"],
            obj2["latitude"], obj2["longitude"],
            weights["threshold"],
        )
        # partial_location_distance computes 1 - distance / threshold, which
        # drops below zero once the points are farther apart than the
        # threshold. Clamp it so a distant pair contributes nothing instead
        # of dragging the overall equivalence score negative.
        matching_score += w * min(max(closeness, 0.0), 1.0)
    for prop in ("region", "country"):
        if check_property_present(prop, obj1, obj2):
            w = weights[prop]
            sum_weights += w
            matching_score += w * exact_match(obj1[prop], obj2[prop])
    return matching_score, sum_weights
def _tool_checks(obj1, obj2, **weights):
    """Score two Tool objects on tool_types and name.

    A property takes part in the comparison only when it is present on both
    objects.

    Args:
        obj1: A tool object instance.
        obj2: A tool object instance.
        **weights: Must provide a weight for "tool_types" and "name".

    Returns:
        tuple: ``(matching_score, sum_weights)``.

    """
    # Collect (weight, similarity) for each property that can be compared.
    scored = []
    if check_property_present("tool_types", obj1, obj2):
        similarity = partial_list_based(obj1["tool_types"], obj2["tool_types"])
        scored.append((weights["tool_types"], similarity))
    if check_property_present("name", obj1, obj2):
        similarity = partial_string_based(obj1["name"], obj2["name"])
        scored.append((weights["name"], similarity))

    # Start both sums from 0.0 so the result is a float even when nothing
    # could be compared.
    matching_score = sum((weight * similarity for weight, similarity in scored), 0.0)
    sum_weights = sum((weight for weight, _ in scored), 0.0)
    return matching_score, sum_weights
class SemanticEquivalenceUnsupportedTypeError(STIXError, TypeError):
    """STIX object type not supported by the semantic equivalence approach.

    The class derives from both ``STIXError`` and the built-in ``TypeError``,
    so an ``except`` clause naming either of them catches it.

    Args:
        msg: Message describing the unsupported type; it is forwarded
            unchanged to the parent initializer.

    """

    def __init__(self, msg):
        # Explicit two-argument super() keeps the call valid on Python 2.
        super(SemanticEquivalenceUnsupportedTypeError, self).__init__(msg)
stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS) + ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS) + env = stix2.Environment().semantically_equivalent(ap1, ap2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_attack_pattern2(): + ATTACK_KWARGS = dict( + name="Phishing", + external_references=[ + { + "url": "https://example2", + "source_name": "some-source2", + }, + ], + ) + ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS) + ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS) + env = stix2.Environment().semantically_equivalent(ap1, ap2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_campaign1(): + camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS) + camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS) + env = stix2.Environment().semantically_equivalent(camp1, camp2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_campaign2(): + CAMP_KWARGS = dict( + name="Green Group Attacks Against Finance", + description="Campaign by Green Group against a series of targets in the financial services sector.", + aliases=["super-green", "some-green"], + ) + camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS) + camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS) + env = stix2.Environment().semantically_equivalent(camp1, camp2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_identity1(): + iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS) + iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS) + env = stix2.Environment().semantically_equivalent(iden1, iden2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_identity2(): + IDEN_KWARGS = dict( + name="John Smith", + identity_class="individual", + sectors=["government", "critical-infrastructure"], + ) + iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS) + iden2 = 
stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS) + env = stix2.Environment().semantically_equivalent(iden1, iden2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_indicator(): + ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) + ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) + env = stix2.Environment().semantically_equivalent(ind1, ind2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_location1(): + LOCATION_KWARGS = dict(latitude=45, longitude=179) + loc1 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS) + loc2 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS) + env = stix2.Environment().semantically_equivalent(loc1, loc2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_location2(): + LOCATION_KWARGS = dict( + latitude=38.889, + longitude=-77.023, + region="northern-america", + country="us", + ) + loc1 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS) + loc2 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS) + env = stix2.Environment().semantically_equivalent(loc1, loc2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_malware(): + malw1 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS) + malw2 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS) + env = stix2.Environment().semantically_equivalent(malw1, malw2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_threat_actor1(): + ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS) + ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS) + env = stix2.Environment().semantically_equivalent(ta1, ta2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_threat_actor2(): + THREAT_KWARGS = dict( + threat_actor_types=["crime-syndicate"], + aliases=["super-evil"], + name="Evil Org", + ) + ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_KWARGS) + ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, 
**THREAT_KWARGS) + env = stix2.Environment().semantically_equivalent(ta1, ta2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_tool(): + tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS) + tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS) + env = stix2.Environment().semantically_equivalent(tool1, tool2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_vulnerability1(): + vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS) + vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS) + env = stix2.Environment().semantically_equivalent(vul1, vul2) + assert round(env) == 100 + + +def test_semantic_equivalence_on_same_vulnerability2(): + VULN_KWARGS1 = dict( + name="Heartbleed", + external_references=[ + { + "url": "https://example", + "source_name": "some-source", + }, + ], + ) + VULN_KWARGS2 = dict( + name="Zot", + external_references=[ + { + "url": "https://example2", + "source_name": "some-source2", + }, + ], + ) + vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS1) + vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS2) + env = stix2.Environment().semantically_equivalent(vul1, vul2) + assert round(env) == 0.0 + + +def test_semantic_equivalence_on_unknown_object(): + CUSTOM_KWARGS1 = dict( + type="x-foobar", + id="x-foobar--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061", + name="Heartbleed", + external_references=[ + { + "url": "https://example", + "source_name": "some-source", + }, + ], + ) + CUSTOM_KWARGS2 = dict( + type="x-foobar", + id="x-foobar--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061", + name="Zot", + external_references=[ + { + "url": "https://example2", + "source_name": "some-source2", + }, + ], + ) + + def _x_foobar_checks(obj1, obj2, **weights): + matching_score = 0.0 + sum_weights = 0.0 + if stix2.environment.check_property_present("external_references", obj1, obj2): + w = weights["external_references"] + sum_weights += w + matching_score += w * 
stix2.environment.partial_external_reference_based( + obj1["external_references"], + obj2["external_references"], + ) + if stix2.environment.check_property_present("name", obj1, obj2): + w = weights["name"] + sum_weights += w + matching_score += w * stix2.environment.partial_string_based(obj1["name"], obj2["name"]) + return matching_score, sum_weights + + weights = { + "x-foobar": { + "external_references": 40, + "name": 60, + "method": _x_foobar_checks, + }, + "_internal": { + "ignore_spec_version": False, + }, + } + cust1 = stix2.parse(CUSTOM_KWARGS1, allow_custom=True) + cust2 = stix2.parse(CUSTOM_KWARGS2, allow_custom=True) + env = stix2.Environment().semantically_equivalent(cust1, cust2, **weights) + assert round(env) == 0 + + +def test_semantic_equivalence_different_type_raises(): + with pytest.raises(ValueError) as excinfo: + vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS) + ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) + stix2.Environment().semantically_equivalent(vul1, ind1) + + assert str(excinfo.value) == "The objects to compare must be of the same type!" + + +def test_semantic_equivalence_different_spec_version_raises(): + with pytest.raises(ValueError) as excinfo: + V20_KWARGS = dict( + labels=['malicious-activity'], + pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']", + ) + ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) + ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **V20_KWARGS) + stix2.Environment().semantically_equivalent(ind1, ind2) + + assert str(excinfo.value) == "The objects to compare must be of the same spec version!" 
+ + +@pytest.mark.parametrize( + "obj1,obj2,ret_val", + [ + ( + stix2.v21.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS), + stix2.v21.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS), + "course-of-action type has no semantic equivalence implementation!", + ), + ( + stix2.v21.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS), + stix2.v21.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS), + "intrusion-set type has no semantic equivalence implementation!", + ), + ( + stix2.v21.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS), + stix2.v21.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS), + "observed-data type has no semantic equivalence implementation!", + ), + ( + stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS), + stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS), + "report type has no semantic equivalence implementation!", + ), + ], +) +def test_semantic_equivalence_on_unsupported_types(obj1, obj2, ret_val): + with pytest.raises(stix2.exceptions.SemanticEquivalenceUnsupportedTypeError) as excinfo: + stix2.Environment().semantically_equivalent(obj1, obj2) + assert ret_val == str(excinfo.value) + + +def test_semantic_equivalence_zero_match(): + IND_KWARGS = dict( + indicator_types=["APTX"], + pattern="[ipv4-addr:value = '192.168.1.1']", + pattern_type="stix", + valid_from="2019-01-01T12:34:56Z", + ) + weights = { + "indicator": { + "indicator_types": 15, + "pattern": 80, + "valid_from": 0, + "tdelta": 1, # One day interval + "method": stix2.environment._indicator_checks, + }, + "_internal": { + "ignore_spec_version": False, + }, + } + ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) + ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **IND_KWARGS) + env = stix2.Environment().semantically_equivalent(ind1, ind2, **weights) + assert round(env) == 0 + + +def test_semantic_equivalence_different_spec_version(): + IND_KWARGS = dict( + labels=["APTX"], + pattern="[ipv4-addr:value = '192.168.1.1']", + 
) + weights = { + "indicator": { + "indicator_types": 15, + "pattern": 80, + "valid_from": 0, + "tdelta": 1, # One day interval + "method": stix2.environment._indicator_checks, + }, + "_internal": { + "ignore_spec_version": True, # Disables spec_version check. + }, + } + ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) + ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS) + env = stix2.Environment().semantically_equivalent(ind1, ind2, **weights) + assert round(env) == 0 + + +@pytest.mark.parametrize( + "refs1,refs2,ret_val", [ + ( + [ + { + "url": "https://attack.mitre.org/techniques/T1150", + "source_name": "mitre-attack", + "external_id": "T1150", + }, + { + "url": "https://researchcenter.paloaltonetworks.com/2016/09/unit42-sofacys-komplex-os-x-trojan/", + "source_name": "Sofacy Komplex Trojan", + "description": "Dani Creus, Tyler Halfpop, Robert Falcone. (2016, September 26). Sofacy's 'Komplex' OS X Trojan. Retrieved July 8, 2017.", + }, + ], + [ + { + "url": "https://attack.mitre.org/techniques/T1129", + "source_name": "mitre-attack", + "external_id": "T1129", + }, + { + "url": "https://en.wikipedia.org/wiki/Microsoft_Windows_library_files", + "source_name": "Wikipedia Windows Library Files", + "description": "Wikipedia. (2017, January 31). Microsoft Windows library files. Retrieved February 13, 2017.", + }, + ], + 0.0, + ), + ( + [ + { + "url": "https://attack.mitre.org/techniques/T1129", + "source_name": "mitre-attack", + "external_id": "T1129", + }, + ], + [ + { + "url": "https://attack.mitre.org/techniques/T1129", + "source_name": "mitre-attack", + "external_id": "T1129", + }, + { + "url": "https://en.wikipedia.org/wiki/Microsoft_Windows_library_files", + "source_name": "Wikipedia Windows Library Files", + "description": "Wikipedia. (2017, January 31). Microsoft Windows library files. 
Retrieved February 13, 2017.", + }, + ], + 1.0, + ), + ( + [ + { + "url": "https://example", + "source_name": "some-source", + }, + ], + [ + { + "url": "https://example", + "source_name": "some-source", + }, + ], + 1.0, + ), + ], +) +def test_semantic_equivalence_external_references(refs1, refs2, ret_val): + value = stix2.environment.partial_external_reference_based(refs1, refs2) + assert value == ret_val + + +def test_semantic_equivalence_timetamp(): + t1 = "2018-10-17T00:14:20.652Z" + t2 = "2018-10-17T12:14:20.652Z" + assert stix2.environment.partial_timestamp_based(t1, t2, 1) == 0.5 + + +def test_semantic_equivalence_exact_match(): + t1 = "2018-10-17T00:14:20.652Z" + t2 = "2018-10-17T12:14:20.652Z" + assert stix2.environment.exact_match(t1, t2) == 0.0 diff --git a/tox.ini b/tox.ini index 2225bae..d8b840f 100644 --- a/tox.ini +++ b/tox.ini @@ -9,6 +9,8 @@ deps = pytest-cov coverage taxii2-client + pyjarowinkler + haversine medallion commands = python -m pytest --cov=stix2 stix2/test/ --cov-report term-missing -W ignore::stix2.exceptions.STIXDeprecationWarning