Merge branch 'master' of github.com:oasis-open/cti-python-stix2 into main

pull/1/head
chrisr3d 2020-10-28 19:42:51 +01:00
commit f0f8091516
43 changed files with 1151 additions and 392 deletions

.gitignore
View File

@@ -55,7 +55,8 @@ coverage.xml
# Sphinx documentation
docs/_build/
.ipynb_checkpoints
default_sem_eq_weights.rst
graph_default_sem_eq_weights.rst
object_default_sem_eq_weights.rst
# PyBuilder
target/

View File

@@ -0,0 +1,5 @@
comparison
============================================
.. automodule:: stix2.equivalence.pattern.compare.comparison
:members:

View File

@@ -0,0 +1,5 @@
observation
=============================================
.. automodule:: stix2.equivalence.pattern.compare.observation
:members:

View File

@@ -0,0 +1,5 @@
compare
=================================
.. automodule:: stix2.equivalence.pattern.compare
:members:

View File

@@ -0,0 +1,5 @@
transform
===================================
.. automodule:: stix2.equivalence.pattern.transform
:members:

View File

@@ -0,0 +1,5 @@
comparison
==============================================
.. automodule:: stix2.equivalence.pattern.transform.comparison
:members:

View File

@@ -0,0 +1,5 @@
observation
===============================================
.. automodule:: stix2.equivalence.pattern.transform.observation
:members:

View File

@@ -0,0 +1,5 @@
specials
============================================
.. automodule:: stix2.equivalence.pattern.transform.specials
:members:

View File

@@ -1,5 +0,0 @@
comparison
==============
.. automodule:: stix2.equivalence.patterns.compare.comparison
:members:

View File

@@ -1,5 +0,0 @@
observation
==============
.. automodule:: stix2.equivalence.patterns.compare.observation
:members:

View File

@@ -1,5 +0,0 @@
comparison
==============
.. automodule:: stix2.equivalence.patterns.transform.comparison
:members:

View File

@@ -1,5 +0,0 @@
observation
==============
.. automodule:: stix2.equivalence.patterns.transform.observation
:members:

View File

@@ -1,5 +0,0 @@
specials
==============
.. automodule:: stix2.equivalence.patterns.transform.specials
:members:

View File

@@ -0,0 +1,5 @@
graph
=======================
.. automodule:: stix2.equivalence.graph
:members:

View File

@@ -0,0 +1,5 @@
object
========================
.. automodule:: stix2.equivalence.object
:members:

View File

@@ -0,0 +1,5 @@
pattern
==============
.. automodule:: stix2.equivalence.pattern
:members:

View File

@@ -1,5 +0,0 @@
patterns
==============
.. automodule:: stix2.equivalence.patterns
:members:

View File

@@ -8,7 +8,8 @@ from six import class_types
from sphinx.ext.autodoc import ClassDocumenter
from stix2.base import _STIXBase
from stix2.environment import WEIGHTS
from stix2.equivalence.graph import GRAPH_WEIGHTS
from stix2.equivalence.object import WEIGHTS
from stix2.version import __version__
sys.path.insert(0, os.path.abspath('..'))
@@ -62,12 +63,19 @@ latex_documents = [
]
# Add a formatted version of environment.WEIGHTS
default_sem_eq_weights = json.dumps(WEIGHTS, indent=4, default=lambda o: o.__name__)
default_sem_eq_weights = default_sem_eq_weights.replace('\n', '\n ')
default_sem_eq_weights = default_sem_eq_weights.replace(' "', ' ')
default_sem_eq_weights = default_sem_eq_weights.replace('"\n', '\n')
with open('default_sem_eq_weights.rst', 'w') as f:
f.write(".. code-block:: py\n\n {}\n\n".format(default_sem_eq_weights))
object_default_sem_eq_weights = json.dumps(WEIGHTS, indent=4, default=lambda o: o.__name__)
object_default_sem_eq_weights = object_default_sem_eq_weights.replace('\n', '\n ')
object_default_sem_eq_weights = object_default_sem_eq_weights.replace(' "', ' ')
object_default_sem_eq_weights = object_default_sem_eq_weights.replace('"\n', '\n')
with open('object_default_sem_eq_weights.rst', 'w') as f:
f.write(".. code-block:: python\n\n {}\n\n".format(object_default_sem_eq_weights))
graph_default_sem_eq_weights = json.dumps(GRAPH_WEIGHTS, indent=4, default=lambda o: o.__name__)
graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('\n', '\n ')
graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace(' "', ' ')
graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('"\n', '\n')
with open('graph_default_sem_eq_weights.rst', 'w') as f:
f.write(".. code-block:: python\n\n {}\n\n".format(graph_default_sem_eq_weights))
def get_property_type(prop):

View File

@@ -2165,15 +2165,17 @@
"The weights dictionary should contain both the weight and the comparison function for each property. You may use the default weights and functions, or provide your own.\n",
"\n",
"##### Existing comparison functions\n",
"For reference, here is a list of the comparison functions already built in the codebase (found in [stix2/environment.py](../api/stix2.environment.rst#stix2.environment.Environment)):\n",
"For reference, here is a list of the comparison functions already built in the codebase (found in [stix2/equivalence/object](../api/equivalence/stix2.equivalence.object.rst#module-stix2.equivalence.object)):\n",
"\n",
" - [custom_pattern_based](../api/stix2.environment.rst#stix2.environment.custom_pattern_based)\n",
" - [exact_match](../api/stix2.environment.rst#stix2.environment.exact_match)\n",
" - [partial_external_reference_based](../api/stix2.environment.rst#stix2.environment.partial_external_reference_based)\n",
" - [partial_list_based](../api/stix2.environment.rst#stix2.environment.partial_list_based)\n",
" - [partial_location_distance](../api/stix2.environment.rst#stix2.environment.partial_location_distance)\n",
" - [partial_string_based](../api/stix2.environment.rst#stix2.environment.partial_string_based)\n",
" - [partial_timestamp_based](../api/stix2.environment.rst#stix2.environment.partial_timestamp_based)\n",
" - [custom_pattern_based](../api/equivalence/stix2.equivalence.object.rst#stix2.equivalence.object.custom_pattern_based)\n",
" - [exact_match](../api/equivalence/stix2.equivalence.object.rst#stix2.equivalence.object.exact_match)\n",
" - [list_reference_check](../api/equivalence/stix2.equivalence.object.rst#stix2.equivalence.object.list_reference_check)\n",
" - [partial_external_reference_based](../api/equivalence/stix2.equivalence.object.rst#stix2.equivalence.object.partial_external_reference_based)\n",
" - [partial_list_based](../api/equivalence/stix2.equivalence.object.rst#stix2.equivalence.object.partial_list_based)\n",
" - [partial_location_distance](../api/equivalence/stix2.equivalence.object.rst#stix2.equivalence.object.partial_location_distance)\n",
" - [partial_string_based](../api/equivalence/stix2.equivalence.object.rst#stix2.equivalence.object.partial_string_based)\n",
" - [partial_timestamp_based](../api/equivalence/stix2.equivalence.object.rst#stix2.equivalence.object.partial_timestamp_based)\n",
" - [reference_check](../api/equivalence/stix2.equivalence.object.rst#stix2.equivalence.object.reference_check)\n",
"\n",
"For instance, if we wanted to compare two of the `ThreatActor`s from before, but use our own weights, then we could do the following:"
]
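For reference, a minimal sketch of what that custom-weights call might look like (the threat actors and weight values below are illustrative stand-ins, not the notebook's actual next cell):

import stix2
from stix2.equivalence.object import partial_list_based, partial_string_based

ta1 = stix2.v21.ThreatActor(name="Evil Org", threat_actor_types=["crime-syndicate"], aliases=["Syndicate 1"])
ta2 = stix2.v21.ThreatActor(name="Evil Org", threat_actor_types=["crime-syndicate"], aliases=["Evil Syndicate 99"])

# Hypothetical override: let the name dominate the score.
weights = {
    "threat-actor": {
        "name": (70, partial_string_based),
        "threat_actor_types": (15, partial_list_based),
        "aliases": (15, partial_list_based),
    },
}
prop_scores = {}
score = stix2.Environment().semantically_equivalent(ta1, ta2, prop_scores, **weights)
print(score, prop_scores["matching_score"], prop_scores["sum_weights"])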

View File

@@ -1,5 +1,6 @@
bumpversion
ipython
nbconvert<6
nbsphinx==0.4.3
pre-commit
pygments<3,>=2.4.1

View File

@@ -481,14 +481,14 @@ class CompositeDataSource(DataSource):
if data:
all_data.append(data)
# remove duplicate versions
if len(all_data) > 0:
all_data = deduplicate(all_data)
else:
return None
# Search for latest version
stix_obj = latest_ver = None
for obj in all_data:
ver = obj.get("modified") or obj.get("created")
if stix_obj is None or ver is None or ver > latest_ver:
stix_obj = obj
latest_ver = ver
# reduce to most recent version
stix_obj = sorted(all_data, key=lambda k: k['modified'], reverse=True)[0]
return stix_obj
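The new loop keys on modified when present and falls back to created, so objects without a modified timestamp (markings, SCOs) no longer break the sort. A standalone sketch of the same selection logic over hypothetical plain dicts:

stix_obj = latest_ver = None
versions = [
    {"id": "indicator--00000000-0000-4000-8000-000000000001", "modified": "2017-01-27T13:49:53.935Z"},
    {"id": "indicator--00000000-0000-4000-8000-000000000001", "modified": "2017-01-27T13:49:53.936Z"},
]
for obj in versions:
    ver = obj.get("modified") or obj.get("created")  # SCOs may have neither
    if stix_obj is None or ver is None or ver > latest_ver:
        stix_obj = obj
        latest_ver = ver
assert stix_obj["modified"] == "2017-01-27T13:49:53.936Z"  # latest version wins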

View File

@@ -1,13 +1,18 @@
"""Python STIX2 Environment API."""
import copy
import logging
import time
from .datastore import CompositeDataSource, DataStoreMixin
from .equivalence.graph import graphically_equivalent
from .equivalence.object import ( # noqa: F401
WEIGHTS, check_property_present, custom_pattern_based, exact_match,
list_reference_check, partial_external_reference_based, partial_list_based,
partial_location_distance, partial_string_based, partial_timestamp_based,
reference_check, semantically_equivalent,
)
from .parsing import parse as _parse
from .utils import STIXdatetime, parse_into_datetime
logger = logging.getLogger(__name__)
# TODO: Remove all unused imports that now belong to the equivalence module in the next major release.
# Kept for backwards compatibility.
class ObjectFactory(object):
@@ -193,7 +198,7 @@ class Environment(DataStoreMixin):
@staticmethod
def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
"""This method is meant to verify if two objects of the same type are
"""This method verifies if two objects of the same type are
semantically equivalent.
Args:
@@ -208,315 +213,58 @@
float: A number between 0.0 and 100.0 as a measurement of equivalence.
Warning:
Course of Action, Intrusion-Set, Observed-Data, Report are not supported
by this implementation. Indicator pattern check is also limited.
Object types need to have property weights defined for the equivalence process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.object` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weights_dict:
Default weight_dict:
.. include:: ../default_sem_eq_weights.rst
.. include:: ../object_default_sem_eq_weights.rst
Note:
This implementation follows the Committee Note on semantic equivalence.
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
return semantically_equivalent(obj1, obj2, prop_scores, **weight_dict)
@staticmethod
def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
"""This method verifies if two graphs are semantically equivalent.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the amount of objects we managed to compare.
This approach builds on top of the object-based semantic equivalence process
and each comparison can return a value between 0 and 100.
Args:
ds1: A DataStore object instance representing your graph
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
Warning:
Object types need to have property weights defined for the equivalence process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.graph` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weight_dict:
.. include:: ../graph_default_sem_eq_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
return graphically_equivalent(ds1, ds2, prop_scores, **weight_dict)
weights = WEIGHTS.copy()
if weight_dict:
weights.update(weight_dict)
type1, type2 = obj1["type"], obj2["type"]
ignore_spec_version = weights["_internal"]["ignore_spec_version"]
if type1 != type2:
raise ValueError('The objects to compare must be of the same type!')
if ignore_spec_version is False and obj1.get("spec_version", "2.0") != obj2.get("spec_version", "2.0"):
raise ValueError('The objects to compare must be of the same spec version!')
try:
weights[type1]
except KeyError:
logger.warning("'%s' type has no 'weights' dict specified & thus no semantic equivalence method to call!", type1)
sum_weights = matching_score = 0
else:
try:
method = weights[type1]["method"]
except KeyError:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
matching_score = 0.0
sum_weights = 0.0
for prop in weights[type1]:
if check_property_present(prop, obj1, obj2) or prop == "longitude_latitude":
w = weights[type1][prop][0]
comp_funct = weights[type1][prop][1]
if comp_funct == partial_timestamp_based:
contributing_score = w * comp_funct(obj1[prop], obj2[prop], weights[type1]["tdelta"])
elif comp_funct == partial_location_distance:
threshold = weights[type1]["threshold"]
contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold)
else:
contributing_score = w * comp_funct(obj1[prop], obj2[prop])
sum_weights += w
matching_score += contributing_score
prop_scores[prop] = {
"weight": w,
"contributing_score": contributing_score,
}
logger.debug("'%s' check -- weight: %s, contributing score: %s", prop, w, contributing_score)
prop_scores["matching_score"] = matching_score
prop_scores["sum_weights"] = sum_weights
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
else:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
try:
matching_score, sum_weights = method(obj1, obj2, prop_scores, **weights[type1])
except TypeError:
# method doesn't support detailed output with prop_scores
matching_score, sum_weights = method(obj1, obj2, **weights[type1])
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
if sum_weights <= 0:
return 0
equivalence_score = (matching_score / sum_weights) * 100.0
return equivalence_score
def check_property_present(prop, obj1, obj2):
"""Helper method checks if a property is present on both objects."""
if prop in obj1 and prop in obj2:
return True
return False
def partial_timestamp_based(t1, t2, tdelta):
"""Performs a timestamp-based matching via checking how close one timestamp is to another.
Args:
t1: A datetime string or STIXdatetime object.
t2: A datetime string or STIXdatetime object.
tdelta (float): A given time delta. This number is multiplied by 86400 (1 day) to
extend or shrink your time change tolerance.
Returns:
float: Number between 0.0 and 1.0 depending on match criteria.
"""
if not isinstance(t1, STIXdatetime):
t1 = parse_into_datetime(t1)
if not isinstance(t2, STIXdatetime):
t2 = parse_into_datetime(t2)
t1, t2 = time.mktime(t1.timetuple()), time.mktime(t2.timetuple())
result = 1 - min(abs(t1 - t2) / (86400 * tdelta), 1)
logger.debug("--\t\tpartial_timestamp_based '%s' '%s' tdelta: '%s'\tresult: '%s'", t1, t2, tdelta, result)
return result
def partial_list_based(l1, l2):
"""Performs a partial list matching via finding the intersection between common values.
Args:
l1: A list of values.
l2: A list of values.
Returns:
float: 1.0 if the value matches exactly, 0.0 otherwise.
"""
l1_set, l2_set = set(l1), set(l2)
result = len(l1_set.intersection(l2_set)) / max(len(l1), len(l2))
logger.debug("--\t\tpartial_list_based '%s' '%s'\tresult: '%s'", l1, l2, result)
return result
def exact_match(val1, val2):
"""Performs an exact value match based on two values
Args:
val1: A value suitable for an equality test.
val2: A value suitable for an equality test.
Returns:
float: 1.0 if the value matches exactly, 0.0 otherwise.
"""
result = 0.0
if val1 == val2:
result = 1.0
logger.debug("--\t\texact_match '%s' '%s'\tresult: '%s'", val1, val2, result)
return result
def partial_string_based(str1, str2):
"""Performs a partial string match using the Jaro-Winkler distance algorithm.
Args:
str1: A string value to check.
str2: A string value to check.
Returns:
float: Number between 0.0 and 1.0 depending on match criteria.
"""
from rapidfuzz import fuzz
result = fuzz.token_sort_ratio(str1, str2)
logger.debug("--\t\tpartial_string_based '%s' '%s'\tresult: '%s'", str1, str2, result)
return result / 100.0
def custom_pattern_based(pattern1, pattern2):
"""Performs a matching on Indicator Patterns.
Args:
pattern1: An Indicator pattern
pattern2: An Indicator pattern
Returns:
float: Number between 0.0 and 1.0 depending on match criteria.
"""
logger.warning("Indicator pattern equivalence is not fully defined; will default to zero if not completely identical")
return exact_match(pattern1, pattern2) # TODO: Implement pattern based equivalence
def partial_external_reference_based(refs1, refs2):
"""Performs a matching on External References.
Args:
refs1: A list of external references.
refs2: A list of external references.
Returns:
float: Number between 0.0 and 1.0 depending on matches.
"""
allowed = set(("veris", "cve", "capec", "mitre-attack"))
matches = 0
if len(refs1) >= len(refs2):
l1 = refs1
l2 = refs2
else:
l1 = refs2
l2 = refs1
for ext_ref1 in l1:
for ext_ref2 in l2:
sn_match = False
ei_match = False
url_match = False
source_name = None
if check_property_present("source_name", ext_ref1, ext_ref2):
if ext_ref1["source_name"] == ext_ref2["source_name"]:
source_name = ext_ref1["source_name"]
sn_match = True
if check_property_present("external_id", ext_ref1, ext_ref2):
if ext_ref1["external_id"] == ext_ref2["external_id"]:
ei_match = True
if check_property_present("url", ext_ref1, ext_ref2):
if ext_ref1["url"] == ext_ref2["url"]:
url_match = True
# Special case: if source_name is a STIX defined name and either
# external_id or url match then its a perfect match and other entries
# can be ignored.
if sn_match and (ei_match or url_match) and source_name in allowed:
result = 1.0
logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
refs1, refs2, result,
)
return result
# Regular check. If the source_name (not STIX-defined) or external_id or
# url matches then we consider the entry a match.
if (sn_match or ei_match or url_match) and source_name not in allowed:
matches += 1
result = matches / max(len(refs1), len(refs2))
logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
refs1, refs2, result,
)
return result
def partial_location_distance(lat1, long1, lat2, long2, threshold):
"""Given two coordinates perform a matching based on its distance using the Haversine Formula.
Args:
lat1: Latitude value for first coordinate point.
lat2: Latitude value for second coordinate point.
long1: Longitude value for first coordinate point.
long2: Longitude value for second coordinate point.
threshold (float): A kilometer measurement for the threshold distance between these two points.
Returns:
float: Number between 0.0 and 1.0 depending on match.
"""
from haversine import Unit, haversine
distance = haversine((lat1, long1), (lat2, long2), unit=Unit.KILOMETERS)
result = 1 - (distance / threshold)
logger.debug(
"--\t\tpartial_location_distance '%s' '%s' threshold: '%s'\tresult: '%s'",
(lat1, long1), (lat2, long2), threshold, result,
)
return result
# default weights used for the semantic equivalence process
WEIGHTS = {
"attack-pattern": {
"name": (30, partial_string_based),
"external_references": (70, partial_external_reference_based),
},
"campaign": {
"name": (60, partial_string_based),
"aliases": (40, partial_list_based),
},
"identity": {
"name": (60, partial_string_based),
"identity_class": (20, exact_match),
"sectors": (20, partial_list_based),
},
"indicator": {
"indicator_types": (15, partial_list_based),
"pattern": (80, custom_pattern_based),
"valid_from": (5, partial_timestamp_based),
"tdelta": 1, # One day interval
},
"location": {
"longitude_latitude": (34, partial_location_distance),
"region": (33, exact_match),
"country": (33, exact_match),
"threshold": 1000.0,
},
"malware": {
"malware_types": (20, partial_list_based),
"name": (80, partial_string_based),
},
"threat-actor": {
"name": (60, partial_string_based),
"threat_actor_types": (20, partial_list_based),
"aliases": (20, partial_list_based),
},
"tool": {
"tool_types": (20, partial_list_based),
"name": (80, partial_string_based),
},
"vulnerability": {
"name": (30, partial_string_based),
"external_references": (70, partial_external_reference_based),
},
"_internal": {
"ignore_spec_version": False,
},
} #: :autodoc-skip:
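Both Environment methods are now thin wrappers over the equivalence package; a hedged usage sketch (the campaign values are illustrative):

import stix2

env = stix2.Environment()
camp1 = stix2.v21.Campaign(name="Green Group Attacks", aliases=["ggf"])
camp2 = stix2.v21.Campaign(name="Green Group Attacks Against Finance", aliases=["ggf"])

# Object-level score, delegated to stix2.equivalence.object.
print(env.semantically_equivalent(camp1, camp2))

# Graph-level score, delegated to stix2.equivalence.graph.
print(env.graphically_equivalent(stix2.MemoryStore([camp1]), stix2.MemoryStore([camp2])))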

View File

@@ -3,7 +3,9 @@
.. autosummary::
:toctree: equivalence
patterns
pattern
graph
object
|
"""

View File

@@ -0,0 +1,137 @@
"""Python APIs for STIX 2 Graph-based Semantic Equivalence."""
import logging
from ..object import (
WEIGHTS, exact_match, list_reference_check, partial_string_based,
partial_timestamp_based, reference_check, semantically_equivalent,
)
logger = logging.getLogger(__name__)
def graphically_equivalent(ds1, ds2, prop_scores={}, **weight_dict):
"""This method verifies if two graphs are semantically equivalent.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the amount of objects we managed to compare.
This approach builds on top of the object-based semantic equivalence process
and each comparison can return a value between 0 and 100.
Args:
ds1: A DataStore object instance representing your graph
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
Warning:
Object types need to have property weights defined for the equivalence process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.graph` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weight_dict:
.. include:: ../../graph_default_sem_eq_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
weights = GRAPH_WEIGHTS.copy()
if weight_dict:
weights.update(weight_dict)
results = {}
depth = weights["_internal"]["max_depth"]
graph1 = ds1.query([])
graph2 = ds2.query([])
graph1.sort(key=lambda x: x["type"])
graph2.sort(key=lambda x: x["type"])
if len(graph1) < len(graph2):
weights["_internal"]["ds1"] = ds1
weights["_internal"]["ds2"] = ds2
g1 = graph1
g2 = graph2
else:
weights["_internal"]["ds1"] = ds2
weights["_internal"]["ds2"] = ds1
g1 = graph2
g2 = graph1
for object1 in g1:
for object2 in g2:
if object1["type"] == object2["type"] and object1["type"] in weights:
iprop_score = {}
result = semantically_equivalent(object1, object2, iprop_score, **weights)
objects1_id = object1["id"]
weights["_internal"]["max_depth"] = depth
if objects1_id not in results:
results[objects1_id] = {"matched": object2["id"], "prop_score": iprop_score, "value": result}
elif result > results[objects1_id]["value"]:
results[objects1_id] = {"matched": object2["id"], "prop_score": iprop_score, "value": result}
equivalence_score = 0
matching_score = sum(x["value"] for x in results.values())
sum_weights = len(results) * 100.0
if sum_weights > 0:
equivalence_score = (matching_score / sum_weights) * 100
prop_scores["matching_score"] = matching_score
prop_scores["sum_weights"] = sum_weights
prop_scores["summary"] = results
logger.debug(
"DONE\t\tSUM_WEIGHT: %.2f\tMATCHING_SCORE: %.2f\t SCORE: %.2f",
sum_weights,
matching_score,
equivalence_score,
)
return equivalence_score
# default weights used for the graph semantic equivalence process
GRAPH_WEIGHTS = WEIGHTS.copy()
GRAPH_WEIGHTS.update({
"grouping": {
"name": (20, partial_string_based),
"context": (20, partial_string_based),
"object_refs": (60, list_reference_check),
},
"relationship": {
"relationship_type": (20, exact_match),
"source_ref": (40, reference_check),
"target_ref": (40, reference_check),
},
"report": {
"name": (30, partial_string_based),
"published": (10, partial_timestamp_based),
"object_refs": (60, list_reference_check),
"tdelta": 1, # One day interval
},
"sighting": {
"first_seen": (5, partial_timestamp_based),
"last_seen": (5, partial_timestamp_based),
"sighting_of_ref": (40, reference_check),
"observed_data_refs": (20, list_reference_check),
"where_sighted_refs": (20, list_reference_check),
"summary": (10, exact_match),
},
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"ds1": None,
"ds2": None,
"max_depth": 1,
},
}) # :autodoc-skip:
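A hedged sketch of calling the graph API directly, passing the same _internal overrides the tests elsewhere in this diff use (the store contents are illustrative):

import stix2
from stix2.equivalence.graph import graphically_equivalent

ds1 = stix2.MemoryStore([stix2.v21.Malware(name="Cryptolocker", is_family=False)])
ds2 = stix2.MemoryStore([stix2.v21.Malware(name="CryptoLocker", is_family=False)])

prop_scores = {}
score = graphically_equivalent(
    ds1, ds2, prop_scores,
    _internal={
        "ignore_spec_version": False,
        "versioning_checks": False,
        "max_depth": 1,
    },
)
print(score, prop_scores["summary"])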

View File

@@ -0,0 +1,452 @@
"""Python APIs for STIX 2 Object-based Semantic Equivalence."""
import logging
import time
from ...datastore import Filter
from ...utils import STIXdatetime, parse_into_datetime
logger = logging.getLogger(__name__)
def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
"""This method verifies if two objects of the same type are
semantically equivalent.
Args:
obj1: A stix2 object instance
obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the semantic equivalence process
Returns:
float: A number between 0.0 and 100.0 as a measurement of equivalence.
Warning:
Object types need to have property weights defined for the equivalence process.
Otherwise, those objects will not influence the final score. The WEIGHTS
dictionary under `stix2.equivalence.object` can give you an idea on how to add
new entries and pass them via the `weight_dict` argument. Similarly, the values
or methods can be fine tuned for a particular use case.
Note:
Default weight_dict:
.. include:: ../../object_default_sem_eq_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
weights = WEIGHTS.copy()
if weight_dict:
weights.update(weight_dict)
type1, type2 = obj1["type"], obj2["type"]
ignore_spec_version = weights["_internal"]["ignore_spec_version"]
if type1 != type2:
raise ValueError('The objects to compare must be of the same type!')
if ignore_spec_version is False and obj1.get("spec_version", "2.0") != obj2.get("spec_version", "2.0"):
raise ValueError('The objects to compare must be of the same spec version!')
try:
weights[type1]
except KeyError:
logger.warning("'%s' type has no 'weights' dict specified & thus no semantic equivalence method to call!", type1)
sum_weights = matching_score = 0
else:
try:
method = weights[type1]["method"]
except KeyError:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
matching_score = 0.0
sum_weights = 0.0
for prop in weights[type1]:
if check_property_present(prop, obj1, obj2) or prop == "longitude_latitude":
w = weights[type1][prop][0]
comp_funct = weights[type1][prop][1]
if comp_funct == partial_timestamp_based:
contributing_score = w * comp_funct(obj1[prop], obj2[prop], weights[type1]["tdelta"])
elif comp_funct == partial_location_distance:
threshold = weights[type1]["threshold"]
contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold)
elif comp_funct == reference_check or comp_funct == list_reference_check:
max_depth = weights["_internal"]["max_depth"]
if max_depth < 0:
continue # prevent excessive recursion
else:
weights["_internal"]["max_depth"] -= 1
ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"]
contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
else:
contributing_score = w * comp_funct(obj1[prop], obj2[prop])
sum_weights += w
matching_score += contributing_score
prop_scores[prop] = {
"weight": w,
"contributing_score": contributing_score,
}
logger.debug("'%s' check -- weight: %s, contributing score: %s", prop, w, contributing_score)
prop_scores["matching_score"] = matching_score
prop_scores["sum_weights"] = sum_weights
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
else:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
try:
matching_score, sum_weights = method(obj1, obj2, prop_scores, **weights[type1])
except TypeError:
# method doesn't support detailed output with prop_scores
matching_score, sum_weights = method(obj1, obj2, **weights[type1])
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
if sum_weights <= 0:
return 0
equivalence_score = (matching_score / sum_weights) * 100.0
return equivalence_score
def check_property_present(prop, obj1, obj2):
"""Helper method checks if a property is present on both objects."""
if prop in obj1 and prop in obj2:
return True
return False
def partial_timestamp_based(t1, t2, tdelta):
"""Performs a timestamp-based matching via checking how close one timestamp is to another.
Args:
t1: A datetime string or STIXdatetime object.
t2: A datetime string or STIXdatetime object.
tdelta (float): A given time delta. This number is multiplied by 86400 (1 day) to
extend or shrink your time change tolerance.
Returns:
float: Number between 0.0 and 1.0 depending on match criteria.
"""
if not isinstance(t1, STIXdatetime):
t1 = parse_into_datetime(t1)
if not isinstance(t2, STIXdatetime):
t2 = parse_into_datetime(t2)
t1, t2 = time.mktime(t1.timetuple()), time.mktime(t2.timetuple())
result = 1 - min(abs(t1 - t2) / (86400 * tdelta), 1)
logger.debug("--\t\tpartial_timestamp_based '%s' '%s' tdelta: '%s'\tresult: '%s'", t1, t2, tdelta, result)
return result
def partial_list_based(l1, l2):
"""Performs a partial list matching via finding the intersection between common values.
Args:
l1: A list of values.
l2: A list of values.
Returns:
float: Number between 0.0 and 1.0 depending on the proportion of common values.
"""
l1_set, l2_set = set(l1), set(l2)
result = len(l1_set.intersection(l2_set)) / max(len(l1_set), len(l2_set))
logger.debug("--\t\tpartial_list_based '%s' '%s'\tresult: '%s'", l1, l2, result)
return result
def exact_match(val1, val2):
"""Performs an exact value match based on two values
Args:
val1: A value suitable for an equality test.
val2: A value suitable for an equality test.
Returns:
float: 1.0 if the value matches exactly, 0.0 otherwise.
"""
result = 0.0
if val1 == val2:
result = 1.0
logger.debug("--\t\texact_match '%s' '%s'\tresult: '%s'", val1, val2, result)
return result
def partial_string_based(str1, str2):
"""Performs a partial string match using the Jaro-Winkler distance algorithm.
Args:
str1: A string value to check.
str2: A string value to check.
Returns:
float: Number between 0.0 and 1.0 depending on match criteria.
"""
from rapidfuzz import fuzz
result = fuzz.token_sort_ratio(str1, str2)
logger.debug("--\t\tpartial_string_based '%s' '%s'\tresult: '%s'", str1, str2, result)
return result / 100.0
def custom_pattern_based(pattern1, pattern2):
"""Performs a matching on Indicator Patterns.
Args:
pattern1: An Indicator pattern
pattern2: An Indicator pattern
Returns:
float: Number between 0.0 and 1.0 depending on match criteria.
"""
logger.warning("Indicator pattern equivalence is not fully defined; will default to zero if not completely identical")
return exact_match(pattern1, pattern2) # TODO: Implement pattern based equivalence
def partial_external_reference_based(refs1, refs2):
"""Performs a matching on External References.
Args:
refs1: A list of external references.
refs2: A list of external references.
Returns:
float: Number between 0.0 and 1.0 depending on matches.
"""
allowed = {"veris", "cve", "capec", "mitre-attack"}
matches = 0
if len(refs1) >= len(refs2):
l1 = refs1
l2 = refs2
else:
l1 = refs2
l2 = refs1
for ext_ref1 in l1:
for ext_ref2 in l2:
sn_match = False
ei_match = False
url_match = False
source_name = None
if check_property_present("source_name", ext_ref1, ext_ref2):
if ext_ref1["source_name"] == ext_ref2["source_name"]:
source_name = ext_ref1["source_name"]
sn_match = True
if check_property_present("external_id", ext_ref1, ext_ref2):
if ext_ref1["external_id"] == ext_ref2["external_id"]:
ei_match = True
if check_property_present("url", ext_ref1, ext_ref2):
if ext_ref1["url"] == ext_ref2["url"]:
url_match = True
# Special case: if source_name is a STIX defined name and either
# external_id or url match then its a perfect match and other entries
# can be ignored.
if sn_match and (ei_match or url_match) and source_name in allowed:
result = 1.0
logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
refs1, refs2, result,
)
return result
# Regular check. If the source_name (not STIX-defined) or external_id or
# url matches then we consider the entry a match.
if (sn_match or ei_match or url_match) and source_name not in allowed:
matches += 1
result = matches / max(len(refs1), len(refs2))
logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
refs1, refs2, result,
)
return result
def partial_location_distance(lat1, long1, lat2, long2, threshold):
"""Given two coordinates perform a matching based on its distance using the Haversine Formula.
Args:
lat1: Latitude value for first coordinate point.
lat2: Latitude value for second coordinate point.
long1: Longitude value for first coordinate point.
long2: Longitude value for second coordinate point.
threshold (float): A kilometer measurement for the threshold distance between these two points.
Returns:
float: Number between 0.0 and 1.0 depending on match.
"""
from haversine import Unit, haversine
distance = haversine((lat1, long1), (lat2, long2), unit=Unit.KILOMETERS)
result = 1 - (distance / threshold)
logger.debug(
"--\t\tpartial_location_distance '%s' '%s' threshold: '%s'\tresult: '%s'",
(lat1, long1), (lat2, long2), threshold, result,
)
return result
def _versioned_checks(ref1, ref2, ds1, ds2, **weights):
"""Checks multiple object versions if present in graph.
Maximizes for the semantic equivalence score of a particular version."""
results = {}
objects1 = ds1.query([Filter("id", "=", ref1)])
objects2 = ds2.query([Filter("id", "=", ref2)])
if len(objects1) > 0 and len(objects2) > 0:
for o1 in objects1:
for o2 in objects2:
result = semantically_equivalent(o1, o2, **weights)
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": result}
elif result > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": result}
result = results.get(ref1, {}).get("value", 0.0)
logger.debug(
"--\t\t_versioned_checks '%s' '%s'\tresult: '%s'",
ref1, ref2, result,
)
return result
def reference_check(ref1, ref2, ds1, ds2, **weights):
"""For two references, de-reference the object and perform object-based
semantic equivalence. The score influences the result of an edge check."""
type1, type2 = ref1.split("--")[0], ref2.split("--")[0]
result = 0.0
if type1 == type2:
if weights["_internal"]["versioning_checks"]:
result = _versioned_checks(ref1, ref2, ds1, ds2, **weights) / 100.0
else:
o1, o2 = ds1.get(ref1), ds2.get(ref2)
if o1 and o2:
result = semantically_equivalent(o1, o2, **weights) / 100.0
logger.debug(
"--\t\treference_check '%s' '%s'\tresult: '%s'",
ref1, ref2, result,
)
return result
def list_reference_check(refs1, refs2, ds1, ds2, **weights):
"""For objects that contain multiple references (i.e., object_refs) perform
the same de-reference procedure and perform object-based semantic equivalence.
The score influences the objects containing these references. The result is
weighted on the amount of unique objects that could 1) be de-referenced 2) """
results = {}
if len(refs1) >= len(refs2):
l1 = refs1
l2 = refs2
b1 = ds1
b2 = ds2
else:
l1 = refs2
l2 = refs1
b1 = ds2
b2 = ds1
l1.sort()
l2.sort()
for ref1 in l1:
for ref2 in l2:
type1, type2 = ref1.split("--")[0], ref2.split("--")[0]
if type1 == type2:
score = reference_check(ref1, ref2, b1, b2, **weights) * 100.0
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": score}
elif score > results[ref1]["value"]:
results[ref1] = {"matched": ref2, "value": score}
result = 0.0
total_sum = sum(x["value"] for x in results.values())
max_score = len(results) * 100.0
if max_score > 0:
result = total_sum / max_score
logger.debug(
"--\t\tlist_reference_check '%s' '%s'\ttotal_sum: '%s'\tmax_score: '%s'\tresult: '%s'",
refs1, refs2, total_sum, max_score, result,
)
return result
# default weights used for the semantic equivalence process
WEIGHTS = {
"attack-pattern": {
"name": (30, partial_string_based),
"external_references": (70, partial_external_reference_based),
},
"campaign": {
"name": (60, partial_string_based),
"aliases": (40, partial_list_based),
},
"course-of-action": {
"name": (60, partial_string_based),
"external_references": (40, partial_external_reference_based),
},
"identity": {
"name": (60, partial_string_based),
"identity_class": (20, exact_match),
"sectors": (20, partial_list_based),
},
"indicator": {
"indicator_types": (15, partial_list_based),
"pattern": (80, custom_pattern_based),
"valid_from": (5, partial_timestamp_based),
"tdelta": 1, # One day interval
},
"intrusion-set": {
"name": (20, partial_string_based),
"external_references": (60, partial_external_reference_based),
"aliases": (20, partial_list_based),
},
"location": {
"longitude_latitude": (34, partial_location_distance),
"region": (33, exact_match),
"country": (33, exact_match),
"threshold": 1000.0,
},
"malware": {
"malware_types": (20, partial_list_based),
"name": (80, partial_string_based),
},
"marking-definition": {
"name": (20, exact_match),
"definition": (60, exact_match),
"definition_type": (20, exact_match),
},
"threat-actor": {
"name": (60, partial_string_based),
"threat_actor_types": (20, partial_list_based),
"aliases": (20, partial_list_based),
},
"tool": {
"tool_types": (20, partial_list_based),
"name": (80, partial_string_based),
},
"vulnerability": {
"name": (30, partial_string_based),
"external_references": (70, partial_external_reference_based),
},
"_internal": {
"ignore_spec_version": False,
},
} # :autodoc-skip:
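A short usage sketch of the module-level object API (the indicator values are illustrative):

import stix2
from stix2.equivalence.object import semantically_equivalent

ind1 = stix2.v21.Indicator(
    pattern="[ipv4-addr:value = '198.51.100.1']",
    pattern_type="stix",
    valid_from="2017-01-01T12:34:56Z",
)
ind2 = stix2.v21.Indicator(
    pattern="[ipv4-addr:value = '198.51.100.1']",
    pattern_type="stix",
    valid_from="2017-01-01T12:34:56Z",
)
prop_scores = {}
print(semantically_equivalent(ind1, ind2, prop_scores))  # 100.0 here: pattern and valid_from both match
print(prop_scores["matching_score"], prop_scores["sum_weights"])  # 85.0 85.0 (indicator_types absent on both)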

View File

@@ -1,7 +1,7 @@
"""Python APIs for STIX 2 Pattern Semantic Equivalence.
.. autosummary::
:toctree: patterns
:toctree: pattern
compare
transform
@@ -10,13 +10,13 @@
"""
import stix2
from stix2.equivalence.patterns.compare.observation import (
from stix2.equivalence.pattern.compare.observation import (
observation_expression_cmp,
)
from stix2.equivalence.patterns.transform import (
from stix2.equivalence.pattern.transform import (
ChainTransformer, SettleTransformer,
)
from stix2.equivalence.patterns.transform.observation import (
from stix2.equivalence.pattern.transform.observation import (
AbsorptionTransformer, CanonicalizeComparisonExpressionsTransformer,
DNFTransformer, FlattenTransformer, OrderDedupeTransformer,
)
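Only the module path changed; the public functions keep their behavior. A tiny hedged sketch in the style of the pattern tests elsewhere in this diff:

from stix2.equivalence.pattern import equivalent_patterns

# Duplicate OR branches canonicalize away, so these compare as equivalent.
assert equivalent_patterns("[a:b = 1] OR [a:b = 1]", "[a:b = 1]")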

View File

@@ -4,7 +4,7 @@ Comparison utilities for STIX pattern comparison expressions.
import base64
import functools
from stix2.equivalence.patterns.compare import generic_cmp, iter_lex_cmp
from stix2.equivalence.pattern.compare import generic_cmp, iter_lex_cmp
from stix2.patterns import (
AndBooleanExpression, BinaryConstant, BooleanConstant, FloatConstant,
HexConstant, IntegerConstant, ListConstant, ListObjectPathComponent,

View File

@@ -1,8 +1,8 @@
"""
Comparison utilities for STIX pattern observation expressions.
"""
from stix2.equivalence.patterns.compare import generic_cmp, iter_lex_cmp
from stix2.equivalence.patterns.compare.comparison import (
from stix2.equivalence.pattern.compare import generic_cmp, iter_lex_cmp
from stix2.equivalence.pattern.compare.comparison import (
comparison_expression_cmp, generic_constant_cmp,
)
from stix2.patterns import (

View File

@@ -4,12 +4,12 @@ Transformation utilities for STIX pattern comparison expressions.
import functools
import itertools
from stix2.equivalence.patterns.compare import iter_in, iter_lex_cmp
from stix2.equivalence.patterns.compare.comparison import (
from stix2.equivalence.pattern.compare import iter_in, iter_lex_cmp
from stix2.equivalence.pattern.compare.comparison import (
comparison_expression_cmp,
)
from stix2.equivalence.patterns.transform import Transformer
from stix2.equivalence.patterns.transform.specials import (
from stix2.equivalence.pattern.transform import Transformer
from stix2.equivalence.pattern.transform.specials import (
ipv4_addr, ipv6_addr, windows_reg_key,
)
from stix2.patterns import (

View File

@@ -4,23 +4,23 @@ Transformation utilities for STIX pattern observation expressions.
import functools
import itertools
from stix2.equivalence.patterns.compare import iter_in, iter_lex_cmp
from stix2.equivalence.patterns.compare.observation import (
from stix2.equivalence.pattern.compare import iter_in, iter_lex_cmp
from stix2.equivalence.pattern.compare.observation import (
observation_expression_cmp,
)
from stix2.equivalence.patterns.transform import (
from stix2.equivalence.pattern.transform import (
ChainTransformer, SettleTransformer, Transformer,
)
from stix2.equivalence.patterns.transform.comparison import (
from stix2.equivalence.pattern.transform.comparison import (
SpecialValueCanonicalization,
)
from stix2.equivalence.patterns.transform.comparison import \
from stix2.equivalence.pattern.transform.comparison import \
AbsorptionTransformer as CAbsorptionTransformer
from stix2.equivalence.patterns.transform.comparison import \
from stix2.equivalence.pattern.transform.comparison import \
DNFTransformer as CDNFTransformer
from stix2.equivalence.patterns.transform.comparison import \
from stix2.equivalence.pattern.transform.comparison import \
FlattenTransformer as CFlattenTransformer
from stix2.equivalence.patterns.transform.comparison import \
from stix2.equivalence.pattern.transform.comparison import \
OrderDedupeTransformer as COrderDedupeTransformer
from stix2.patterns import (
AndObservationExpression, FollowedByObservationExpression,

View File

@@ -3,7 +3,7 @@ Some simple comparison expression canonicalization functions.
"""
import socket
from stix2.equivalence.patterns.compare.comparison import (
from stix2.equivalence.pattern.compare.comparison import (
object_path_to_raw_values,
)

View File

@@ -1,6 +1,6 @@
import pytest
from stix2.equivalence.patterns import (
from stix2.equivalence.pattern import (
equivalent_patterns, find_equivalent_patterns,
)

View File

@@ -1,6 +1,10 @@
import os
import pytest
import stix2
import stix2.equivalence.graph
import stix2.equivalence.object
from .constants import (
CAMPAIGN_ID, CAMPAIGN_KWARGS, FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS,
@@ -8,6 +12,8 @@ from .constants import (
RELATIONSHIP_IDS,
)
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
@pytest.fixture
def ds():
@@ -18,7 +24,42 @@ def ds():
rel1 = stix2.v20.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
rel2 = stix2.v20.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
rel3 = stix2.v20.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
reprt = stix2.v20.Report(
name="Malware Report",
published="2021-05-09T08:22:22Z",
labels=["campaign"],
object_refs=[mal.id, rel1.id, ind.id],
)
stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3, reprt]
yield stix2.MemoryStore(stix_objs)
@pytest.fixture
def ds2():
cam = stix2.v20.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
idy = stix2.v20.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
ind = stix2.v20.Indicator(id=INDICATOR_ID, created_by_ref=idy.id, **INDICATOR_KWARGS)
indv2 = ind.new_version(external_references=[{
"source_name": "unknown",
"url": "https://examplewebsite.com/",
}])
mal = stix2.v20.Malware(id=MALWARE_ID, created_by_ref=idy.id, **MALWARE_KWARGS)
malv2 = mal.new_version(external_references=[{
"source_name": "unknown",
"url": "https://examplewebsite2.com/",
}])
rel1 = stix2.v20.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
rel2 = stix2.v20.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
rel3 = stix2.v20.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
stix_objs = [cam, idy, ind, indv2, mal, malv2, rel1, rel2, rel3]
reprt = stix2.v20.Report(
created_by_ref=idy.id,
name="example",
labels=["campaign"],
published="2021-04-09T08:22:22Z",
object_refs=stix_objs,
)
stix_objs.append(reprt)
yield stix2.MemoryStore(stix_objs)
@@ -370,3 +411,144 @@ def test_related_to_by_target(ds):
assert len(resp) == 2
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)
def test_versioned_checks(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": True,
"versioning_checks": True,
"max_depth": 1,
},
})
score = stix2.equivalence.object._versioned_checks(INDICATOR_ID, INDICATOR_ID, ds, ds2, **weights)
assert round(score) == 100
def test_semantic_check_with_versioning(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"ds1": ds,
"ds2": ds2,
"max_depth": 1,
},
})
ind = stix2.v20.Indicator(
**dict(
labels=["malicious-activity"],
pattern="[file:hashes.'SHA-256' = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855']",
valid_from="2017-01-01T12:34:56Z",
external_references=[
{
"source_name": "unknown",
"url": "https://examplewebsite2.com/",
},
],
object_marking_refs=[stix2.v20.TLP_WHITE],
)
)
ds.add(ind)
score = stix2.equivalence.object.reference_check(ind.id, INDICATOR_ID, ds, ds2, **weights)
assert round(score) == 0 # Since pattern is different score is really low
def test_list_semantic_check(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"ds1": ds,
"ds2": ds2,
"max_depth": 1,
},
})
object_refs1 = [
"malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
"relationship--06520621-5352-4e6a-b976-e8fa3d437ffd",
"indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7",
]
object_refs2 = [
"campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"identity--311b2d2d-f010-4473-83ec-1edf84858f4c",
"indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7",
"malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
"malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
"relationship--06520621-5352-4e6a-b976-e8fa3d437ffd",
"relationship--181c9c09-43e6-45dd-9374-3bec192f05ef",
"relationship--a0cbb21c-8daf-4a7f-96aa-7155a4ef8f70",
]
score = stix2.equivalence.object.list_reference_check(
object_refs1,
object_refs2,
ds,
ds2,
**weights,
)
assert round(score) == 1
def test_graph_equivalence_with_filesystem_source(ds):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
fs = stix2.FileSystemSource(FS_PATH)
env = stix2.Environment().graphically_equivalent(fs, ds, prop_scores, **weights)
assert round(env) == 28
assert round(prop_scores["matching_score"]) == 139
assert round(prop_scores["sum_weights"]) == 500
def test_graph_equivalence_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds, prop_scores, **weights)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["sum_weights"]) == 800
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800

View File

@@ -4,7 +4,7 @@ Pattern equivalence unit tests which use STIX 2.0-specific pattern features
import pytest
from stix2.equivalence.patterns import equivalent_patterns
from stix2.equivalence.pattern import equivalent_patterns
@pytest.mark.parametrize(

View File

@@ -114,7 +114,7 @@
mods = [obj['modified'] for obj in unique]
assert "indicator--00000000-0000-4000-8000-000000000001" in ids
assert "indicator--00000000-0000-4000-8000-000000000001" in ids
assert "indicator--00000000-0000-4000-8000-000000000002" in ids
assert "2017-01-27T13:49:53.935Z" in mods
assert "2017-01-27T13:49:53.936Z" in mods

View File

@@ -132,7 +132,13 @@ def stix_objs1():
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
}
return [ind1, ind2, ind3, ind4, ind5]
sco = {
"type": "url",
"spec_version": "2.1",
"id": "url--cc1deced-d99b-4d72-9268-8182420cb2fd",
"value": "http://example.com/",
}
return [ind1, ind2, ind3, ind4, ind5, sco]
@pytest.fixture

View File

@@ -59,6 +59,17 @@ def test_composite_datasource_operations(stix_objs1, stix_objs2):
assert indicator["modified"] == parse_into_datetime("2017-01-31T13:49:53.935Z")
assert indicator["type"] == "indicator"
sco = cds1.get("url--cc1deced-d99b-4d72-9268-8182420cb2fd")
assert sco["id"] == "url--cc1deced-d99b-4d72-9268-8182420cb2fd"
scos = cds1.all_versions("url--cc1deced-d99b-4d72-9268-8182420cb2fd")
assert len(scos) == 1
assert scos[0]["id"] == "url--cc1deced-d99b-4d72-9268-8182420cb2fd"
scos = cds1.query([Filter("value", "=", "http://example.com/")])
assert len(scos) == 1
assert scos[0]["id"] == "url--cc1deced-d99b-4d72-9268-8182420cb2fd"
query1 = [
Filter("type", "=", "indicator"),
]

View File

@@ -1,7 +1,11 @@
import os
import pytest
import stix2
import stix2.environment
import stix2.equivalence.graph
import stix2.equivalence.object
import stix2.exceptions
from .constants import (
@@ -12,6 +16,8 @@ from .constants import (
VULNERABILITY_ID, VULNERABILITY_KWARGS,
)
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
@pytest.fixture
def ds():
@@ -22,7 +28,46 @@ def ds():
rel1 = stix2.v21.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
rel2 = stix2.v21.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
rel3 = stix2.v21.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
reprt = stix2.v21.Report(
name="Malware Report", published="2021-05-09T08:22:22Z",
object_refs=[mal.id, rel1.id, ind.id],
)
stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3, reprt]
yield stix2.MemoryStore(stix_objs)
@pytest.fixture
def ds2():
cam = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
idy = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
ind = stix2.v21.Indicator(id=INDICATOR_ID, created_by_ref=idy.id, **INDICATOR_KWARGS)
indv2 = ind.new_version(
external_references=[
{
"source_name": "unknown",
"url": "https://examplewebsite.com/",
},
],
object_marking_refs=[stix2.v21.TLP_WHITE],
)
mal = stix2.v21.Malware(id=MALWARE_ID, created_by_ref=idy.id, **MALWARE_KWARGS)
malv2 = mal.new_version(
external_references=[
{
"source_name": "unknown",
"url": "https://examplewebsite2.com/",
},
],
)
rel1 = stix2.v21.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
rel2 = stix2.v21.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
rel3 = stix2.v21.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
stix_objs = [cam, idy, ind, indv2, mal, malv2, rel1, rel2, rel3]
reprt = stix2.v21.Report(
created_by_ref=idy.id, name="example",
published="2021-04-09T08:22:22Z", object_refs=stix_objs,
)
stix_objs.append(reprt)
yield stix2.MemoryStore(stix_objs)
@@ -820,3 +865,145 @@ def test_semantic_equivalence_prop_scores_method_provided():
assert len(prop_scores) == 2
assert prop_scores["matching_score"] == 96.0
assert prop_scores["sum_weights"] == 100.0
def test_versioned_checks(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": True,
"versioning_checks": True,
"max_depth": 1,
},
})
score = stix2.equivalence.object._versioned_checks(INDICATOR_ID, INDICATOR_ID, ds, ds2, **weights)
assert round(score) == 100
def test_semantic_check_with_versioning(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"ds1": ds,
"ds2": ds2,
"max_depth": 1,
},
})
ind = stix2.v21.Indicator(
**dict(
indicator_types=["malicious-activity"],
pattern_type="stix",
pattern="[file:hashes.'SHA-256' = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855']",
valid_from="2017-01-01T12:34:56Z",
external_references=[
{
"source_name": "unknown",
"url": "https://examplewebsite2.com/",
},
],
object_marking_refs=[stix2.v21.TLP_WHITE],
)
)
ds.add(ind)
score = stix2.equivalence.object.reference_check(ind.id, INDICATOR_ID, ds, ds2, **weights)
assert round(score) == 0 # Since pattern is different score is really low
def test_list_semantic_check(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"ds1": ds,
"ds2": ds2,
"max_depth": 1,
},
})
object_refs1 = [
"malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
"relationship--06520621-5352-4e6a-b976-e8fa3d437ffd",
"indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7",
]
object_refs2 = [
"campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"identity--311b2d2d-f010-4473-83ec-1edf84858f4c",
"indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7",
"malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
"malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
"relationship--06520621-5352-4e6a-b976-e8fa3d437ffd",
"relationship--181c9c09-43e6-45dd-9374-3bec192f05ef",
"relationship--a0cbb21c-8daf-4a7f-96aa-7155a4ef8f70",
]
score = stix2.equivalence.object.list_reference_check(
object_refs1,
object_refs2,
ds,
ds2,
**weights,
)
assert round(score) == 1
def test_graph_equivalence_with_filesystem_source(ds):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
fs = stix2.FileSystemSource(FS_PATH)
env = stix2.Environment().graphically_equivalent(fs, ds, prop_scores, **weights)
assert round(env) == 24
assert round(prop_scores["matching_score"]) == 122
assert round(prop_scores["sum_weights"]) == 500
def test_graph_equivalence_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds, prop_scores, **weights)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["sum_weights"]) == 800
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
assert round(env) == 93
assert round(prop_scores["matching_score"]) == 745
assert round(prop_scores["sum_weights"]) == 800

View File

@@ -4,7 +4,7 @@ Pattern equivalence unit tests which use STIX 2.1+-specific pattern features
import pytest
from stix2.equivalence.patterns import equivalent_patterns
from stix2.equivalence.pattern import equivalent_patterns
@pytest.mark.parametrize(

View File

@@ -104,17 +104,18 @@ def test_get_type_from_id(stix_id, type):
def test_deduplicate(stix_objs1):
unique = stix2.utils.deduplicate(stix_objs1)
# Only 3 objects are unique
# 2 id's vary
# Only 4 objects are unique
# 3 id's vary
# 2 modified times vary for a particular id
assert len(unique) == 3
assert len(unique) == 4
ids = [obj['id'] for obj in unique]
mods = [obj['modified'] for obj in unique]
mods = [obj.get('modified') for obj in unique]
assert "indicator--00000000-0000-4000-8000-000000000001" in ids
assert "indicator--00000000-0000-4000-8000-000000000001" in ids
assert "indicator--00000000-0000-4000-8000-000000000002" in ids
assert "url--cc1deced-d99b-4d72-9268-8182420cb2fd" in ids
assert "2017-01-27T13:49:53.935Z" in mods
assert "2017-01-27T13:49:53.936Z" in mods

View File

@@ -132,11 +132,12 @@ def deduplicate(stix_obj_list):
unique_objs = {}
for obj in stix_obj_list:
try:
unique_objs[(obj['id'], obj['modified'])] = obj
except KeyError:
# Handle objects with no `modified` property, e.g. marking-definition
unique_objs[(obj['id'], obj['created'])] = obj
ver = obj.get("modified") or obj.get("created")
if ver is None:
unique_objs[obj["id"]] = obj
else:
unique_objs[(obj['id'], ver)] = obj
return list(unique_objs.values())
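A quick sketch of the new behavior (sample objects are hypothetical, modeled on the fixtures above):

import stix2.utils

objs = [
    {"id": "indicator--00000000-0000-4000-8000-000000000001", "modified": "2017-01-27T13:49:53.935Z"},
    {"id": "indicator--00000000-0000-4000-8000-000000000001", "modified": "2017-01-27T13:49:53.936Z"},
    {"id": "url--cc1deced-d99b-4d72-9268-8182420cb2fd", "value": "http://example.com/"},
    {"id": "url--cc1deced-d99b-4d72-9268-8182420cb2fd", "value": "http://example.com/"},
]
# Two indicator versions survive; the identical SCOs collapse to one entry keyed by id alone.
assert len(stix2.utils.deduplicate(objs)) == 3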