Graph Equivalence (#449)
* new packages for graph and object-based semantic equivalence
* new method graphically_equivalent for Environment, move equivalence methods out
* object equivalence function, methods used for object-based moved here.
* new graph_equivalence methods
* add notes
* add support for versioning checks (default disabled)
* new tests to cover graph equivalence and new methods
* added more imports to environment.py to prevent breaking changes
* variable changes, new fields for checks, reset depth check per call
* flexibility when object is not available on graph.
* refactor debug logging message
* new file stix2.equivalence.graph_equivalence.rst and stix2.equivalence.object_equivalence.rst for docs
* API documentation for new modules
* additional text required to build docs
* add more test methods for list_semantic_check and graphically_equivalent/versioning
* add logging debug messages, code clean-up
* include individual scoring on results dict, fix issue on list_semantic_check not keeping highest score
* include results as summary in prop_scores, minor tweaks
* Update __init__.py
docstrings update
* apply feedback from pull request
- rename semantic_check to reference_check
- rename modules to graph and object respectively to eliminate redundancy
- remove created_by_ref and object_marking_refs from graph WEIGHTS and rebalance
* update docs/ entries
* add more checks, make max score based on actual objects checked instead of the full list, only create entry when type is present in WEIGHTS dictionary
update tests to reflect changes
* rename package patterns -> pattern
* documentation, moving weights around
* more documentation moving
* rename WEIGHTS variable for graph_equivalence
2020-10-16 17:35:26 +02:00
import os

import pytest

import stix2
import stix2.equivalence.graph
import stix2.equivalence.object

from .constants import (
    CAMPAIGN_ID, CAMPAIGN_KWARGS, FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS,
    INDICATOR_ID, INDICATOR_KWARGS, MALWARE_ID, MALWARE_KWARGS,
    RELATIONSHIP_IDS,
)

FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")


@pytest.fixture
def ds():
    cam = stix2.v20.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
    idy = stix2.v20.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
    ind = stix2.v20.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
    mal = stix2.v20.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
    rel1 = stix2.v20.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
    rel2 = stix2.v20.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
    rel3 = stix2.v20.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
    reprt = stix2.v20.Report(
        name="Malware Report",
        published="2021-05-09T08:22:22Z",
        labels=["campaign"],
        object_refs=[mal.id, rel1.id, ind.id],
    )
    stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3, reprt]
    yield stix2.MemoryStore(stix_objs)


@pytest.fixture
def ds2():
    cam = stix2.v20.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
    idy = stix2.v20.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
    ind = stix2.v20.Indicator(id=INDICATOR_ID, created_by_ref=idy.id, **INDICATOR_KWARGS)
    indv2 = ind.new_version(external_references=[{
        "source_name": "unknown",
        "url": "https://examplewebsite.com/",
    }])
    mal = stix2.v20.Malware(id=MALWARE_ID, created_by_ref=idy.id, **MALWARE_KWARGS)
    malv2 = mal.new_version(external_references=[{
        "source_name": "unknown",
        "url": "https://examplewebsite2.com/",
    }])
    rel1 = stix2.v20.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
    rel2 = stix2.v20.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
    rel3 = stix2.v20.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
    stix_objs = [cam, idy, ind, indv2, mal, malv2, rel1, rel2, rel3]
    reprt = stix2.v20.Report(
        created_by_ref=idy.id,
        name="example",
        labels=["campaign"],
        published="2021-04-09T08:22:22Z",
        object_refs=stix_objs,
    )
    stix_objs.append(reprt)
    yield stix2.MemoryStore(stix_objs)


def test_object_factory_created_by_ref_str():
    factory = stix2.ObjectFactory(created_by_ref=IDENTITY_ID)
    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    assert ind.created_by_ref == IDENTITY_ID


def test_object_factory_created_by_ref_obj():
    id_obj = stix2.v20.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
    factory = stix2.ObjectFactory(created_by_ref=id_obj)
    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    assert ind.created_by_ref == IDENTITY_ID


def test_object_factory_override_default():
    factory = stix2.ObjectFactory(created_by_ref=IDENTITY_ID)
    new_id = "identity--983b3172-44fe-4a80-8091-eb8098841fe8"
    ind = factory.create(stix2.v20.Indicator, created_by_ref=new_id, **INDICATOR_KWARGS)
    assert ind.created_by_ref == new_id


def test_object_factory_created():
    factory = stix2.ObjectFactory(created=FAKE_TIME)
    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    assert ind.created == FAKE_TIME
    assert ind.modified == FAKE_TIME


def test_object_factory_external_reference():
    ext_ref = stix2.v20.ExternalReference(
        source_name="ACME Threat Intel",
        description="Threat report",
    )
    factory = stix2.ObjectFactory(external_references=ext_ref)
    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    assert ind.external_references[0].source_name == "ACME Threat Intel"
    assert ind.external_references[0].description == "Threat report"

    ind2 = factory.create(stix2.v20.Indicator, external_references=None, **INDICATOR_KWARGS)
    assert 'external_references' not in ind2


def test_object_factory_obj_markings():
    stmt_marking = stix2.v20.StatementMarking("Copyright 2016, Example Corp")
    mark_def = stix2.v20.MarkingDefinition(
        definition_type="statement",
        definition=stmt_marking,
    )
    factory = stix2.ObjectFactory(object_marking_refs=[mark_def, stix2.v20.TLP_AMBER])
    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    assert mark_def.id in ind.object_marking_refs
    assert stix2.v20.TLP_AMBER.id in ind.object_marking_refs

    factory = stix2.ObjectFactory(object_marking_refs=stix2.v20.TLP_RED)
    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    assert stix2.v20.TLP_RED.id in ind.object_marking_refs


def test_object_factory_list_append():
    ext_ref = stix2.v20.ExternalReference(
        source_name="ACME Threat Intel",
        description="Threat report from ACME",
    )
    ext_ref2 = stix2.v20.ExternalReference(
        source_name="Yet Another Threat Report",
        description="Threat report from YATR",
    )
    ext_ref3 = stix2.v20.ExternalReference(
        source_name="Threat Report #3",
        description="One more threat report",
    )
    factory = stix2.ObjectFactory(external_references=ext_ref)
    ind = factory.create(stix2.v20.Indicator, external_references=ext_ref2, **INDICATOR_KWARGS)
    assert ind.external_references[1].source_name == "Yet Another Threat Report"

    ind = factory.create(stix2.v20.Indicator, external_references=[ext_ref2, ext_ref3], **INDICATOR_KWARGS)
    assert ind.external_references[2].source_name == "Threat Report #3"


def test_object_factory_list_replace():
    ext_ref = stix2.v20.ExternalReference(
        source_name="ACME Threat Intel",
        description="Threat report from ACME",
    )
    ext_ref2 = stix2.v20.ExternalReference(
        source_name="Yet Another Threat Report",
        description="Threat report from YATR",
    )
    factory = stix2.ObjectFactory(external_references=ext_ref, list_append=False)
    ind = factory.create(stix2.v20.Indicator, external_references=ext_ref2, **INDICATOR_KWARGS)
    assert len(ind.external_references) == 1
    assert ind.external_references[0].source_name == "Yet Another Threat Report"


def test_environment_functions():
    env = stix2.Environment(
        stix2.ObjectFactory(created_by_ref=IDENTITY_ID),
        stix2.MemoryStore(),
    )

    # Create a STIX object
    ind = env.create(stix2.v20.Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS)
    assert ind.created_by_ref == IDENTITY_ID

    # Add objects to datastore
    ind2 = ind.new_version(labels=['benign'])
    env.add([ind, ind2])

    # Get both versions of the object
    resp = env.all_versions(INDICATOR_ID)
    assert len(resp) == 2

    # Get just the most recent version of the object
    resp = env.get(INDICATOR_ID)
    assert resp['labels'][0] == 'benign'

    # Search on something other than id
    query = [stix2.Filter('type', '=', 'vulnerability')]
    resp = env.query(query)
    assert len(resp) == 0

    # See different results after adding filters to the environment
    env.add_filters([
        stix2.Filter('type', '=', 'indicator'),
        stix2.Filter('created_by_ref', '=', IDENTITY_ID),
    ])
    env.add_filter(stix2.Filter('labels', '=', 'benign'))  # should be 'malicious-activity'
    resp = env.get(INDICATOR_ID)
    assert resp['labels'][0] == 'benign'  # should be 'malicious-activity'


def test_environment_source_and_sink():
    ind = stix2.v20.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
    env = stix2.Environment(source=stix2.MemorySource([ind]), sink=stix2.MemorySink([ind]))
    assert env.get(INDICATOR_ID).labels[0] == 'malicious-activity'


def test_environment_datastore_and_sink():
    with pytest.raises(ValueError) as excinfo:
        stix2.Environment(
            factory=stix2.ObjectFactory(),
            store=stix2.MemoryStore(), sink=stix2.MemorySink,
        )
    assert 'Data store already provided' in str(excinfo.value)


def test_environment_no_datastore():
    env = stix2.Environment(factory=stix2.ObjectFactory())

    with pytest.raises(AttributeError) as excinfo:
        env.add(stix2.v20.Indicator(**INDICATOR_KWARGS))
    assert 'Environment has no data sink to put objects in' in str(excinfo.value)

    with pytest.raises(AttributeError) as excinfo:
        env.get(INDICATOR_ID)
    assert 'Environment has no data source' in str(excinfo.value)

    with pytest.raises(AttributeError) as excinfo:
        env.all_versions(INDICATOR_ID)
    assert 'Environment has no data source' in str(excinfo.value)

    with pytest.raises(AttributeError) as excinfo:
        env.query(INDICATOR_ID)
    assert 'Environment has no data source' in str(excinfo.value)

    with pytest.raises(AttributeError) as excinfo:
        env.relationships(INDICATOR_ID)
    assert 'Environment has no data source' in str(excinfo.value)

    with pytest.raises(AttributeError) as excinfo:
        env.related_to(INDICATOR_ID)
    assert 'Environment has no data source' in str(excinfo.value)


def test_environment_add_filters():
    env = stix2.Environment(factory=stix2.ObjectFactory())
    env.add_filters([INDICATOR_ID])
    env.add_filter(INDICATOR_ID)


def test_environment_datastore_and_no_object_factory():
    # Uses a default object factory
    env = stix2.Environment(store=stix2.MemoryStore())
    ind = env.create(stix2.v20.Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS)
    assert ind.id == INDICATOR_ID


def test_parse_malware():
    env = stix2.Environment()
    data = """{
        "type": "malware",
        "id": "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
        "created": "2017-01-01T12:34:56.000Z",
        "modified": "2017-01-01T12:34:56.000Z",
        "name": "Cryptolocker",
        "labels": [
            "ransomware"
        ]
    }"""
    mal = env.parse(data, version="2.0")

    assert mal.type == 'malware'
    assert mal.id == MALWARE_ID
    assert mal.created == FAKE_TIME
    assert mal.modified == FAKE_TIME
    assert mal.labels == ['ransomware']
    assert mal.name == "Cryptolocker"


def test_creator_of():
    identity = stix2.v20.Identity(**IDENTITY_KWARGS)
    factory = stix2.ObjectFactory(created_by_ref=identity.id)
    env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
    env.add(identity)

    ind = env.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    creator = env.creator_of(ind)
    assert creator is identity


def test_creator_of_no_datasource():
    identity = stix2.v20.Identity(**IDENTITY_KWARGS)
    factory = stix2.ObjectFactory(created_by_ref=identity.id)
    env = stix2.Environment(factory=factory)

    ind = env.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    with pytest.raises(AttributeError) as excinfo:
        env.creator_of(ind)
    assert 'Environment has no data source' in str(excinfo.value)


def test_creator_of_not_found():
    identity = stix2.v20.Identity(**IDENTITY_KWARGS)
    factory = stix2.ObjectFactory(created_by_ref=identity.id)
    env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)

    ind = env.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    creator = env.creator_of(ind)
    assert creator is None


def test_creator_of_no_created_by_ref():
    env = stix2.Environment(store=stix2.MemoryStore())
    ind = env.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    creator = env.creator_of(ind)
    assert creator is None


def test_relationships(ds):
    env = stix2.Environment(store=ds)
    mal = env.get(MALWARE_ID)
    resp = env.relationships(mal)

    assert len(resp) == 3
    assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
    assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
    assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)


def test_relationships_no_id(ds):
    env = stix2.Environment(store=ds)
    mal = {
        "type": "malware",
        "name": "some variant",
    }
    with pytest.raises(ValueError) as excinfo:
        env.relationships(mal)
    assert "object has no 'id' property" in str(excinfo.value)


def test_relationships_by_type(ds):
    env = stix2.Environment(store=ds)
    mal = env.get(MALWARE_ID)
    resp = env.relationships(mal, relationship_type='indicates')

    assert len(resp) == 1
    assert resp[0]['id'] == RELATIONSHIP_IDS[0]


def test_relationships_by_source(ds):
    env = stix2.Environment(store=ds)
    resp = env.relationships(MALWARE_ID, source_only=True)

    assert len(resp) == 1
    assert resp[0]['id'] == RELATIONSHIP_IDS[1]


def test_relationships_by_target(ds):
    env = stix2.Environment(store=ds)
    resp = env.relationships(MALWARE_ID, target_only=True)

    assert len(resp) == 2
    assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
    assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)


def test_relationships_by_target_and_type(ds):
    env = stix2.Environment(store=ds)
    resp = env.relationships(MALWARE_ID, relationship_type='uses', target_only=True)

    assert len(resp) == 1
    assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)


def test_relationships_by_target_and_source(ds):
    env = stix2.Environment(store=ds)
    with pytest.raises(ValueError) as excinfo:
        env.relationships(MALWARE_ID, target_only=True, source_only=True)

    assert 'not both' in str(excinfo.value)


def test_related_to(ds):
    env = stix2.Environment(store=ds)
    mal = env.get(MALWARE_ID)
    resp = env.related_to(mal)

    assert len(resp) == 3
    assert any(x['id'] == CAMPAIGN_ID for x in resp)
    assert any(x['id'] == INDICATOR_ID for x in resp)
    assert any(x['id'] == IDENTITY_ID for x in resp)


def test_related_to_no_id(ds):
    env = stix2.Environment(store=ds)
    mal = {
        "type": "malware",
        "name": "some variant",
    }
    with pytest.raises(ValueError) as excinfo:
        env.related_to(mal)
    assert "object has no 'id' property" in str(excinfo.value)


def test_related_to_by_source(ds):
    env = stix2.Environment(store=ds)
    resp = env.related_to(MALWARE_ID, source_only=True)

    assert len(resp) == 1
    assert resp[0]['id'] == IDENTITY_ID


def test_related_to_by_target(ds):
    env = stix2.Environment(store=ds)
    resp = env.related_to(MALWARE_ID, target_only=True)

    assert len(resp) == 2
    assert any(x['id'] == CAMPAIGN_ID for x in resp)
    assert any(x['id'] == INDICATOR_ID for x in resp)


def test_versioned_checks(ds, ds2):
    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
    weights.update({
        "_internal": {
            "ignore_spec_version": True,
            "versioning_checks": True,
            "max_depth": 1,
        },
    })
    score = stix2.equivalence.object._versioned_checks(INDICATOR_ID, INDICATOR_ID, ds, ds2, **weights)
    assert round(score) == 100


def test_semantic_check_with_versioning(ds, ds2):
    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
    weights.update({
        "_internal": {
            "ignore_spec_version": False,
            "versioning_checks": True,
            "ds1": ds,
            "ds2": ds2,
            "max_depth": 1,
        },
    })
    ind = stix2.v20.Indicator(
        **dict(
            labels=["malicious-activity"],
            pattern="[file:hashes.'SHA-256' = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855']",
            valid_from="2017-01-01T12:34:56Z",
            external_references=[
                {
                    "source_name": "unknown",
                    "url": "https://examplewebsite2.com/",
                },
            ],
            object_marking_refs=[stix2.v20.TLP_WHITE],
        )
    )
    ds.add(ind)
    score = stix2.equivalence.object.reference_check(ind.id, INDICATOR_ID, ds, ds2, **weights)
    assert round(score) == 0  # Since pattern is different score is really low


def test_list_semantic_check(ds, ds2):
    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
    weights.update({
        "_internal": {
            "ignore_spec_version": False,
            "versioning_checks": False,
            "ds1": ds,
            "ds2": ds2,
            "max_depth": 1,
        },
    })
    object_refs1 = [
        "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
        "relationship--06520621-5352-4e6a-b976-e8fa3d437ffd",
        "indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7",
    ]
    object_refs2 = [
        "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
        "identity--311b2d2d-f010-4473-83ec-1edf84858f4c",
        "indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7",
        "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
        "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
        "relationship--06520621-5352-4e6a-b976-e8fa3d437ffd",
        "relationship--181c9c09-43e6-45dd-9374-3bec192f05ef",
        "relationship--a0cbb21c-8daf-4a7f-96aa-7155a4ef8f70",
    ]

    score = stix2.equivalence.object.list_reference_check(
        object_refs1,
        object_refs2,
        ds,
        ds2,
        **weights,
    )
    assert round(score) == 1


def test_graph_equivalence_with_filesystem_source(ds):
    weights = {
        "_internal": {
            "ignore_spec_version": True,
            "versioning_checks": False,
            "max_depth": 1,
        },
    }
    prop_scores = {}
    fs = stix2.FileSystemSource(FS_PATH)
    env = stix2.Environment().graphically_equivalent(fs, ds, prop_scores, **weights)
    assert round(env) == 28
    assert round(prop_scores["matching_score"]) == 139
    assert round(prop_scores["sum_weights"]) == 500


def test_graph_equivalence_with_duplicate_graph(ds):
    weights = {
        "_internal": {
            "ignore_spec_version": False,
            "versioning_checks": False,
            "max_depth": 1,
        },
    }
    prop_scores = {}
    env = stix2.Environment().graphically_equivalent(ds, ds, prop_scores, **weights)
    assert round(env) == 100
    assert round(prop_scores["matching_score"]) == 800
    assert round(prop_scores["sum_weights"]) == 800


def test_graph_equivalence_with_versioning_check_on(ds2, ds):
    weights = {
        "_internal": {
            "ignore_spec_version": False,
            "versioning_checks": True,
            "max_depth": 1,
        },
    }
    prop_scores = {}
    env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
    assert round(env) == 93
    assert round(prop_scores["matching_score"]) == 745
    assert round(prop_scores["sum_weights"]) == 800


def test_graph_equivalence_with_versioning_check_off(ds2, ds):
    weights = {
        "_internal": {
            "ignore_spec_version": False,
            "versioning_checks": False,
            "max_depth": 1,
        },
    }
    prop_scores = {}
    env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
    assert round(env) == 93
    assert round(prop_scores["matching_score"]) == 745
    assert round(prop_scores["sum_weights"]) == 800