Merge branch 'master' of github.com:oasis-open/cti-python-stix2 into dev-extensions-proposal

pull/1/head
Emmanuelle Vargas-Gonzalez 2021-03-31 18:53:02 -04:00
commit bc053da3c4
17 changed files with 514 additions and 451 deletions

.gitignore vendored
View File

@@ -55,8 +55,7 @@ coverage.xml
# Sphinx documentation
docs/_build/
.ipynb_checkpoints
graph_default_sem_eq_weights.rst
object_default_sem_eq_weights.rst
similarity_weights.rst
# PyBuilder
target/

View File

@@ -7,7 +7,6 @@ import sys
from sphinx.ext.autodoc import ClassDocumenter
from stix2.base import _STIXBase
from stix2.equivalence.graph import GRAPH_WEIGHTS
from stix2.equivalence.object import WEIGHTS
from stix2.version import __version__
@@ -66,16 +65,9 @@ object_default_sem_eq_weights = json.dumps(WEIGHTS, indent=4, default=lambda o: o.__name__)
object_default_sem_eq_weights = object_default_sem_eq_weights.replace('\n', '\n ')
object_default_sem_eq_weights = object_default_sem_eq_weights.replace(' "', ' ')
object_default_sem_eq_weights = object_default_sem_eq_weights.replace('"\n', '\n')
with open('object_default_sem_eq_weights.rst', 'w') as f:
with open('similarity_weights.rst', 'w') as f:
f.write(".. code-block:: python\n\n {}\n\n".format(object_default_sem_eq_weights))
graph_default_sem_eq_weights = json.dumps(GRAPH_WEIGHTS, indent=4, default=lambda o: o.__name__)
graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('\n', '\n ')
graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace(' "', ' ')
graph_default_sem_eq_weights = graph_default_sem_eq_weights.replace('"\n', '\n')
with open('graph_default_sem_eq_weights.rst', 'w') as f:
f.write(".. code-block:: python\n\n {}\n\n".format(graph_default_sem_eq_weights))
def get_property_type(prop):
"""Convert property classname into pretty string name of property.

View File

@@ -4607,20 +4607,11 @@
" ),\n",
"]\n",
"\n",
"\n",
"weights = {\n",
" \"_internal\": {\n",
" \"ignore_spec_version\": False,\n",
" \"versioning_checks\": False,\n",
" \"max_depth\": 1,\n",
" },\n",
"}\n",
"\n",
"memstore1 = MemoryStore(g1)\n",
"memstore2 = MemoryStore(g2)\n",
"prop_scores = {}\n",
"\n",
"similarity_result = env.graph_similarity(memstore1, memstore2, prop_scores, **weights)\n",
"similarity_result = env.graph_similarity(memstore1, memstore2, prop_scores)\n",
"equivalence_result = env.graph_equivalence(memstore1, memstore2, threshold=60)\n",
"\n",
"print(similarity_result)\n",

View File

@@ -60,7 +60,7 @@ setup(
'Bug Tracker': 'https://github.com/oasis-open/cti-python-stix2/issues/',
},
extras_require={
'taxii': ['taxii2-client>=2.2.1'],
'taxii': ['taxii2-client>=2.3.0'],
'semantic': ['haversine', 'rapidfuzz'],
},
)

View File

@@ -17,7 +17,7 @@ from .exceptions import (
)
from .markings import _MarkingsMixin
from .markings.utils import validate
from .serialization import STIXJSONEncoder, serialize
from .serialization import STIXJSONEncoder, fp_serialize, serialize
from .utils import NOW, PREFIX_21_REGEX, get_timestamp
from .versioning import new_version as _new_version
from .versioning import revoke as _revoke
@@ -262,6 +262,35 @@ class _STIXBase(collections.abc.Mapping):
"""
return serialize(self, *args, **kwargs)
def fp_serialize(self, *args, **kwargs):
"""
Serialize a STIX object to ``fp`` (a text-stream, file-like object supporting ``.write()``).
Examples:
>>> import stix2
>>> identity = stix2.Identity(name='Example Corp.', identity_class='organization')
>>> print(identity.serialize(sort_keys=True))
{"created": "2018-06-08T19:03:54.066Z", ... "name": "Example Corp.", "type": "identity"}
>>> print(identity.serialize(sort_keys=True, indent=4))
{
"created": "2018-06-08T19:03:54.066Z",
"id": "identity--d7f3e25a-ba1c-447a-ab71-6434b092b05e",
"identity_class": "organization",
"modified": "2018-06-08T19:03:54.066Z",
"name": "Example Corp.",
"type": "identity"
}
>>> with open("example.json", mode="w", encoding="utf-8") as f:
... identity.fp_serialize(f, pretty=True)
Returns:
None
See Also:
``stix2.serialization.fp_serialize`` for options.
"""
fp_serialize(self, *args, **kwargs)
class _DomainObject(_STIXBase, _MarkingsMixin):
pass
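For orientation, a minimal usage sketch of the new method form, mirroring the bundle tests added later in this commit (the objects are illustrative):

import io

import stix2

bundle = stix2.v21.Bundle(objects=[
    stix2.v21.Identity(name="Example Corp.", identity_class="organization"),
])
buffer = io.StringIO()
bundle.fp_serialize(buffer, pretty=True)
print(buffer.getvalue())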

View File

@@ -13,7 +13,7 @@ from stix2.datastore import (
)
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
from stix2.parsing import parse
from stix2.serialization import serialize
from stix2.serialization import fp_serialize
from stix2.utils import format_datetime, get_type_from_id, parse_into_datetime
@@ -584,9 +584,8 @@ class FileSystemSink(DataSink):
if os.path.isfile(file_path):
raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path))
with io.open(file_path, 'w', encoding=encoding) as f:
stix_obj = serialize(stix_obj, pretty=True, encoding=encoding, ensure_ascii=False)
f.write(stix_obj)
with io.open(file_path, mode='w', encoding=encoding) as f:
fp_serialize(stix_obj, f, pretty=True, encoding=encoding, ensure_ascii=False)
def add(self, stix_data=None, version=None):
"""Add STIX objects to file directory.

View File

@@ -12,6 +12,8 @@ from stix2.parsing import parse
from stix2.utils import deduplicate
try:
from taxii2client import v20 as tcv20
from taxii2client import v21 as tcv21
from taxii2client.exceptions import ValidationError
_taxii2_client = True
except ImportError:
@@ -33,9 +35,12 @@ class TAXIICollectionStore(DataStoreMixin):
side (retrieving data) and False for TAXIICollectionSink
side (pushing data). However, when the parameter is supplied, it will
be applied to both TAXIICollectionSource/Sink.
items_per_page (int): How many STIX objects to request per call
to the TAXII server. The value can be tuned, but servers may override
it if their internal limit is surpassed. Used by TAXIICollectionSource.
"""
def __init__(self, collection, allow_custom=None):
def __init__(self, collection, allow_custom=None, items_per_page=5000):
if allow_custom is None:
allow_custom_source = True
allow_custom_sink = False
@@ -43,7 +48,7 @@ class TAXIICollectionStore(DataStoreMixin):
allow_custom_sink = allow_custom_source = allow_custom
super(TAXIICollectionStore, self).__init__(
source=TAXIICollectionSource(collection, allow_custom=allow_custom_source),
source=TAXIICollectionSource(collection, allow_custom=allow_custom_source, items_per_page=items_per_page),
sink=TAXIICollectionSink(collection, allow_custom=allow_custom_sink),
)
@@ -144,9 +149,12 @@ class TAXIICollectionSource(DataSource):
collection (taxii2.Collection): TAXII Collection instance
allow_custom (bool): Whether to allow custom STIX content to be
added to the FileSystemSink. Default: True
items_per_page (int): How many STIX objects to request per call
to the TAXII server. The value can be tuned, but servers may override
it if their internal limit is surpassed.
"""
def __init__(self, collection, allow_custom=True):
def __init__(self, collection, allow_custom=True, items_per_page=5000):
super(TAXIICollectionSource, self).__init__()
if not _taxii2_client:
raise ImportError("taxii2client library is required for usage of TAXIICollectionSource")
@@ -167,6 +175,7 @@
)
self.allow_custom = allow_custom
self.items_per_page = items_per_page
def get(self, stix_id, version=None, _composite_filters=None):
"""Retrieve STIX object from local/remote STIX Collection
@@ -286,8 +295,12 @@
taxii_filters_dict = dict((f.property, f.value) for f in taxii_filters)
# query TAXII collection
all_data = []
try:
all_data = self.collection.get_objects(**taxii_filters_dict).get('objects', [])
paged_request = tcv21.as_pages if isinstance(self.collection, tcv21.Collection) else tcv20.as_pages
for resource in paged_request(self.collection.get_objects, per_request=self.items_per_page, **taxii_filters_dict):
all_data.extend(resource.get("objects", []))
# deduplicate data (before filtering, as this reduces wasted filtering)
all_data = deduplicate(all_data)
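From the caller's side the new paging is transparent; a hedged sketch, assuming a reachable TAXII 2.1 server (URL and collection ID are placeholders):

from taxii2client.v21 import Collection

from stix2 import TAXIICollectionStore

collection = Collection(
    "https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/",
)
# Request 500 objects per call instead of the 5000 default; the server may
# still cap each page at its own internal limit.
store = TAXIICollectionStore(collection, items_per_page=500)
all_objects = store.query([])  # walks every page via as_pages() internally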

View File

@@ -189,7 +189,11 @@ class Environment(DataStoreMixin):
return None
@staticmethod
def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
def object_similarity(
obj1, obj2, prop_scores={}, ds1=None, ds2=None,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a measure of how similar the two objects are.
Args:
@@ -197,8 +201,19 @@
obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the similarity process
ds1 (optional): A DataStore object instance from which to pull related objects
ds2 (optional): A DataStore object instance from which to pull related objects
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity.
@@ -213,17 +228,24 @@
Note:
Default weight_dict:
.. include:: ../object_default_sem_eq_weights.rst
.. include:: ../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
return object_similarity(obj1, obj2, prop_scores, **weight_dict)
return object_similarity(
obj1, obj2, prop_scores, ds1, ds2, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
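In practice, the settings that previously lived under weight_dict["_internal"] are now plain keyword arguments; a sketch comparing a 2.1 object against a 2.0 one (values illustrative; the 'semantic' extras supply the string comparison):

import stix2

ta1 = stix2.v21.ThreatActor(name="Evil Org", threat_actor_types=["crime-syndicate"])
ta2 = stix2.v20.ThreatActor(name="Evil Org", labels=["crime-syndicate"])

prop_scores = {}
score = stix2.Environment().object_similarity(
    ta1, ta2, prop_scores,
    ignore_spec_version=True,  # skip the 2.0-vs-2.1 type check
    versioning_checks=False,
    max_depth=1,
)
# prop_scores now holds the per-property weights and contributing scores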
@staticmethod
def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
def object_equivalence(
obj1, obj2, prop_scores={}, threshold=70, ds1=None, ds2=None,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a true/false value if two objects are semantically equivalent.
Internally, it calls the object_similarity function and compares it against the given
threshold value.
@@ -236,8 +258,19 @@
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both objects equivalent. This
value can be tuned.
weight_dict: A dictionary that can be used to override settings
in the similarity process
ds1 (optional): A DataStore object instance from which to pull related objects
ds2 (optional): A DataStore object instance from which to pull related objects
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
bool: True if the result of the object similarity is greater than or equal to
@@ -253,17 +286,23 @@
Note:
Default weight_dict:
.. include:: ../object_default_sem_eq_weights.rst
.. include:: ../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
return object_equivalence(obj1, obj2, prop_scores, threshold, **weight_dict)
return object_equivalence(
obj1, obj2, prop_scores, threshold, ds1, ds2,
ignore_spec_version, versioning_checks, max_depth, **weight_dict
)
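object_equivalence is object_similarity compared against the given threshold; a small sketch using the identity weights shown later in this diff (rapidfuzz, from the 'semantic' extras, assumed installed):

import stix2

id1 = stix2.v21.Identity(name="Example Corp.", identity_class="organization")
id2 = stix2.v21.Identity(name="Example Corporation", identity_class="organization")

# True when object_similarity(id1, id2) >= threshold
is_equivalent = stix2.Environment().object_equivalence(id1, id2, threshold=70)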
@staticmethod
def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
def graph_similarity(
ds1, ds2, prop_scores={}, ignore_spec_version=False,
versioning_checks=False, max_depth=1, **weight_dict
):
"""This method returns a similarity score for two given graphs.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the number of objects we managed to compare.
@@ -275,8 +314,17 @@
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the similarity process
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity.
@@ -291,17 +339,24 @@
Note:
Default weight_dict:
.. include:: ../graph_default_sem_eq_weights.rst
.. include:: ../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
return graph_similarity(ds1, ds2, prop_scores, **weight_dict)
return graph_similarity(
ds1, ds2, prop_scores, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
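This mirrors the notebook change above; a self-contained sketch, with two small identities standing in for the notebook's g1/g2 object lists:

import stix2
from stix2 import Environment, MemoryStore

memstore1 = MemoryStore([
    stix2.v21.Identity(name="Example Corp.", identity_class="organization"),
])
memstore2 = MemoryStore([
    stix2.v21.Identity(name="Example Corporation", identity_class="organization"),
])
prop_scores = {}

similarity_result = Environment().graph_similarity(memstore1, memstore2, prop_scores)
equivalence_result = Environment().graph_equivalence(memstore1, memstore2, threshold=60)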
@staticmethod
def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
def graph_equivalence(
ds1, ds2, prop_scores={}, threshold=70,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a true/false value if two graphs are semantically equivalent.
Internally, it calls the graph_similarity function and compares it against the given
threshold value.
@@ -314,8 +369,17 @@
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both graphs equivalent. This
value can be tuned.
weight_dict: A dictionary that can be used to override settings
in the similarity process
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
bool: True if the result of the graph similarity is greater than or equal to
@@ -331,11 +395,14 @@
Note:
Default weight_dict:
.. include:: ../graph_default_sem_eq_weights.rst
.. include:: ../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
return graph_equivalence(ds1, ds2, prop_scores, threshold, **weight_dict)
return graph_equivalence(
ds1, ds2, prop_scores, threshold, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)

View File

@@ -2,15 +2,17 @@
import logging
from ..object import (
WEIGHTS, _bucket_per_type, _object_pairs, exact_match,
list_reference_check, object_similarity, partial_string_based,
partial_timestamp_based, reference_check,
WEIGHTS, _bucket_per_type, _object_pairs, object_similarity,
)
logger = logging.getLogger(__name__)
def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
def graph_equivalence(
ds1, ds2, prop_scores={}, threshold=70,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a true/false value if two graphs are semantically equivalent.
Internally, it calls the graph_similarity function and compares it against the given
threshold value.
@@ -23,8 +25,17 @@ def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both graphs equivalent. This
value can be tuned.
weight_dict: A dictionary that can be used to override settings
in the similarity process
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
bool: True if the result of the graph similarity is greater than or equal to
@@ -40,20 +51,26 @@ def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, **weight_dict):
Note:
Default weight_dict:
.. include:: ../../graph_default_sem_eq_weights.rst
.. include:: ../../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
similarity_result = graph_similarity(ds1, ds2, prop_scores, **weight_dict)
similarity_result = graph_similarity(
ds1, ds2, prop_scores, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
if similarity_result >= threshold:
return True
return False
def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
def graph_similarity(
ds1, ds2, prop_scores={}, ignore_spec_version=False,
versioning_checks=False, max_depth=1, **weight_dict
):
"""This method returns a similarity score for two given graphs.
Each DataStore can contain a connected or disconnected graph and the
final result is weighted over the number of objects we managed to compare.
@@ -65,8 +82,17 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
ds2: A DataStore object instance representing your graph
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the similarity process
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity.
@@ -81,7 +107,7 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
Note:
Default weight_dict:
.. include:: ../../graph_default_sem_eq_weights.rst
.. include:: ../../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
@@ -90,13 +116,21 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
"""
results = {}
similarity_score = 0
weights = GRAPH_WEIGHTS.copy()
weights = WEIGHTS.copy()
if weight_dict:
weights.update(weight_dict)
if weights["_internal"]["max_depth"] <= 0:
raise ValueError("weight_dict['_internal']['max_depth'] must be greater than 0")
weights["_internal"] = {
"ignore_spec_version": ignore_spec_version,
"versioning_checks": versioning_checks,
"ds1": ds1,
"ds2": ds2,
"max_depth": max_depth,
}
if max_depth <= 0:
raise ValueError("'max_depth' must be greater than 0")
pairs = _object_pairs(
_bucket_per_type(ds1.query([])),
@@ -104,16 +138,17 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
weights,
)
weights["_internal"]["ds1"] = ds1
weights["_internal"]["ds2"] = ds2
logger.debug("Starting graph similarity process between DataStores: '%s' and '%s'", ds1.id, ds2.id)
for object1, object2 in pairs:
iprop_score = {}
object1_id = object1["id"]
object2_id = object2["id"]
result = object_similarity(object1, object2, iprop_score, **weights)
result = object_similarity(
object1, object2, iprop_score, ds1, ds2,
ignore_spec_version, versioning_checks,
max_depth, **weights
)
if object1_id not in results:
results[object1_id] = {"lhs": object1_id, "rhs": object2_id, "prop_score": iprop_score, "value": result}
@@ -141,40 +176,3 @@ def graph_similarity(ds1, ds2, prop_scores={}, **weight_dict):
similarity_score,
)
return similarity_score
# default weights used for the graph similarity process
GRAPH_WEIGHTS = WEIGHTS.copy()
GRAPH_WEIGHTS.update({
"grouping": {
"name": (20, partial_string_based),
"context": (20, partial_string_based),
"object_refs": (60, list_reference_check),
},
"relationship": {
"relationship_type": (20, exact_match),
"source_ref": (40, reference_check),
"target_ref": (40, reference_check),
},
"report": {
"name": (30, partial_string_based),
"published": (10, partial_timestamp_based),
"object_refs": (60, list_reference_check),
"tdelta": 1, # One day interval
},
"sighting": {
"first_seen": (5, partial_timestamp_based),
"last_seen": (5, partial_timestamp_based),
"sighting_of_ref": (40, reference_check),
"observed_data_refs": (20, list_reference_check),
"where_sighted_refs": (20, list_reference_check),
"summary": (10, exact_match),
},
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"ds1": None,
"ds2": None,
"max_depth": 1,
},
}) # :autodoc-skip:
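Because max_depth is now an explicit parameter, the guard above fires before any pairing work; a sketch of the failure mode (pytest assumed, empty stores for brevity):

import pytest

from stix2 import MemoryStore
from stix2.equivalence.graph import graph_similarity

with pytest.raises(ValueError):  # "'max_depth' must be greater than 0"
    graph_similarity(MemoryStore(), MemoryStore(), {}, max_depth=-1)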

View File

@@ -4,14 +4,18 @@ import itertools
import logging
import time
from ...datastore import Filter
from ...datastore import DataSource, DataStoreMixin, Filter
from ...utils import STIXdatetime, parse_into_datetime
from ..pattern import equivalent_patterns
logger = logging.getLogger(__name__)
def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
def object_equivalence(
obj1, obj2, prop_scores={}, threshold=70, ds1=None,
ds2=None, ignore_spec_version=False,
versioning_checks=False, max_depth=1, **weight_dict
):
"""This method returns a true/false value if two objects are semantically equivalent.
Internally, it calls the object_similarity function and compares it against the given
threshold value.
@@ -24,8 +28,19 @@ def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
threshold: A numerical value between 0 and 100 to determine the minimum
score to result in successfully calling both objects equivalent. This
value can be tuned.
weight_dict: A dictionary that can be used to override settings
in the similarity process
ds1 (optional): A DataStore object instance from which to pull related objects
ds2 (optional): A DataStore object instance from which to pull related objects
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
bool: True if the result of the object similarity is greater than or equal to
@@ -41,20 +56,27 @@ def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, **weight_dict):
Note:
Default weight_dict:
.. include:: ../../object_default_sem_eq_weights.rst
.. include:: ../../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
see `the Committee Note <link here>`__.
"""
similarity_result = object_similarity(obj1, obj2, prop_scores, **weight_dict)
similarity_result = object_similarity(
obj1, obj2, prop_scores, ds1, ds2, ignore_spec_version,
versioning_checks, max_depth, **weight_dict
)
if similarity_result >= threshold:
return True
return False
def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
def object_similarity(
obj1, obj2, prop_scores={}, ds1=None, ds2=None,
ignore_spec_version=False, versioning_checks=False,
max_depth=1, **weight_dict
):
"""This method returns a measure of similarity depending on how
similar the two objects are.
@@ -63,8 +85,19 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
obj2: A stix2 object instance
prop_scores: A dictionary that can hold individual property scores,
weights, contributing score, matching score and sum of weights.
weight_dict: A dictionary that can be used to override settings
in the similarity process
ds1 (optional): A DataStore object instance from which to pull related objects
ds2 (optional): A DataStore object instance from which to pull related objects
ignore_spec_version: A boolean indicating whether to test object types
that belong to different spec versions (STIX 2.0 and STIX 2.1 for example).
If set to True this check will be skipped.
versioning_checks: A boolean indicating whether to test multiple revisions
of the same object (when present) to maximize similarity against a
particular version. If set to True the algorithm will perform this step.
max_depth: A positive integer indicating the maximum recursion depth the
algorithm can reach when de-referencing objects and performing the
object_similarity algorithm.
weight_dict: A dictionary that can be used to override what checks are done
to objects in the similarity process.
Returns:
float: A number between 0.0 and 100.0 as a measurement of similarity.
@@ -79,7 +112,7 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
Note:
Default weight_dict:
.. include:: ../../object_default_sem_eq_weights.rst
.. include:: ../../similarity_weights.rst
Note:
This implementation follows the Semantic Equivalence Committee Note.
@@ -91,8 +124,15 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
if weight_dict:
weights.update(weight_dict)
weights["_internal"] = {
"ignore_spec_version": ignore_spec_version,
"versioning_checks": versioning_checks,
"ds1": ds1,
"ds2": ds2,
"max_depth": max_depth,
}
type1, type2 = obj1["type"], obj2["type"]
ignore_spec_version = weights["_internal"]["ignore_spec_version"]
if type1 != type2:
raise ValueError('The objects to compare must be of the same type!')
@@ -117,6 +157,7 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
if check_property_present(prop, obj1, obj2):
w = weights[type1][prop][0]
comp_funct = weights[type1][prop][1]
prop_scores[prop] = {}
if comp_funct == partial_timestamp_based:
contributing_score = w * comp_funct(obj1[prop], obj2[prop], weights[type1]["tdelta"])
@@ -124,11 +165,18 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
threshold = weights[type1]["threshold"]
contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold)
elif comp_funct == reference_check or comp_funct == list_reference_check:
max_depth = weights["_internal"]["max_depth"]
if max_depth > 0:
weights["_internal"]["max_depth"] = max_depth - 1
ds1, ds2 = weights["_internal"]["ds1"], weights["_internal"]["ds2"]
contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
if _datastore_check(ds1, ds2):
contributing_score = w * comp_funct(obj1[prop], obj2[prop], ds1, ds2, **weights)
elif comp_funct == reference_check:
comp_funct = exact_match
contributing_score = w * comp_funct(obj1[prop], obj2[prop])
elif comp_funct == list_reference_check:
comp_funct = partial_list_based
contributing_score = w * comp_funct(obj1[prop], obj2[prop])
prop_scores[prop]["check_type"] = comp_funct.__name__
else:
continue # prevent excessive recursion
weights["_internal"]["max_depth"] = max_depth
@@ -138,10 +186,8 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
sum_weights += w
matching_score += contributing_score
prop_scores[prop] = {
"weight": w,
"contributing_score": contributing_score,
}
prop_scores[prop]["weight"] = w
prop_scores[prop]["contributing_score"] = contributing_score
logger.debug("'%s' check -- weight: %s, contributing score: %s", prop, w, contributing_score)
prop_scores["matching_score"] = matching_score
@@ -165,7 +211,7 @@ def object_similarity(obj1, obj2, prop_scores={}, **weight_dict):
def check_property_present(prop, obj1, obj2):
"""Helper method checks if a property is present on both objects."""
if prop == "longitude_latitude":
if all(x in obj1 and x in obj2 for x in ['latitude', 'longitude']):
if all(x in obj1 and x in obj2 for x in ('latitude', 'longitude')):
return True
elif prop in obj1 and prop in obj2:
return True
@@ -196,7 +242,9 @@ def partial_timestamp_based(t1, t2, tdelta):
def partial_list_based(l1, l2):
"""Performs a partial list matching via finding the intersection between common values.
"""Performs a partial list matching via finding the intersection between
common values. Repeated values are counted only once. This method can be
used for *_refs equality checks when de-reference is not possible.
Args:
l1: A list of values.
@@ -213,7 +261,8 @@ def partial_list_based(l1, l2):
def exact_match(val1, val2):
"""Performs an exact value match based on two values
"""Performs an exact value match based on two values. This method can be
used for *_ref equality check when de-reference is not possible.
Args:
val1: A value suitable for an equality test.
@@ -261,12 +310,12 @@ def custom_pattern_based(pattern1, pattern2):
return equivalent_patterns(pattern1, pattern2)
def partial_external_reference_based(refs1, refs2):
def partial_external_reference_based(ext_refs1, ext_refs2):
"""Performs a matching on External References.
Args:
refs1: A list of external references.
refs2: A list of external references.
ext_refs1: A list of external references.
ext_refs2: A list of external references.
Returns:
float: Number between 0.0 and 1.0 depending on matches.
@@ -275,51 +324,47 @@ def partial_external_reference_based(refs1, refs2):
allowed = {"veris", "cve", "capec", "mitre-attack"}
matches = 0
if len(refs1) >= len(refs2):
l1 = refs1
l2 = refs2
else:
l1 = refs2
l2 = refs1
ref_pairs = itertools.chain(
itertools.product(ext_refs1, ext_refs2),
)
for ext_ref1 in l1:
for ext_ref2 in l2:
sn_match = False
ei_match = False
url_match = False
source_name = None
for ext_ref1, ext_ref2 in ref_pairs:
sn_match = False
ei_match = False
url_match = False
source_name = None
if check_property_present("source_name", ext_ref1, ext_ref2):
if ext_ref1["source_name"] == ext_ref2["source_name"]:
source_name = ext_ref1["source_name"]
sn_match = True
if check_property_present("external_id", ext_ref1, ext_ref2):
if ext_ref1["external_id"] == ext_ref2["external_id"]:
ei_match = True
if check_property_present("url", ext_ref1, ext_ref2):
if ext_ref1["url"] == ext_ref2["url"]:
url_match = True
if check_property_present("source_name", ext_ref1, ext_ref2):
if ext_ref1["source_name"] == ext_ref2["source_name"]:
source_name = ext_ref1["source_name"]
sn_match = True
if check_property_present("external_id", ext_ref1, ext_ref2):
if ext_ref1["external_id"] == ext_ref2["external_id"]:
ei_match = True
if check_property_present("url", ext_ref1, ext_ref2):
if ext_ref1["url"] == ext_ref2["url"]:
url_match = True
# Special case: if source_name is a STIX-defined name and either
# external_id or url match then it's a perfect match and other entries
# can be ignored.
if sn_match and (ei_match or url_match) and source_name in allowed:
result = 1.0
logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
refs1, refs2, result,
)
return result
# Special case: if source_name is a STIX-defined name and either
# external_id or url match then it's a perfect match and other entries
# can be ignored.
if sn_match and (ei_match or url_match) and source_name in allowed:
result = 1.0
logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
ext_refs1, ext_refs2, result,
)
return result
# Regular check. If the source_name (not STIX-defined) or external_id or
# url matches then we consider the entry a match.
if (sn_match or ei_match or url_match) and source_name not in allowed:
matches += 1
# Regular check. If the source_name (not STIX-defined) or external_id or
# url matches then we consider the entry a match.
if (sn_match or ei_match or url_match) and source_name not in allowed:
matches += 1
result = matches / max(len(refs1), len(refs2))
result = matches / max(len(ext_refs1), len(ext_refs2))
logger.debug(
"--\t\tpartial_external_reference_based '%s' '%s'\tresult: '%s'",
refs1, refs2, result,
ext_refs1, ext_refs2, result,
)
return result
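The flattened pairing logic is easiest to see with a concrete call; a sketch of the STIX-defined special case (plain dicts work because check_property_present only tests membership):

from stix2.equivalence.object import partial_external_reference_based

refs1 = [{"source_name": "cve", "external_id": "CVE-2017-0144"}]
refs2 = [{"source_name": "cve", "external_id": "CVE-2017-0144"}]

# source_name is a STIX-defined name ("cve") and external_id matches,
# so the function short-circuits to a perfect 1.0
assert partial_external_reference_based(refs1, refs2) == 1.0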
@@ -352,17 +397,23 @@ def _versioned_checks(ref1, ref2, ds1, ds2, **weights):
"""Checks multiple object versions if present in graph.
Maximizes for the similarity score of a particular version."""
results = {}
objects1 = ds1.query([Filter("id", "=", ref1)])
objects2 = ds2.query([Filter("id", "=", ref2)])
pairs = _object_pairs(
_bucket_per_type(objects1),
_bucket_per_type(objects2),
_bucket_per_type(ds1.query([Filter("id", "=", ref1)])),
_bucket_per_type(ds2.query([Filter("id", "=", ref2)])),
weights,
)
ignore_spec_version = weights["_internal"]["ignore_spec_version"]
versioning_checks = weights["_internal"]["versioning_checks"]
max_depth = weights["_internal"]["max_depth"]
for object1, object2 in pairs:
result = object_similarity(object1, object2, **weights)
result = object_similarity(
object1, object2, ds1=ds1, ds2=ds2,
ignore_spec_version=ignore_spec_version,
versioning_checks=versioning_checks,
max_depth=max_depth, **weights,
)
if ref1 not in results:
results[ref1] = {"matched": ref2, "value": result}
elif result > results[ref1]["value"]:
@@ -383,12 +434,20 @@ def reference_check(ref1, ref2, ds1, ds2, **weights):
result = 0.0
if type1 == type2 and type1 in weights:
if weights["_internal"]["versioning_checks"]:
ignore_spec_version = weights["_internal"]["ignore_spec_version"]
versioning_checks = weights["_internal"]["versioning_checks"]
max_depth = weights["_internal"]["max_depth"]
if versioning_checks:
result = _versioned_checks(ref1, ref2, ds1, ds2, **weights) / 100.0
else:
o1, o2 = ds1.get(ref1), ds2.get(ref2)
if o1 and o2:
result = object_similarity(o1, o2, **weights) / 100.0
result = object_similarity(
o1, o2, ds1=ds1, ds2=ds2,
ignore_spec_version=ignore_spec_version,
versioning_checks=versioning_checks,
max_depth=max_depth, **weights,
) / 100.0
logger.debug(
"--\t\treference_check '%s' '%s'\tresult: '%s'",
@@ -439,6 +498,15 @@ def list_reference_check(refs1, refs2, ds1, ds2, **weights):
return result
def _datastore_check(ds1, ds2):
if (
issubclass(ds1.__class__, (DataStoreMixin, DataSource)) or
issubclass(ds2.__class__, (DataStoreMixin, DataSource))
):
return True
return False
def _bucket_per_type(graph, mode="type"):
"""Given a list of objects or references, bucket them by type.
Depending on the list type: extract from 'type' property or using
@@ -480,11 +548,20 @@ WEIGHTS = {
"name": (60, partial_string_based),
"external_references": (40, partial_external_reference_based),
},
"grouping": {
"name": (20, partial_string_based),
"context": (20, partial_string_based),
"object_refs": (60, list_reference_check),
},
"identity": {
"name": (60, partial_string_based),
"identity_class": (20, exact_match),
"sectors": (20, partial_list_based),
},
"incident": {
"name": (30, partial_string_based),
"external_references": (70, partial_external_reference_based),
},
"indicator": {
"indicator_types": (15, partial_list_based),
"pattern": (80, custom_pattern_based),
@@ -511,6 +588,25 @@ WEIGHTS = {
"definition": (60, exact_match),
"definition_type": (20, exact_match),
},
"relationship": {
"relationship_type": (20, exact_match),
"source_ref": (40, reference_check),
"target_ref": (40, reference_check),
},
"report": {
"name": (30, partial_string_based),
"published": (10, partial_timestamp_based),
"object_refs": (60, list_reference_check),
"tdelta": 1, # One day interval
},
"sighting": {
"first_seen": (5, partial_timestamp_based),
"last_seen": (5, partial_timestamp_based),
"sighting_of_ref": (40, reference_check),
"observed_data_refs": (20, list_reference_check),
"where_sighted_refs": (20, list_reference_check),
"summary": (10, exact_match),
},
"threat-actor": {
"name": (60, partial_string_based),
"threat_actor_types": (20, partial_list_based),
@@ -524,7 +620,4 @@ WEIGHTS = {
"name": (30, partial_string_based),
"external_references": (70, partial_external_reference_based),
},
"_internal": {
"ignore_spec_version": False,
},
} # :autodoc-skip:
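WEIGHTS stays a default only: per-type checks can still be swapped out through weight_dict, as the updated spec-version test below does. A hedged sketch of such an override (indicator values illustrative):

import stix2
from stix2.equivalence.object import (
    custom_pattern_based, partial_list_based, partial_timestamp_based,
)

weights = {
    "indicator": {
        "indicator_types": (15, partial_list_based),
        "pattern": (80, custom_pattern_based),
        "valid_from": (5, partial_timestamp_based),
        "tdelta": 1,  # one-day interval for the timestamp check
    },
}
ind1 = stix2.v21.Indicator(
    pattern="[ipv4-addr:value = '198.51.100.1']",
    pattern_type="stix",
    valid_from="2021-01-01T00:00:00Z",
)
ind2 = stix2.v20.Indicator(
    labels=["malicious-activity"],
    pattern="[ipv4-addr:value = '198.51.100.1']",
    valid_from="2021-01-01T00:00:00Z",
)
score = stix2.Environment().object_similarity(
    ind1, ind2, ignore_spec_version=True, **weights
)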

View File

@@ -2,6 +2,7 @@
import copy
import datetime as dt
import io
import simplejson as json
@@ -64,6 +65,37 @@ def serialize(obj, pretty=False, include_optional_defaults=False, **kwargs):
Returns:
str: The serialized JSON object.
Note:
The argument ``pretty=True`` will output the STIX object following
spec order. Using this argument greatly impacts object serialization
performance. If your use case is centered on machine-to-machine
operation, it is recommended to set ``pretty=False``.
When ``pretty=True`` the following key-value pairs will be added or
overridden: indent=4, separators=(",", ": "), item_sort_key=sort_by.
"""
with io.StringIO() as fp:
fp_serialize(obj, fp, pretty, include_optional_defaults, **kwargs)
return fp.getvalue()
def fp_serialize(obj, fp, pretty=False, include_optional_defaults=False, **kwargs):
"""
Serialize a STIX object to ``fp`` (a text-stream, file-like object supporting ``.write()``).
Args:
obj: The STIX object to be serialized.
fp: A text stream file-like object supporting ``.write()``.
pretty (bool): If True, output properties following the STIX specs
formatting. This includes indentation. Refer to notes for more
details. (Default: ``False``)
include_optional_defaults (bool): Determines whether to include
optional properties set to the default value defined in the spec.
**kwargs: The arguments for a json.dumps() call.
Returns:
None
Note:
The argument ``pretty=True`` will output the STIX object following
spec order. Using this argument greatly impacts object serialization
@@ -80,9 +112,9 @@ def serialize(obj, pretty=False, include_optional_defaults=False, **kwargs):
kwargs.update({'indent': 4, 'separators': (',', ': '), 'item_sort_key': sort_by})
if include_optional_defaults:
return json.dumps(obj, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)
json.dump(obj, fp, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)
else:
return json.dumps(obj, cls=STIXJSONEncoder, **kwargs)
json.dump(obj, fp, cls=STIXJSONEncoder, **kwargs)
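serialize() is now a thin wrapper that streams through fp_serialize() into an io.StringIO, so both forms produce identical output; a quick round-trip sketch:

import io

import stix2
from stix2.serialization import fp_serialize, serialize

identity = stix2.v21.Identity(name="Example Corp.", identity_class="organization")

with io.StringIO() as buf:
    fp_serialize(identity, buf, pretty=True)
    streamed = buf.getvalue()

assert streamed == serialize(identity, pretty=True)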
def _find(seq, val):

View File

@@ -1,3 +1,4 @@
import io
import json
import pytest
@@ -113,6 +114,27 @@ def test_bundle_id_must_start_with_bundle():
assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'."
def test_create_bundle_fp_serialize_pretty(indicator, malware, relationship):
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])
buffer = io.StringIO()
bundle.fp_serialize(buffer, pretty=True)
assert str(bundle) == EXPECTED_BUNDLE
assert bundle.serialize(pretty=True) == EXPECTED_BUNDLE
assert buffer.getvalue() == EXPECTED_BUNDLE
def test_create_bundle_fp_serialize_nonpretty(indicator, malware, relationship):
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])
buffer = io.StringIO()
bundle.fp_serialize(buffer, sort_keys=True)
assert bundle.serialize(sort_keys=True) == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
assert buffer.getvalue() == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
def test_create_bundle1(indicator, malware, relationship):
bundle = stix2.v20.Bundle(objects=[indicator, malware, relationship])

View File

@@ -4,7 +4,7 @@ from medallion.filters.basic_filter import BasicFilter
import pytest
from requests.models import Response
from taxii2client.common import _filter_kwargs_to_query_params
from taxii2client.v20 import Collection
from taxii2client.v20 import MEDIA_TYPE_STIX_V20, Collection
import stix2
from stix2.datastore import DataSourceError
@@ -34,12 +34,12 @@ class MockTAXIICollectionEndpoint(Collection):
{
"date_added": get_timestamp(),
"id": object["id"],
"media_type": "application/stix+json;version=2.1",
"media_type": "application/stix+json;version=2.0",
"version": object.get("modified", object.get("created", get_timestamp())),
},
)
def get_objects(self, **filter_kwargs):
def get_objects(self, accept=MEDIA_TYPE_STIX_V20, start=0, per_request=0, **filter_kwargs):
self._verify_can_read()
query_params = _filter_kwargs_to_query_params(filter_kwargs)
assert isinstance(query_params, dict)
@@ -51,7 +51,12 @@ class MockTAXIICollectionEndpoint(Collection):
100,
)[0]
if objs:
return stix2.v20.Bundle(objects=objs)
resp = Response()
resp.status_code = 200
resp.headers["Content-Range"] = f"items 0-{len(objs)}/{len(objs)}"
resp.encoding = "utf-8"
resp._content = bytes(stix2.v20.Bundle(objects=objs).serialize(ensure_ascii=False), resp.encoding)
return resp
else:
resp = Response()
resp.status_code = 404

View File

@@ -424,7 +424,7 @@ def test_related_to_by_target(ds):
def test_versioned_checks(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": True,
@@ -437,7 +437,7 @@ def test_versioned_checks(ds, ds2):
def test_semantic_check_with_versioning(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
@@ -467,13 +467,11 @@ def test_semantic_check_with_versioning(ds, ds2):
def test_list_semantic_check(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"ds1": ds,
"ds2": ds2,
"max_depth": 1,
},
})
@@ -504,39 +502,18 @@ def test_list_semantic_check(ds, ds2):
def test_graph_similarity_raises_value_error(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": -1,
},
}
with pytest.raises(ValueError):
prop_scores1 = {}
stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
stix2.Environment().graph_similarity(ds, ds2, prop_scores1, max_depth=-1)
def test_graph_similarity_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights)
env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, ignore_spec_version=True)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights)
env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, ignore_spec_version=True)
assert round(env1) == 25
assert round(prop_scores1["matching_score"]) == 451
@@ -552,41 +529,20 @@ def test_graph_similarity_with_filesystem_source(ds, fs):
def test_graph_similarity_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights)
env = stix2.Environment().graph_similarity(ds, ds, prop_scores)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_similarity_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, versioning_checks=True)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, versioning_checks=True)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
@@ -602,26 +558,12 @@ def test_graph_similarity_with_versioning_check_on(ds2, ds):
def test_graph_similarity_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
@@ -637,26 +579,12 @@ def test_graph_similarity_with_versioning_check_off(ds2, ds):
def test_graph_equivalence_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights)
env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, ignore_spec_version=True)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights)
env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, ignore_spec_version=True)
assert env1 is False
assert round(prop_scores1["matching_score"]) == 451
@@ -672,41 +600,20 @@ def test_graph_equivalence_with_filesystem_source(ds, fs):
def test_graph_equivalence_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights)
env = stix2.Environment().graph_equivalence(ds, ds, prop_scores)
assert env is True
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, versioning_checks=True)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, versioning_checks=True)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
@@ -722,26 +629,12 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds):
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789

View File

@@ -1,3 +1,4 @@
import io
import json
import pytest
@@ -123,6 +124,27 @@ def test_bundle_id_must_start_with_bundle():
assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'."
def test_create_bundle_fp_serialize_pretty(indicator, malware, relationship):
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])
buffer = io.StringIO()
bundle.fp_serialize(buffer, pretty=True)
assert str(bundle) == EXPECTED_BUNDLE
assert bundle.serialize(pretty=True) == EXPECTED_BUNDLE
assert buffer.getvalue() == EXPECTED_BUNDLE
def test_create_bundle_fp_serialize_nonpretty(indicator, malware, relationship):
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])
buffer = io.StringIO()
bundle.fp_serialize(buffer, sort_keys=True)
assert bundle.serialize(sort_keys=True) == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
assert buffer.getvalue() == json.dumps(json.loads(EXPECTED_BUNDLE), sort_keys=True)
def test_create_bundle1(indicator, malware, relationship):
bundle = stix2.v21.Bundle(objects=[indicator, malware, relationship])

View File

@@ -28,14 +28,14 @@ class MockTAXIICollectionEndpoint(Collection):
self._verify_can_write()
if isinstance(bundle, str):
bundle = json.loads(bundle)
for object in bundle.get("objects", []):
self.objects.append(object)
for obj in bundle.get("objects", []):
self.objects.append(obj)
self.manifests.append(
{
"date_added": get_timestamp(),
"id": object["id"],
"id": obj["id"],
"media_type": "application/stix+json;version=2.1",
"version": object.get("modified", object.get("created", get_timestamp())),
"version": obj.get("modified", obj.get("created", get_timestamp())),
},
)
@@ -51,7 +51,10 @@ class MockTAXIICollectionEndpoint(Collection):
100,
)[0]
if objs:
return stix2.v21.Bundle(objects=objs)
return {
"objects": objs,
"more": False,
}
else:
resp = Response()
resp.status_code = 404
@@ -75,7 +78,10 @@ class MockTAXIICollectionEndpoint(Collection):
else:
filtered_objects = []
if filtered_objects:
return stix2.v21.Bundle(objects=filtered_objects)
return {
"objects": filtered_objects,
"more": False,
}
else:
resp = Response()
resp.status_code = 404

View File

@@ -760,16 +760,13 @@ def test_object_similarity_different_spec_version():
"valid_from": (5, stix2.equivalence.object.partial_timestamp_based),
"tdelta": 1, # One day interval
},
"_internal": {
"ignore_spec_version": True, # Disables spec_version check.
},
}
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS)
env = stix2.Environment().object_similarity(ind1, ind2, **weights)
env = stix2.Environment().object_similarity(ind1, ind2, ignore_spec_version=True, **weights)
assert round(env) == 0
env = stix2.Environment().object_similarity(ind2, ind1, **weights)
env = stix2.Environment().object_similarity(ind2, ind1, ignore_spec_version=True, **weights)
assert round(env) == 0
@@ -858,10 +855,12 @@ def test_object_similarity_exact_match():
assert stix2.equivalence.object.exact_match(t1, t2) == 0.0
def test_non_existent_config_for_object():
def test_no_datastore_fallsback_list_based_check_for_refs_check():
r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
assert stix2.Environment().object_similarity(r1, r2) == 0.0
prop_scores = {}
assert stix2.Environment().object_similarity(r1, r2, prop_scores) == 100.0
assert prop_scores["object_refs"]["check_type"] == "partial_list_based"
def custom_semantic_equivalence_method(obj1, obj2, **weights):
@@ -937,7 +936,8 @@ def test_object_similarity_prop_scores_method_provided():
def test_versioned_checks(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
# Testing internal method
weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": True,
@@ -950,7 +950,7 @@ def test_versioned_checks(ds, ds2):
def test_semantic_check_with_versioning(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
@@ -981,7 +981,7 @@ def test_semantic_check_with_versioning(ds, ds2):
def test_list_semantic_check(ds, ds2):
weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
weights = stix2.equivalence.graph.WEIGHTS.copy()
weights.update({
"_internal": {
"ignore_spec_version": False,
@@ -1027,39 +1027,28 @@ def test_list_semantic_check(ds, ds2):
def test_graph_similarity_raises_value_error(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": -1,
},
}
with pytest.raises(ValueError):
prop_scores1 = {}
stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
stix2.Environment().graph_similarity(ds, ds2, prop_scores1, max_depth=-1)
def test_graph_similarity_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(fs, ds, prop_scores1, **weights)
env1 = stix2.Environment().graph_similarity(
fs, ds, prop_scores1,
ignore_spec_version=True,
versioning_checks=False,
max_depth=1,
)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds, fs, prop_scores2, **weights)
env2 = stix2.Environment().graph_similarity(
ds, fs, prop_scores2,
ignore_spec_version=True,
versioning_checks=False,
max_depth=1,
)
assert round(env1) == 23
assert round(prop_scores1["matching_score"]) == 411
@@ -1154,14 +1143,11 @@ def test_depth_limiting():
"some2_ref": (33, stix2.equivalence.object.reference_check),
"name": (34, stix2.equivalence.object.partial_string_based),
},
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.equivalence.graph.graph_similarity(mem_store1, mem_store2, prop_scores1, **custom_weights)
env1 = stix2.equivalence.graph.graph_similarity(
mem_store1, mem_store2, prop_scores1, **custom_weights
)
assert round(env1) == 38
assert round(prop_scores1["matching_score"]) == 300
@@ -1185,44 +1171,23 @@ def test_depth_limiting():
def test_graph_similarity_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graph_similarity(ds, ds, prop_scores, **weights)
env = stix2.Environment().graph_similarity(ds, ds, prop_scores)
assert round(env) == 100
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_similarity_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, versioning_checks=True)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, versioning_checks=True)
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
@@ -1233,29 +1198,15 @@ def test_graph_similarity_with_versioning_check_on(ds2, ds):
def test_graph_similarity_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1, **weights)
env1 = stix2.Environment().graph_similarity(ds, ds2, prop_scores1)
assert round(env1) == 88
assert round(prop_scores1["matching_score"]) == 789
assert round(prop_scores1["len_pairs"]) == 9
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2, **weights)
env2 = stix2.Environment().graph_similarity(ds2, ds, prop_scores2)
assert round(env2) == 88
assert round(prop_scores2["matching_score"]) == 789
assert round(prop_scores2["len_pairs"]) == 9
@@ -1266,26 +1217,12 @@ def test_graph_similarity_with_versioning_check_off(ds2, ds):
def test_graph_equivalence_with_filesystem_source(ds, fs):
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, **weights)
env1 = stix2.Environment().graph_equivalence(fs, ds, prop_scores1, ignore_spec_version=True)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": True,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, **weights)
env2 = stix2.Environment().graph_equivalence(ds, fs, prop_scores2, ignore_spec_version=True)
assert env1 is False
assert round(prop_scores1["matching_score"]) == 411
@@ -1301,41 +1238,20 @@ def test_graph_equivalence_with_filesystem_source(ds, fs):
def test_graph_equivalence_with_duplicate_graph(ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores = {}
env = stix2.Environment().graph_equivalence(ds, ds, prop_scores, **weights)
env = stix2.Environment().graph_equivalence(ds, ds, prop_scores)
assert env is True
assert round(prop_scores["matching_score"]) == 800
assert round(prop_scores["len_pairs"]) == 8
def test_graph_equivalence_with_versioning_check_on(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, versioning_checks=True)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": True,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, versioning_checks=True)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789
@@ -1351,26 +1267,12 @@ def test_graph_equivalence_with_versioning_check_on(ds2, ds):
def test_graph_equivalence_with_versioning_check_off(ds2, ds):
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores1 = {}
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1, **weights)
env1 = stix2.Environment().graph_equivalence(ds, ds2, prop_scores1)
# Switching parameters
weights = {
"_internal": {
"ignore_spec_version": False,
"versioning_checks": False,
"max_depth": 1,
},
}
prop_scores2 = {}
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2, **weights)
env2 = stix2.Environment().graph_equivalence(ds2, ds, prop_scores2)
assert env1 is True
assert round(prop_scores1["matching_score"]) == 789