From dc79a1f869e54f941e81d31a7544f4e0ebcf6e7e Mon Sep 17 00:00:00 2001 From: Emmanuelle Vargas-Gonzalez Date: Mon, 23 Sep 2019 23:13:50 -0400 Subject: [PATCH] add docstrings for new public methods. add test with disabled spec_version check. fix calculation for distance, using incorrect algorithm. update package settings, tox settings --- setup.py | 2 +- stix2/environment.py | 87 ++++++++++++++++++++++++++++-- stix2/test/v21/test_environment.py | 23 ++++++++ tox.ini | 1 + 4 files changed, 107 insertions(+), 6 deletions(-) diff --git a/setup.py b/setup.py index 185c76c..481534b 100644 --- a/setup.py +++ b/setup.py @@ -63,6 +63,6 @@ setup( }, extras_require={ 'taxii': ['taxii2-client'], - 'semantic': ['pyjarowinkler'], + 'semantic': ['haversine', 'pyjarowinkler'], }, ) diff --git a/stix2/environment.py b/stix2/environment.py index c013ae2..d2c6d3a 100644 --- a/stix2/environment.py +++ b/stix2/environment.py @@ -2,7 +2,6 @@ import copy import logging -import math import time from .core import parse as _parse @@ -252,6 +251,7 @@ class Environment(DataStoreMixin): "longitude_latitude": 34, "region": 33, "country": 33, + "threshold": 1000.0, "method": _location_checks, }, "malware": { @@ -309,12 +309,25 @@ class Environment(DataStoreMixin): def check_property_present(prop, obj1, obj2): + """Helper method checks if a property is present on both objects.""" if prop in obj1 and prop in obj2: return True return False def partial_timestamp_based(t1, t2, tdelta): + """Performs a timestamp-based matching via checking how close one timestamp is to another. + + Args: + t1: A datetime string or STIXdatetime object. + t2: A datetime string or STIXdatetime object. + tdelta (float): A given time delta. This number is multiplied by 86400 (1 day) to + extend or shrink your time change tolerance. + + Returns: + float: Number between 0.0 and 1.0 depending on match criteria. + + """ if not isinstance(t1, STIXdatetime): t1 = parse_into_datetime(t1) if not isinstance(t2, STIXdatetime): @@ -324,27 +337,77 @@ def partial_timestamp_based(t1, t2, tdelta): def partial_list_based(l1, l2): + """Performs a partial list matching via finding the intersection between common values. + + Args: + l1: A list of values. + l2: A list of values. + + Returns: + float: 1.0 if the value matches exactly, 0.0 otherwise. + + """ l1_set, l2_set = set(l1), set(l2) return len(l1_set.intersection(l2_set)) / max(len(l1), len(l2)) def exact_match(val1, val2): + """Performs an exact value match based on two values + + Args: + val1: A value suitable for an equality test. + val2: A value suitable for an equality test. + + Returns: + float: 1.0 if the value matches exactly, 0.0 otherwise. + + """ if val1 == val2: return 1.0 return 0.0 def partial_string_based(str1, str2): + """Performs a partial string match using the Jaro-Winkler distance algorithm. + + Args: + str1: A string value to check. + str2: A string value to check. + + Returns: + float: Number between 0.0 and 1.0 depending on match criteria. + + """ from pyjarowinkler import distance return distance.get_jaro_distance(str1, str2) def custom_pattern_based(pattern1, pattern2): + """Performs a matching on Indicator Patterns. + + Args: + pattern1: An Indicator pattern + pattern2: An Indicator pattern + + Returns: + float: Number between 0.0 and 1.0 depending on match criteria. + + """ logger.warning("Indicator pattern equivalence is not fully defined; will default to zero if not completely identical") return exact_match(pattern1, pattern2) # TODO: Implement pattern based equivalence def partial_external_reference_based(refs1, refs2): + """Performs a matching on External References. + + Args: + refs1: A list of external references. + refs2: A list of external references. + + Returns: + float: Number between 0.0 and 1.0 depending on matches. + + """ allowed = set(("veris", "cve", "capec", "mitre-attack")) matches = 0 @@ -387,9 +450,23 @@ def partial_external_reference_based(refs1, refs2): return matches / max(len(refs1), len(refs2)) -def partial_location_distance(lat1, long1, lat2, long2): - distance = math.sqrt(((lat2 - lat1) ** 2) + ((long2 - long1) ** 2)) - return 1 - (distance / 1000.0) +def partial_location_distance(lat1, long1, lat2, long2, threshold): + """Given two coordinates perform a matching based on its distance using the Haversine Formula. + + Args: + lat1: Latitude value for first coordinate point. + lat2: Latitude value for second coordinate point. + long1: Longitude value for first coordinate point. + long2: Longitude value for second coordinate point. + threshold (float): A kilometer measurement for the threshold distance between these two points. + + Returns: + float: Number between 0.0 and 1.0 depending on match. + + """ + from haversine import haversine, Unit + distance = haversine((lat1, long1), (lat2, long2), unit=Unit.KILOMETERS) + return 1 - (distance / threshold) def _attack_pattern_checks(obj1, obj2, **weights): @@ -470,7 +547,7 @@ def _location_checks(obj1, obj2, **weights): sum_weights += w matching_score += ( w * - partial_location_distance(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"]) + partial_location_distance(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], weights["threshold"]) ) if check_property_present("region", obj1, obj2): w = weights["region"] diff --git a/stix2/test/v21/test_environment.py b/stix2/test/v21/test_environment.py index f645513..d962147 100644 --- a/stix2/test/v21/test_environment.py +++ b/stix2/test/v21/test_environment.py @@ -666,6 +666,29 @@ def test_semantic_equivalence_zero_match(): assert round(env) == 0 +def test_semantic_equivalence_different_spec_version(): + IND_KWARGS = dict( + labels=["APTX"], + pattern="[ipv4-addr:value = '192.168.1.1']", + ) + weights = { + "indicator": { + "indicator_types": 15, + "pattern": 80, + "valid_from": 0, + "tdelta": 1, # One day interval + "method": stix2.environment._indicator_checks, + }, + "_internal": { + "ignore_spec_version": True, # Disables spec_version check. + }, + } + ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) + ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS) + env = stix2.Environment().semantically_equivalent(ind1, ind2, **weights) + assert round(env) == 0 + + @pytest.mark.parametrize( "refs1,refs2,ret_val", [ ( diff --git a/tox.ini b/tox.ini index 2bdae15..7911fde 100644 --- a/tox.ini +++ b/tox.ini @@ -10,6 +10,7 @@ deps = coverage taxii2-client pyjarowinkler + haversine medallion commands = pytest --ignore=stix2/test/v20/test_workbench.py --ignore=stix2/test/v21/test_workbench.py --cov=stix2 stix2/test/ --cov-report term-missing