wrote all default weights, actually computing the equivalence score

logging for unsupported objects, finished implementing some methods. Missing to implement patterning.
master
Emmanuelle Vargas-Gonzalez 2019-09-10 15:04:07 -04:00
parent 93aa709b68
commit 6fa77adfe3
1 changed files with 205 additions and 33 deletions

View File

@ -1,10 +1,14 @@
"""Python STIX2 Environment API.""" """Python STIX2 Environment API."""
import copy import copy
import logging
import math
from .core import parse as _parse from .core import parse as _parse
from .datastore import CompositeDataSource, DataStoreMixin from .datastore import CompositeDataSource, DataStoreMixin
logger = logging.getLogger(__name__)
class ObjectFactory(object): class ObjectFactory(object):
"""Easily create STIX objects with default values for certain properties. """Easily create STIX objects with default values for certain properties.
@ -187,13 +191,16 @@ class Environment(DataStoreMixin):
else: else:
return None return None
def semantically_equivalent(self, obj1, obj2): @staticmethod
def semantically_equivalent(obj1, obj2, **weight_dict):
"""This method is meant to verify if two objects of the same type are """This method is meant to verify if two objects of the same type are
semantically equivalent. semantically equivalent.
Args: Args:
obj1: A stix2 object instance obj1: A stix2 object instance
obj2: A stix2 object instance obj2: A stix2 object instance
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
Returns: Returns:
float: A number between 0.0 and 1.0 as a measurement of equivalence. float: A number between 0.0 and 1.0 as a measurement of equivalence.
@ -206,7 +213,58 @@ class Environment(DataStoreMixin):
see `the Committee Note <link here>`__. see `the Committee Note <link here>`__.
""" """
equivalence_score = 0.0 # default weights used for the semantic equivalence process
weigths = {
"attack-pattern": {
"name": 30,
"external_references": 70,
},
"campaign": {
"name": 60,
"aliases": 40,
},
"identity": {
"name": 60,
"identity_class": 20,
"sectors": 20,
},
"indicator": {
"indicator_types": 15,
"pattern": 80,
"valid_from": 5,
},
"location": {
"longitude_latitude": 34,
"region": 33,
"country": 33,
},
"malware": {
"malware_types": 20,
"name": 80,
},
"threat-actor": {
"name": 60,
"threat_actor_types": 20,
"aliases": 20,
},
"tool": {
"tool_types": 20,
"name": 80,
},
"vulnerability": {
"name": 30,
"external_references": 70,
},
"_internal": {
"tdelta": 1,
},
}
if weight_dict:
weigths.update(weight_dict)
matching_score = 0.0
sum_weights = 0.0
type1, type2 = obj1["type"], obj2["type"] type1, type2 = obj1["type"], obj2["type"]
if type1 != type2: if type1 != type2:
@ -217,61 +275,132 @@ class Environment(DataStoreMixin):
if type1 == "attack-pattern": if type1 == "attack-pattern":
if _check_property_present("name", obj1, obj2): if _check_property_present("name", obj1, obj2):
_partial_string_based(obj1["name"], obj2["name"]) w = weigths["attack-pattern"]["name"]
sum_weights += w
matching_score += w * _partial_string_based(obj1["name"], obj2["name"])
if _check_property_present("external_references", obj1, obj2): if _check_property_present("external_references", obj1, obj2):
_partial_external_reference_based(obj1["external_references"], obj2["external_references"]) w = weigths["attack-pattern"]["external_references"]
sum_weights += w
matching_score += (
w *
_partial_external_reference_based(obj1["external_references"], obj2["external_references"])
)
elif type1 == "campaign": elif type1 == "campaign":
if _check_property_present("name", obj1, obj2): if _check_property_present("name", obj1, obj2):
_partial_string_based(obj1["name"], obj2["name"]) w = weigths["campaign"]["name"]
sum_weights += w
matching_score += w * _partial_string_based(obj1["name"], obj2["name"])
if _check_property_present("aliases", obj1, obj2): if _check_property_present("aliases", obj1, obj2):
_partial_list_based(obj1["aliases"], obj2["aliases"]) w = weigths["campaign"]["aliases"]
sum_weights += w
matching_score += w * _partial_list_based(obj1["aliases"], obj2["aliases"])
elif type1 == "course-of-action": elif type1 == "course-of-action":
pass logger.warning("%s type is not supported for semantic equivalence", type1)
elif type1 == "identity": elif type1 == "identity":
if _check_property_present("name", obj1, obj2): if _check_property_present("name", obj1, obj2):
_exact_match(obj1["name"], obj2["name"]) w = weigths["identity"]["name"]
sum_weights += w
matching_score += w * _exact_match(obj1["name"], obj2["name"])
if _check_property_present("identity_class", obj1, obj2): if _check_property_present("identity_class", obj1, obj2):
_exact_match(obj1["identity_class"], obj2["identity_class"]) w = weigths["identity"]["identity_class"]
sum_weights += w
matching_score += w * _exact_match(obj1["identity_class"], obj2["identity_class"])
if _check_property_present("sectors", obj1, obj2): if _check_property_present("sectors", obj1, obj2):
_partial_list_based(obj1["sectors"], obj2["sectors"]) w = weigths["identity"]["sectors"]
sum_weights += w
matching_score += w * _partial_list_based(obj1["sectors"], obj2["sectors"])
elif type1 == "indicator": elif type1 == "indicator":
if _check_property_present("indicator_types", obj1, obj2): if _check_property_present("indicator_types", obj1, obj2):
_partial_list_based(obj1["indicator_types"], obj2["indicator_types"]) w = weigths["indicator"]["indicator_types"]
sum_weights += w
matching_score += w * _partial_list_based(obj1["indicator_types"], obj2["indicator_types"])
if _check_property_present("pattern", obj1, obj2): if _check_property_present("pattern", obj1, obj2):
pass # TODO: needs to be done w = weigths["indicator"]["pattern"]
sum_weights += w
matching_score += w * _custom_pattern_based(obj1["pattern"], obj2["pattern"])
if _check_property_present("valid_from", obj1, obj2): if _check_property_present("valid_from", obj1, obj2):
_partial_timestamp_based(obj1["valid_from"], obj2["valid_from"]) w = weigths["indicator"]["valid_from"]
sum_weights += w
matching_score += (
w *
_partial_timestamp_based(obj1["valid_from"], obj2["valid_from"], weigths["_internal"]["tdelta"])
)
elif type1 == "instrusion-set": elif type1 == "instrusion-set":
pass logger.warning("%s type is not supported for semantic equivalence", type1)
elif type1 == "location": elif type1 == "location":
pass if _check_property_present("latitude", obj1, obj2) and _check_property_present("longitude", obj1, obj2):
w = weigths["location"]["longitude_latitude"]
sum_weights += w
matching_score += (
w *
_partial_location_distance(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"])
)
if _check_property_present("region", obj1, obj2):
w = weigths["location"]["region"]
sum_weights += w
matching_score += w * _exact_match(obj1["region"], obj2["region"])
if _check_property_present("country", obj1, obj2):
w = weigths["location"]["country"]
sum_weights += w
matching_score += w * _exact_match(obj1["country"], obj2["country"])
elif type1 == "malware": elif type1 == "malware":
if _check_property_present("malware_types", obj1, obj2): if _check_property_present("malware_types", obj1, obj2):
_partial_list_based(obj1["malware_types"], obj2["malware_types"]) w = weigths["malware"]["malware_types"]
sum_weights += w
matching_score += w * _partial_list_based(obj1["malware_types"], obj2["malware_types"])
if _check_property_present("name", obj1, obj2): if _check_property_present("name", obj1, obj2):
_partial_string_based(obj1["name"], obj2["name"]) w = weigths["malware"]["name"]
sum_weights += w
matching_score += w * _partial_string_based(obj1["name"], obj2["name"])
elif type1 == "observed-data": elif type1 == "observed-data":
pass logger.warning("%s type is not supported for semantic equivalence", type1)
elif type1 == "report": elif type1 == "report":
pass logger.warning("%s type is not supported for semantic equivalence", type1)
elif type1 == "threat-actor": elif type1 == "threat-actor":
if _check_property_present("name", obj1, obj2): if _check_property_present("name", obj1, obj2):
_partial_string_based(obj1["name"], obj2["name"]) w = weigths["threat-actor"]["name"]
sum_weights += w
matching_score += w * _partial_string_based(obj1["name"], obj2["name"])
if _check_property_present("threat_actor_types", obj1, obj2): if _check_property_present("threat_actor_types", obj1, obj2):
_partial_list_based(obj1["threat_actor_types"], obj2["threat_actor_types"]) w = weigths["threat-actor"]["threat_actor_types"]
sum_weights += w
matching_score += w * _partial_list_based(obj1["threat_actor_types"], obj2["threat_actor_types"])
if _check_property_present("aliases", obj1, obj2): if _check_property_present("aliases", obj1, obj2):
_partial_list_based(obj1["aliases"], obj2["aliases"]) w = weigths["threat-actor"]["aliases"]
sum_weights += w
matching_score += w * _partial_list_based(obj1["aliases"], obj2["aliases"])
elif type1 == "tool": elif type1 == "tool":
if _check_property_present("tool_types", obj1, obj2): if _check_property_present("tool_types", obj1, obj2):
_partial_list_based(obj1["tool_types"], obj2["tool_types"]) w = weigths["tool"]["tool_types"]
sum_weights += w
matching_score += w * _partial_list_based(obj1["tool_types"], obj2["tool_types"])
if _check_property_present("name", obj1, obj2): if _check_property_present("name", obj1, obj2):
_partial_string_based(obj1["name"], obj2["name"]) w = weigths["tool"]["name"]
sum_weights += w
matching_score += w * _partial_string_based(obj1["name"], obj2["name"])
elif type1 == "vulnerability": elif type1 == "vulnerability":
if _check_property_present("name", obj1, obj2): if _check_property_present("name", obj1, obj2):
_partial_string_based(obj1["name"], obj2["name"]) w = weigths["vulnerability"]["name"]
sum_weights += w
matching_score += w * _partial_string_based(obj1["name"], obj2["name"])
if _check_property_present("external_references", obj1, obj2): if _check_property_present("external_references", obj1, obj2):
_partial_external_reference_based(obj1["external_references"], obj2["external_references"]) w = weigths["vulnerability"]["external_references"]
# TODO: need to actually calculate the value sum_weights += w
matching_score += w * _partial_external_reference_based(obj1["external_references"], obj2["external_references"])
equivalence_score = (matching_score / sum_weights) * 100.0
return equivalence_score return equivalence_score
@ -281,16 +410,15 @@ def _check_property_present(prop, obj1, obj2):
return False return False
def _partial_timestamp_based(t1, t2): def _partial_timestamp_based(t1, t2, tdelta):
from .utils import parse_into_datetime from .utils import parse_into_datetime
tdelta = 1 # One day...
stix_t1, stix_t2 = parse_into_datetime(t1), parse_into_datetime(t2) stix_t1, stix_t2 = parse_into_datetime(t1), parse_into_datetime(t2)
return 1 - min(abs(stix_t1.timestamp() - stix_t2.timestamp()) / (86400 * tdelta), 1) return 1 - min(abs(stix_t1.timestamp() - stix_t2.timestamp()) / (86400 * tdelta), 1)
def _partial_list_based(l1, l2): def _partial_list_based(l1, l2):
l1_set, l2_set = set(l1), set(l2) l1_set, l2_set = set(l1), set(l2)
return len(l1_set.intersection(l2_set)) / max(len(l1_set), len(l2_set)) return len(l1_set.intersection(l2_set)) / max(len(l1), len(l2))
def _exact_match(val1, val2): def _exact_match(val1, val2):
@ -304,9 +432,53 @@ def _partial_string_based(str1, str2):
return distance.get_jaro_distance(str1, str2) return distance.get_jaro_distance(str1, str2)
def _custom_pattern_based(pattern1, pattern2):
return 0 # TODO: Needs to be implemented
def _partial_external_reference_based(refs1, refs2): def _partial_external_reference_based(refs1, refs2):
pass # TODO: needs to be done allowed = set(("veris", "cve", "capec", "mitre-attack"))
matches = 0
if len(refs1) >= len(refs2):
l1 = refs1
l2 = refs2
else:
l1 = refs2
l2 = refs1
for ext_ref1 in l1:
for ext_ref2 in l2:
sn_match = False
ei_match = False
url_match = False
source_name = None
if _check_property_present("source_name", ext_ref1, ext_ref2):
if ext_ref1["source_name"] == ext_ref2["source_name"]:
source_name = ext_ref1["source_name"]
sn_match = True
if _check_property_present("external_id", ext_ref1, ext_ref2):
if ext_ref1["external_id"] == ext_ref2["external_id"]:
ei_match = True
if _check_property_present("url", ext_ref1, ext_ref2):
if ext_ref1["url"] == ext_ref2["url"]:
url_match = True
# Special case: if source_name is a STIX defined name and either
# external_id or url match then its a perfect match and other entries
# can be ignored.
if sn_match and (ei_match or url_match) and source_name in allowed:
return 1.0
# Regular check. If the source_name (not STIX-defined) or external_id or
# url matches then we consider the entry a match.
if (sn_match or ei_match or url_match) and source_name not in allowed:
matches += 1
return matches / max(len(refs1), len(refs2))
def _partial_location_distance(loc1, loc2): def _partial_location_distance(lat1, long1, lat2, long2):
pass # TODO: needs to be done distance = math.sqrt(((lat2 - lat1) ** 2) + ((long2 - long1) ** 2))
return 1 - (distance / 1000.0)