cti-python-stix2/stix2/equivalence/object/__init__.py

452 lines
16 KiB
Python
Raw Normal View History

Graph Equivalence (#449) * new packages for graph and object-based semantic equivalence * new method graphically_equivalent for Environment, move equivalence methods out * object equivalence function, methods used for object-based moved here. * new graph_equivalence methods * add notes * add support for versioning checks (default disabled) * new tests to cover graph equivalence and new methods * added more imports to environment.py to prevent breaking changes * variable changes, new fields for checks, reset depth check per call * flexibility when object is not available on graph. * refactor debug logging message * new file stix2.equivalence.graph_equivalence.rst and stix2.equivalence.object_equivalence.rst for docs * API documentation for new modules * additional text required to build docs * add more test methods for list_semantic_check an graphically_equivalent/versioning * add logging debug messages, code clean-up * include individual scoring on results dict, fix issue on list_semantic_check not keeping highest score * include results as summary in prop_scores, minor tweaks * Update __init__.py doctrings update * apply feedback from pull request - rename semantic_check to reference_check - rename modules to graph and object respectively to eliminate redundancy - remove created_by_ref and object_marking_refs from graph WEIGHTS and rebalance * update docs/ entries * add more checks, make max score based on actual objects checked instead of the full list, only create entry when type is present in WEIGHTS dictionary update tests to reflect changes * rename package patterns -> pattern * documentation, moving weights around * more documentation moving * rename WEIGHTS variable for graph_equivalence
2020-10-16 17:35:26 +02:00
import logging
import time
from ...datastore import Filter
from ...utils import STIXdatetime, parse_into_datetime
logger = logging.getLogger(__name__)
def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
"""This method verifies if two objects of the same type are
semantically equivalent.
Args:
obj1: A stix2 object instance
obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
Warning:
Object types need to have property weights defined for the equivalence process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.object` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weights_dict:
.. include:: ../default_sem_eq_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
weights = WEIGHTS.copy()
if weight_dict:
weights.update(weight_dict)
type1, type2 = obj1["type"], obj2["type"]
ignore_spec_version = weights["_internal"]["ignore_spec_version"]
if type1 != type2:
raise ValueError('The objects to compare must be of the same type!')
if ignore_spec_version is False and obj1.get("spec_version", "2.0") != obj2.get("spec_version", "2.0"):
raise ValueError('The objects to compare must be of the same spec version!')
try:
weights[type1]
except KeyError:
logger.warning("'%s' type has no 'weights' dict specified & thus no semantic equivalence method to call!", type1)
sum_weights = matching_score = 0
else:
try:
method = weights[type1]["method"]
except KeyError:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
matching_score = 0.0
sum_weights = 0.0
for prop in weights[type1]:
if check_property_present(prop, obj1, obj2) or prop == "longitude_latitude":
w = weights[type1][prop][0]
comp_funct = weights[type1][prop][1]
if comp_funct == partial_timestamp_based:
contributing_score = w * comp_funct(obj1[prop], obj2[prop], weights[type1]["tdelta"])
elif comp_funct == partial_location_distance:
threshold = weights[type1]["threshold"]
contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold)
elif comp_funct == reference_check or comp_funct == list_reference_check:
max_depth = weights["_internal"]["max_depth"]
if max_depth < 0:
continue # prevent excessive recursion
else:
weights["_internal"]["max_depth"] -= 1
ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"]
contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
else:
contributing_score = w * comp_funct(obj1[prop], obj2[prop])
sum_weights += w
matching_score += contributing_score
prop_scores[prop] = {
"weight": w,
"contributing_score": contributing_score,
}
logger.debug("'%s' check -- weight: %s, contributing score: %s", prop, w, contributing_score)
prop_scores["matching_score"] = matching_score
prop_scores["sum_weights"] = sum_weights
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
else:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
try:
matching_score, sum_weights = method(obj1, obj2, prop_scores, **weights[type1])
except TypeError:
# method doesn't support detailed output with prop_scores
matching_score, sum_weights = method(obj1, obj2, **weights[type1])
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
if sum_weights <= 0:
return 0
equivalence_score = (matching_score / sum_weights) * 100.0
return equivalence_score
def check_property_present(prop, obj1, obj2):
"""Helper method checks if a property is present on both objects."""
if prop in obj1 and prop in obj2:
return True
return False
def partial_timestamp_based(t1, t2, tdelta):
"""Performs a timestamp-based matching via checking how close one timestamp is to another.
Args:
t1: A datetime string or STIXdatetime object.
t2: A datetime string or STIXdatetime object.
tdelta (float): A given time delta. This number is multiplied by 86400 (1 day) to
extend or shrink your time change tolerance.
Returns:
float: Number between 0.0 and 1.0 depending on match criteria.
"""
if not isinstance(t1, STIXdatetime):
t1 = parse_into_datetime(t1)
if not isinstance(t2, STIXdatetime):
t2 = parse_into_datetime(t2)
t1, t2 = time.mktime(t1.timetuple()), time.mktime(t2.timetuple())
result = 1 - min(abs(t1 - t2) / (86400 * tdelta), 1)
logger.debug("--\t\tpartial_timestamp_based '%s' '%s' tdelta: '%s'\tresult: '%s'", t1, t2, tdelta, result)
return result
def partial_list_based(l1, l2):
"""Performs a partial list matching via finding the intersection between common values.
Args:
l1: A list of values.
l2: A list of values.
Returns:
float: 1.0 if the value matches exactly, 0.0 otherwise.
"""
l1_set, l2_set = set(l1), set(l2)
result = len(l1_set.intersection(l2_set)) / max(len(l1_set), len(l2_set))
logger.debug("--\t\tpartial_list_based '%s' '%s'\tresult: '%s'", l1, l2, result)
return result
def exact_match(val1, val2):
"""Performs an exact value match based on two values
Args:
val1: A value suitable for an equality test.
val2: A value suitable for an equality test.
Returns:
float: 1.0 if the value matches exactly, 0.0 otherwise.
"""
result = 0.0
if val1 == val2:
result = 1.0
logger.debug("--\t\texact_match '%s' '%s'\tresult: '%s'", val1, val2, result)
return result
def partial_string_based(str1, str2):
"""Performs a partial string match using the Jaro-Winkler distance algorithm.
Args:
str1: A string value to check.
str2: A string value to check.
Returns:
float: Number between 0.0 and 1.0 depending on match criteria.
"""
from rapidfuzz import fuzz
result = fuzz.token_sort_ratio(str1, str2)
logger.debug("--\t\tpartial_string_based '%s' '%s'\tresult: '%s'", str1, str2, result)
return result / 100.0
def custom_pattern_based(pattern1, pattern2):
"""Performs a matching on Indicator Patterns.
Args:
pattern1: An Indicator pattern
pattern2: An Indicator pattern
Returns:
float: Number between 0.0 and 1.0 depending on match criteria.
"""
logger.warning("Indicator pattern equivalence is not fully defined; will default to zero if not completely identical")
return exact_match(pattern1, pattern2) # TODO: Implement pattern based equivalence
def partial_external_reference_based(refs1, refs2):
"""Performs a matching on External References.
Args:
refs1: A list of external references.
refs2: A list of external references.
Returns:
float: Number between 0.0 and 1.0 depending on matches.
"""
allowed = {"veris", "cve", "capec", "mitre-attack"}
matches = 0
if len(refs1) >= len(refs2):
l1 = refs1
l2 = refs2
else:
l1 = refs2
l2 = refs1
for ext_ref1 in l1:
for ext_ref2 in l2:
sn_match = False
ei_match = False
url_match = False
source_name = None
if check_property_present("source_name", ext_ref1, ext_ref2):
if ext_ref1["source_name"] == ext_ref2["source_name"]:
source_name = ext_ref1["source_name"]
sn_match = True
if check_property_present("external_id", ext_ref1, ext_ref2):
if ext_ref1["external_id"] == ext_ref2["external_id"]:
ei_match = True
if check_property_present("url", ext_ref1, ext_ref2):
if ext_ref1["url"] == ext_ref2["url"]:
url_match = True
# Special case: if source_name is a STIX defined name and either
# external_id or url match then its a perfect match and other entries
# can be ignored.
if sn_match and (ei_match or url_match) and source_name in allowed:
result = 1.0
logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
refs1, refs2, result,
)
return result
# Regular check. If the source_name (not STIX-defined) or external_id or
# url matches then we consider the entry a match.
if (sn_match or ei_match or url_match) and source_name not in allowed:
matches += 1
result = matches / max(len(refs1), len(refs2))
logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
refs1, refs2, result,
)
return result
def partial_location_distance(lat1, long1, lat2, long2, threshold):
"""Given two coordinates perform a matching based on its distance using the Haversine Formula.
Args:
lat1: Latitude value for first coordinate point.
lat2: Latitude value for second coordinate point.
long1: Longitude value for first coordinate point.
long2: Longitude value for second coordinate point.
threshold (float): A kilometer measurement for the threshold distance between these two points.
Returns:
float: Number between 0.0 and 1.0 depending on match.
"""
from haversine import Unit, haversine
distance = haversine((lat1, long1), (lat2, long2), unit=Unit.KILOMETERS)
result = 1 - (distance / threshold)
logger.debug(
"--\t\tpartial_location_distance '%s' '%s' threshold: '%s'\tresult: '%s'",
(lat1, long1), (lat2, long2), threshold, result,
)
return result
def _versioned_checks(ref1, ref2, ds1, ds2, **weights):
"""Checks multiple object versions if present in graph.
Maximizes for the semantic equivalence score of a particular version."""
results = {}
objects1 = ds1.query([Filter("id", "=", ref1)])
objects2 = ds2.query([Filter("id", "=", ref2)])
if len(objects1) > 0 and len(objects2) > 0:
for o1 in objects1:
for o2 in objects2:
result = semantically_equivalent(o1, o2, **weights)
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": result}
elif result > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": result}
result = results.get(ref1, {}).get("value", 0.0)
logger.debug(
"--\t\t_versioned_checks '%s' '%s'\tresult: '%s'",
ref1, ref2, result,
)
return result
def reference_check(ref1, ref2, ds1, ds2, **weights):
"""For two references, de-reference the object and perform object-based
semantic equivalence. The score influences the result of an edge check."""
type1, type2 = ref1.split("--")[0], ref2.split("--")[0]
result = 0.0
if type1 == type2:
if weights["_internal"]["versioning_checks"]:
result = _versioned_checks(ref1, ref2, ds1, ds2, **weights) / 100.0
else:
o1, o2 = ds1.get(ref1), ds2.get(ref2)
if o1 and o2:
result = semantically_equivalent(o1, o2, **weights) / 100.0
logger.debug(
"--\t\treference_check '%s' '%s'\tresult: '%s'",
ref1, ref2, result,
)
return result
def list_reference_check(refs1, refs2, ds1, ds2, **weights):
"""For objects that contain multiple references (i.e., object_refs) perform
the same de-reference procedure and perform object-based semantic equivalence.
The score influences the objects containing these references. The result is
weighted on the amount of unique objects that could 1) be de-referenced 2) """
results = {}
if len(refs1) >= len(refs2):
l1 = refs1
l2 = refs2
b1 = ds1
b2 = ds2
else:
l1 = refs2
l2 = refs1
b1 = ds2
b2 = ds1
l1.sort()
l2.sort()
for ref1 in l1:
for ref2 in l2:
type1, type2 = ref1.split("--")[0], ref2.split("--")[0]
if type1 == type2:
score = reference_check(ref1, ref2, b1, b2, **weights) * 100.0
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": score}
elif score > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": score}
result = 0.0
total_sum = sum(x["value"] for x in results.values())
max_score = len(results) * 100.0
if max_score > 0:
result = total_sum / max_score
logger.debug(
"--\t\tlist_reference_check '%s' '%s'\ttotal_sum: '%s'\tmax_score: '%s'\tresult: '%s'",
refs1, refs2, total_sum, max_score, result,
)
return result
# default weights used for the semantic equivalence process
WEIGHTS = {
"attack-pattern": {
"name": (30, partial_string_based),
"external_references": (70, partial_external_reference_based),
},
"campaign": {
"name": (60, partial_string_based),
"aliases": (40, partial_list_based),
},
"course-of-action": {
"name": (60, partial_string_based),
"external_references": (40, partial_external_reference_based),
},
"identity": {
"name": (60, partial_string_based),
"identity_class": (20, exact_match),
"sectors": (20, partial_list_based),
},
"indicator": {
"indicator_types": (15, partial_list_based),
"pattern": (80, custom_pattern_based),
"valid_from": (5, partial_timestamp_based),
"tdelta": 1, # One day interval
},
"intrusion-set": {
"name": (20, partial_string_based),
"external_references": (60, partial_external_reference_based),
"aliases": (20, partial_list_based),
},
"location": {
"longitude_latitude": (34, partial_location_distance),
"region": (33, exact_match),
"country": (33, exact_match),
"threshold": 1000.0,
},
"malware": {
"malware_types": (20, partial_list_based),
"name": (80, partial_string_based),
},
"marking-definition": {
"name": (20, exact_match),
"definition": (60, exact_match),
"definition_type": (20, exact_match),
},
"threat-actor": {
"name": (60, partial_string_based),
"threat_actor_types": (20, partial_list_based),
"aliases": (20, partial_list_based),
},
"tool": {
"tool_types": (20, partial_list_based),
"name": (80, partial_string_based),
},
"vulnerability": {
"name": (30, partial_string_based),
"external_references": (70, partial_external_reference_based),
},
"_internal": {
"ignore_spec_version": False,
},
} #: :autodoc-skip: