add docstrings for new public methods. add test with disabled spec_version check.

fix the distance calculation, which was using an incorrect algorithm. update package settings, tox settings
master
Emmanuelle Vargas-Gonzalez 2019-09-23 23:13:50 -04:00
parent 4eaaee89dc
commit dc79a1f869
4 changed files with 107 additions and 6 deletions

View File

@ -63,6 +63,6 @@ setup(
}, },
extras_require={ extras_require={
'taxii': ['taxii2-client'], 'taxii': ['taxii2-client'],
'semantic': ['pyjarowinkler'], 'semantic': ['haversine', 'pyjarowinkler'],
}, },
) )

View File

@ -2,7 +2,6 @@
import copy import copy
import logging import logging
import math
import time import time
from .core import parse as _parse from .core import parse as _parse
@ -252,6 +251,7 @@ class Environment(DataStoreMixin):
"longitude_latitude": 34, "longitude_latitude": 34,
"region": 33, "region": 33,
"country": 33, "country": 33,
"threshold": 1000.0,
"method": _location_checks, "method": _location_checks,
}, },
"malware": { "malware": {
@ -309,12 +309,25 @@ class Environment(DataStoreMixin):
def check_property_present(prop, obj1, obj2): def check_property_present(prop, obj1, obj2):
"""Helper method checks if a property is present on both objects."""
if prop in obj1 and prop in obj2: if prop in obj1 and prop in obj2:
return True return True
return False return False
def partial_timestamp_based(t1, t2, tdelta): def partial_timestamp_based(t1, t2, tdelta):
"""Performs a timestamp-based matching via checking how close one timestamp is to another.
Args:
t1: A datetime string or STIXdatetime object.
t2: A datetime string or STIXdatetime object.
tdelta (float): A given time delta. This number is multiplied by 86400 (1 day) to
extend or shrink your time change tolerance.
Returns:
float: Number between 0.0 and 1.0 depending on match criteria.
"""
if not isinstance(t1, STIXdatetime): if not isinstance(t1, STIXdatetime):
t1 = parse_into_datetime(t1) t1 = parse_into_datetime(t1)
if not isinstance(t2, STIXdatetime): if not isinstance(t2, STIXdatetime):
@ -324,27 +337,77 @@ def partial_timestamp_based(t1, t2, tdelta):
def partial_list_based(l1, l2): def partial_list_based(l1, l2):
"""Performs a partial list matching via finding the intersection between common values.
Args:
l1: A list of values.
l2: A list of values.
Returns:
float: Number between 0.0 and 1.0; the count of distinct shared values divided by the length of the longer list.
"""
l1_set, l2_set = set(l1), set(l2) l1_set, l2_set = set(l1), set(l2)
return len(l1_set.intersection(l2_set)) / max(len(l1), len(l2)) return len(l1_set.intersection(l2_set)) / max(len(l1), len(l2))
def exact_match(val1, val2): def exact_match(val1, val2):
"""Performs an exact value match based on two values.
Args:
val1: A value suitable for an equality test.
val2: A value suitable for an equality test.
Returns:
float: 1.0 if the values match exactly, 0.0 otherwise.
"""
if val1 == val2: if val1 == val2:
return 1.0 return 1.0
return 0.0 return 0.0
def partial_string_based(str1, str2): def partial_string_based(str1, str2):
"""Performs a partial string match using the Jaro-Winkler distance algorithm.
Args:
str1: A string value to check.
str2: A string value to check.
Returns:
float: Number between 0.0 and 1.0 depending on match criteria.
"""
from pyjarowinkler import distance from pyjarowinkler import distance
return distance.get_jaro_distance(str1, str2) return distance.get_jaro_distance(str1, str2)
def custom_pattern_based(pattern1, pattern2): def custom_pattern_based(pattern1, pattern2):
"""Performs a matching on Indicator Patterns.
Args:
pattern1: An Indicator pattern
pattern2: An Indicator pattern
Returns:
float: Number between 0.0 and 1.0 depending on match criteria.
"""
logger.warning("Indicator pattern equivalence is not fully defined; will default to zero if not completely identical") logger.warning("Indicator pattern equivalence is not fully defined; will default to zero if not completely identical")
return exact_match(pattern1, pattern2) # TODO: Implement pattern based equivalence return exact_match(pattern1, pattern2) # TODO: Implement pattern based equivalence
def partial_external_reference_based(refs1, refs2): def partial_external_reference_based(refs1, refs2):
"""Performs a matching on External References.
Args:
refs1: A list of external references.
refs2: A list of external references.
Returns:
float: Number between 0.0 and 1.0 depending on matches.
"""
allowed = set(("veris", "cve", "capec", "mitre-attack")) allowed = set(("veris", "cve", "capec", "mitre-attack"))
matches = 0 matches = 0
@ -387,9 +450,23 @@ def partial_external_reference_based(refs1, refs2):
return matches / max(len(refs1), len(refs2)) return matches / max(len(refs1), len(refs2))
def partial_location_distance(lat1, long1, lat2, long2): def partial_location_distance(lat1, long1, lat2, long2, threshold):
distance = math.sqrt(((lat2 - lat1) ** 2) + ((long2 - long1) ** 2)) """Given two coordinates, perform a matching based on their distance using the Haversine formula.
return 1 - (distance / 1000.0)
Args:
lat1: Latitude value for first coordinate point.
long1: Longitude value for first coordinate point.
lat2: Latitude value for second coordinate point.
long2: Longitude value for second coordinate point.
threshold (float): A kilometer measurement for the threshold distance between these two points.
Returns:
float: 1.0 when the points coincide, decreasing linearly with distance; the value drops below 0.0 when the distance exceeds the threshold.
"""
from haversine import haversine, Unit
distance = haversine((lat1, long1), (lat2, long2), unit=Unit.KILOMETERS)
return 1 - (distance / threshold)
def _attack_pattern_checks(obj1, obj2, **weights): def _attack_pattern_checks(obj1, obj2, **weights):
@ -470,7 +547,7 @@ def _location_checks(obj1, obj2, **weights):
sum_weights += w sum_weights += w
matching_score += ( matching_score += (
w * w *
partial_location_distance(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"]) partial_location_distance(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], weights["threshold"])
) )
if check_property_present("region", obj1, obj2): if check_property_present("region", obj1, obj2):
w = weights["region"] w = weights["region"]

View File

@ -666,6 +666,29 @@ def test_semantic_equivalence_zero_match():
assert round(env) == 0 assert round(env) == 0
def test_semantic_equivalence_different_spec_version():
IND_KWARGS = dict(
labels=["APTX"],
pattern="[ipv4-addr:value = '192.168.1.1']",
)
weights = {
"indicator": {
"indicator_types": 15,
"pattern": 80,
"valid_from": 0,
"tdelta": 1, # One day interval
"method": stix2.environment._indicator_checks,
},
"_internal": {
"ignore_spec_version": True, # Disables spec_version check.
},
}
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS)
env = stix2.Environment().semantically_equivalent(ind1, ind2, **weights)
assert round(env) == 0
@pytest.mark.parametrize( @pytest.mark.parametrize(
"refs1,refs2,ret_val", [ "refs1,refs2,ret_val", [
( (

View File

@ -10,6 +10,7 @@ deps =
coverage coverage
taxii2-client taxii2-client
pyjarowinkler pyjarowinkler
haversine
medallion medallion
commands = commands =
pytest --ignore=stix2/test/v20/test_workbench.py --ignore=stix2/test/v21/test_workbench.py --cov=stix2 stix2/test/ --cov-report term-missing pytest --ignore=stix2/test/v20/test_workbench.py --ignore=stix2/test/v21/test_workbench.py --cov=stix2 stix2/test/ --cov-report term-missing