Moved test/v20/test_workbench.py up one directory level since

it doesn't make sense to have a test per STIX version.  The
workbench only uses the latest supported STIX version.  In
order to make this work, the test suite was modified to
dynamically compute some settings like where to get demo data,
based on the value of stix2.DEFAULT_VERSION.

Switched stix2.DEFAULT_VERSION back to "2.0", since I figure it
should be sync'd up with the 'from .vxx import *' import
statement from the top level package.
master
Michael Chisholm 2019-07-24 15:35:59 -04:00
parent d69449706f
commit 38103ac6c5
2 changed files with 89 additions and 65 deletions

View File

@ -20,7 +20,7 @@
# flake8: noqa # flake8: noqa
DEFAULT_VERSION = '2.1' # Default version will always be the latest STIX 2.X version DEFAULT_VERSION = '2.0' # Default version will always be the latest STIX 2.X version
from .confidence import scales from .confidence import scales
from .core import _collect_stix2_mappings, parse, parse_observable from .core import _collect_stix2_mappings, parse, parse_observable

View File

@ -1,3 +1,4 @@
import importlib
import os import os
import stix2 import stix2
@ -12,26 +13,32 @@ from stix2.workbench import (
set_default_object_marking_refs, threat_actors, tools, vulnerabilities, set_default_object_marking_refs, threat_actors, tools, vulnerabilities,
) )
from .constants import (
ATTACK_PATTERN_ID, ATTACK_PATTERN_KWARGS, CAMPAIGN_ID, CAMPAIGN_KWARGS, # Auto-detect some settings based on the current default STIX version
COURSE_OF_ACTION_ID, COURSE_OF_ACTION_KWARGS, IDENTITY_ID, IDENTITY_KWARGS, _STIX_VID = "v" + stix2.DEFAULT_VERSION.replace(".", "")
INDICATOR_ID, INDICATOR_KWARGS, INTRUSION_SET_ID, INTRUSION_SET_KWARGS, _STIX_DATA_PATH = os.path.join(
MALWARE_ID, MALWARE_KWARGS, OBSERVED_DATA_ID, OBSERVED_DATA_KWARGS, os.path.dirname(os.path.realpath(__file__)),
REPORT_ID, REPORT_KWARGS, THREAT_ACTOR_ID, THREAT_ACTOR_KWARGS, TOOL_ID, _STIX_VID,
TOOL_KWARGS, VULNERABILITY_ID, VULNERABILITY_KWARGS, "stix2_data"
) )
_STIX_CONSTANTS_MODULE = "stix2.test." + _STIX_VID + ".constants"
constants = importlib.import_module(_STIX_CONSTANTS_MODULE)
def test_workbench_environment(): def test_workbench_environment():
# Create a STIX object # Create a STIX object
ind = create(Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS) ind = create(
Indicator, id=constants.INDICATOR_ID, **constants.INDICATOR_KWARGS
)
save(ind) save(ind)
resp = get(INDICATOR_ID) resp = get(constants.INDICATOR_ID)
assert resp['labels'][0] == 'malicious-activity' assert resp['labels'][0] == 'malicious-activity'
resp = all_versions(INDICATOR_ID) resp = all_versions(constants.INDICATOR_ID)
assert len(resp) == 1 assert len(resp) == 1
# Search on something other than id # Search on something other than id
@ -41,176 +48,193 @@ def test_workbench_environment():
def test_workbench_get_all_attack_patterns(): def test_workbench_get_all_attack_patterns():
mal = AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS) mal = AttackPattern(
id=constants.ATTACK_PATTERN_ID, **constants.ATTACK_PATTERN_KWARGS
)
save(mal) save(mal)
resp = attack_patterns() resp = attack_patterns()
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].id == ATTACK_PATTERN_ID assert resp[0].id == constants.ATTACK_PATTERN_ID
def test_workbench_get_all_campaigns(): def test_workbench_get_all_campaigns():
cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS) cam = Campaign(id=constants.CAMPAIGN_ID, **constants.CAMPAIGN_KWARGS)
save(cam) save(cam)
resp = campaigns() resp = campaigns()
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].id == CAMPAIGN_ID assert resp[0].id == constants.CAMPAIGN_ID
def test_workbench_get_all_courses_of_action(): def test_workbench_get_all_courses_of_action():
coa = CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS) coa = CourseOfAction(
id=constants.COURSE_OF_ACTION_ID, **constants.COURSE_OF_ACTION_KWARGS
)
save(coa) save(coa)
resp = courses_of_action() resp = courses_of_action()
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].id == COURSE_OF_ACTION_ID assert resp[0].id == constants.COURSE_OF_ACTION_ID
def test_workbench_get_all_identities(): def test_workbench_get_all_identities():
idty = Identity(id=IDENTITY_ID, **IDENTITY_KWARGS) idty = Identity(id=constants.IDENTITY_ID, **constants.IDENTITY_KWARGS)
save(idty) save(idty)
resp = identities() resp = identities()
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].id == IDENTITY_ID assert resp[0].id == constants.IDENTITY_ID
def test_workbench_get_all_indicators(): def test_workbench_get_all_indicators():
resp = indicators() resp = indicators()
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].id == INDICATOR_ID assert resp[0].id == constants.INDICATOR_ID
def test_workbench_get_all_intrusion_sets(): def test_workbench_get_all_intrusion_sets():
ins = IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS) ins = IntrusionSet(
id=constants.INTRUSION_SET_ID, **constants.INTRUSION_SET_KWARGS
)
save(ins) save(ins)
resp = intrusion_sets() resp = intrusion_sets()
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].id == INTRUSION_SET_ID assert resp[0].id == constants.INTRUSION_SET_ID
def test_workbench_get_all_malware(): def test_workbench_get_all_malware():
mal = Malware(id=MALWARE_ID, **MALWARE_KWARGS) mal = Malware(id=constants.MALWARE_ID, **constants.MALWARE_KWARGS)
save(mal) save(mal)
resp = malware() resp = malware()
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].id == MALWARE_ID assert resp[0].id == constants.MALWARE_ID
def test_workbench_get_all_observed_data(): def test_workbench_get_all_observed_data():
od = ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS) od = ObservedData(
id=constants.OBSERVED_DATA_ID, **constants.OBSERVED_DATA_KWARGS
)
save(od) save(od)
resp = observed_data() resp = observed_data()
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].id == OBSERVED_DATA_ID assert resp[0].id == constants.OBSERVED_DATA_ID
def test_workbench_get_all_reports(): def test_workbench_get_all_reports():
rep = Report(id=REPORT_ID, **REPORT_KWARGS) rep = Report(id=constants.REPORT_ID, **constants.REPORT_KWARGS)
save(rep) save(rep)
resp = reports() resp = reports()
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].id == REPORT_ID assert resp[0].id == constants.REPORT_ID
def test_workbench_get_all_threat_actors(): def test_workbench_get_all_threat_actors():
thr = ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS) thr = ThreatActor(
id=constants.THREAT_ACTOR_ID, **constants.THREAT_ACTOR_KWARGS
)
save(thr) save(thr)
resp = threat_actors() resp = threat_actors()
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].id == THREAT_ACTOR_ID assert resp[0].id == constants.THREAT_ACTOR_ID
def test_workbench_get_all_tools(): def test_workbench_get_all_tools():
tool = Tool(id=TOOL_ID, **TOOL_KWARGS) tool = Tool(id=constants.TOOL_ID, **constants.TOOL_KWARGS)
save(tool) save(tool)
resp = tools() resp = tools()
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].id == TOOL_ID assert resp[0].id == constants.TOOL_ID
def test_workbench_get_all_vulnerabilities(): def test_workbench_get_all_vulnerabilities():
vuln = Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS) vuln = Vulnerability(
id=constants.VULNERABILITY_ID, **constants.VULNERABILITY_KWARGS
)
save(vuln) save(vuln)
resp = vulnerabilities() resp = vulnerabilities()
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].id == VULNERABILITY_ID assert resp[0].id == constants.VULNERABILITY_ID
def test_workbench_add_to_bundle(): def test_workbench_add_to_bundle():
vuln = Vulnerability(**VULNERABILITY_KWARGS) vuln = Vulnerability(**constants.VULNERABILITY_KWARGS)
bundle = stix2.v20.Bundle(vuln) bundle = stix2.v20.Bundle(vuln)
assert bundle.objects[0].name == 'Heartbleed' assert bundle.objects[0].name == 'Heartbleed'
def test_workbench_relationships(): def test_workbench_relationships():
rel = Relationship(INDICATOR_ID, 'indicates', MALWARE_ID) rel = Relationship(
constants.INDICATOR_ID, 'indicates', constants.MALWARE_ID
)
save(rel) save(rel)
ind = get(INDICATOR_ID) ind = get(constants.INDICATOR_ID)
resp = ind.relationships() resp = ind.relationships()
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].relationship_type == 'indicates' assert resp[0].relationship_type == 'indicates'
assert resp[0].source_ref == INDICATOR_ID assert resp[0].source_ref == constants.INDICATOR_ID
assert resp[0].target_ref == MALWARE_ID assert resp[0].target_ref == constants.MALWARE_ID
def test_workbench_created_by(): def test_workbench_created_by():
intset = IntrusionSet(name="Breach 123", created_by_ref=IDENTITY_ID) intset = IntrusionSet(
name="Breach 123", created_by_ref=constants.IDENTITY_ID
)
save(intset) save(intset)
creator = intset.created_by() creator = intset.created_by()
assert creator.id == IDENTITY_ID assert creator.id == constants.IDENTITY_ID
def test_workbench_related(): def test_workbench_related():
rel1 = Relationship(MALWARE_ID, 'targets', IDENTITY_ID) rel1 = Relationship(constants.MALWARE_ID, 'targets', constants.IDENTITY_ID)
rel2 = Relationship(CAMPAIGN_ID, 'uses', MALWARE_ID) rel2 = Relationship(constants.CAMPAIGN_ID, 'uses', constants.MALWARE_ID)
save([rel1, rel2]) save([rel1, rel2])
resp = get(MALWARE_ID).related() resp = get(constants.MALWARE_ID).related()
assert len(resp) == 3 assert len(resp) == 3
assert any(x['id'] == CAMPAIGN_ID for x in resp) assert any(x['id'] == constants.CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp) assert any(x['id'] == constants.INDICATOR_ID for x in resp)
assert any(x['id'] == IDENTITY_ID for x in resp) assert any(x['id'] == constants.IDENTITY_ID for x in resp)
resp = get(MALWARE_ID).related(relationship_type='indicates') resp = get(constants.MALWARE_ID).related(relationship_type='indicates')
assert len(resp) == 1 assert len(resp) == 1
def test_workbench_related_with_filters(): def test_workbench_related_with_filters():
malware = Malware(labels=["ransomware"], name="CryptorBit", created_by_ref=IDENTITY_ID) malware = Malware(
rel = Relationship(malware.id, 'variant-of', MALWARE_ID) labels=["ransomware"], name="CryptorBit", created_by_ref=constants.IDENTITY_ID
)
rel = Relationship(malware.id, 'variant-of', constants.MALWARE_ID)
save([malware, rel]) save([malware, rel])
filters = [Filter('created_by_ref', '=', IDENTITY_ID)] filters = [Filter('created_by_ref', '=', constants.IDENTITY_ID)]
resp = get(MALWARE_ID).related(filters=filters) resp = get(constants.MALWARE_ID).related(filters=filters)
assert len(resp) == 1 assert len(resp) == 1
assert resp[0].name == malware.name assert resp[0].name == malware.name
assert resp[0].created_by_ref == IDENTITY_ID assert resp[0].created_by_ref == constants.IDENTITY_ID
# filters arg can also be single filter # filters arg can also be single filter
resp = get(MALWARE_ID).related(filters=filters[0]) resp = get(constants.MALWARE_ID).related(filters=filters[0])
assert len(resp) == 1 assert len(resp) == 1
def test_add_data_source(): def test_add_data_source():
fs_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data") fs = FileSystemSource(_STIX_DATA_PATH)
fs = FileSystemSource(fs_path)
add_data_source(fs) add_data_source(fs)
resp = tools() resp = tools()
assert len(resp) == 3 assert len(resp) == 3
resp_ids = [tool.id for tool in resp] resp_ids = [tool.id for tool in resp]
assert TOOL_ID in resp_ids assert constants.TOOL_ID in resp_ids
assert 'tool--03342581-f790-4f03-ba41-e82e67392e23' in resp_ids assert 'tool--03342581-f790-4f03-ba41-e82e67392e23' in resp_ids
assert 'tool--242f3da3-4425-4d11-8f5c-b842886da966' in resp_ids assert 'tool--242f3da3-4425-4d11-8f5c-b842886da966' in resp_ids
@ -229,19 +253,19 @@ def test_additional_filters_list():
def test_default_creator(): def test_default_creator():
set_default_creator(IDENTITY_ID) set_default_creator(constants.IDENTITY_ID)
campaign = Campaign(**CAMPAIGN_KWARGS) campaign = Campaign(**constants.CAMPAIGN_KWARGS)
assert 'created_by_ref' not in CAMPAIGN_KWARGS assert 'created_by_ref' not in constants.CAMPAIGN_KWARGS
assert campaign.created_by_ref == IDENTITY_ID assert campaign.created_by_ref == constants.IDENTITY_ID
def test_default_created_timestamp(): def test_default_created_timestamp():
timestamp = "2018-03-19T01:02:03.000Z" timestamp = "2018-03-19T01:02:03.000Z"
set_default_created(timestamp) set_default_created(timestamp)
campaign = Campaign(**CAMPAIGN_KWARGS) campaign = Campaign(**constants.CAMPAIGN_KWARGS)
assert 'created' not in CAMPAIGN_KWARGS assert 'created' not in constants.CAMPAIGN_KWARGS
assert stix2.utils.format_datetime(campaign.created) == timestamp assert stix2.utils.format_datetime(campaign.created) == timestamp
assert stix2.utils.format_datetime(campaign.modified) == timestamp assert stix2.utils.format_datetime(campaign.modified) == timestamp
@ -252,7 +276,7 @@ def test_default_external_refs():
description="Threat report", description="Threat report",
) )
set_default_external_refs(ext_ref) set_default_external_refs(ext_ref)
campaign = Campaign(**CAMPAIGN_KWARGS) campaign = Campaign(**constants.CAMPAIGN_KWARGS)
assert campaign.external_references[0].source_name == "ACME Threat Intel" assert campaign.external_references[0].source_name == "ACME Threat Intel"
assert campaign.external_references[0].description == "Threat report" assert campaign.external_references[0].description == "Threat report"
@ -265,7 +289,7 @@ def test_default_object_marking_refs():
definition=stmt_marking, definition=stmt_marking,
) )
set_default_object_marking_refs(mark_def) set_default_object_marking_refs(mark_def)
campaign = Campaign(**CAMPAIGN_KWARGS) campaign = Campaign(**constants.CAMPAIGN_KWARGS)
assert campaign.object_marking_refs[0] == mark_def.id assert campaign.object_marking_refs[0] == mark_def.id