cti-python-stix2/stix2/test/test_workbench.py

352 lines
10 KiB
Python

import importlib
import os
import stix2
from stix2.workbench import (
_STIX_VID, AttackPattern, Bundle, Campaign, CourseOfAction,
ExternalReference, File, FileSystemSource, Filter, Identity, Indicator,
IntrusionSet, Malware, MarkingDefinition, NTFSExt, ObservedData,
Relationship, Report, StatementMarking, ThreatActor, Tool, Vulnerability,
add_data_source, all_versions, attack_patterns, campaigns,
courses_of_action, create, get, identities, indicators, intrusion_sets,
malware, observed_data, query, reports, save, set_default_created,
set_default_creator, set_default_external_refs,
set_default_object_marking_refs, threat_actors, tools, vulnerabilities,
)
# Auto-detect some settings based on the current default STIX version
_STIX_DATA_PATH = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
_STIX_VID,
"stix2_data",
)
_STIX_CONSTANTS_MODULE = "stix2.test." + _STIX_VID + ".constants"
constants = importlib.import_module(_STIX_CONSTANTS_MODULE)
def test_workbench_environment():
# Create a STIX object
ind = create(
Indicator, id=constants.INDICATOR_ID, **constants.INDICATOR_KWARGS
)
save(ind)
resp = get(constants.INDICATOR_ID)
assert resp['labels'][0] == 'malicious-activity'
resp = all_versions(constants.INDICATOR_ID)
assert len(resp) == 1
# Search on something other than id
q = [Filter('type', '=', 'vulnerability')]
resp = query(q)
assert len(resp) == 0
def test_workbench_get_all_attack_patterns():
mal = AttackPattern(
id=constants.ATTACK_PATTERN_ID, **constants.ATTACK_PATTERN_KWARGS
)
save(mal)
resp = attack_patterns()
assert len(resp) == 1
assert resp[0].id == constants.ATTACK_PATTERN_ID
def test_workbench_get_all_campaigns():
cam = Campaign(id=constants.CAMPAIGN_ID, **constants.CAMPAIGN_KWARGS)
save(cam)
resp = campaigns()
assert len(resp) == 1
assert resp[0].id == constants.CAMPAIGN_ID
def test_workbench_get_all_courses_of_action():
coa = CourseOfAction(
id=constants.COURSE_OF_ACTION_ID, **constants.COURSE_OF_ACTION_KWARGS
)
save(coa)
resp = courses_of_action()
assert len(resp) == 1
assert resp[0].id == constants.COURSE_OF_ACTION_ID
def test_workbench_get_all_identities():
idty = Identity(id=constants.IDENTITY_ID, **constants.IDENTITY_KWARGS)
save(idty)
resp = identities()
assert len(resp) == 1
assert resp[0].id == constants.IDENTITY_ID
def test_workbench_get_all_indicators():
resp = indicators()
assert len(resp) == 1
assert resp[0].id == constants.INDICATOR_ID
def test_workbench_get_all_intrusion_sets():
ins = IntrusionSet(
id=constants.INTRUSION_SET_ID, **constants.INTRUSION_SET_KWARGS
)
save(ins)
resp = intrusion_sets()
assert len(resp) == 1
assert resp[0].id == constants.INTRUSION_SET_ID
def test_workbench_get_all_malware():
mal = Malware(id=constants.MALWARE_ID, **constants.MALWARE_KWARGS)
save(mal)
resp = malware()
assert len(resp) == 1
assert resp[0].id == constants.MALWARE_ID
def test_workbench_get_all_observed_data():
od = ObservedData(
id=constants.OBSERVED_DATA_ID, **constants.OBSERVED_DATA_KWARGS
)
save(od)
resp = observed_data()
assert len(resp) == 1
assert resp[0].id == constants.OBSERVED_DATA_ID
def test_workbench_get_all_reports():
rep = Report(id=constants.REPORT_ID, **constants.REPORT_KWARGS)
save(rep)
resp = reports()
assert len(resp) == 1
assert resp[0].id == constants.REPORT_ID
def test_workbench_get_all_threat_actors():
thr = ThreatActor(
id=constants.THREAT_ACTOR_ID, **constants.THREAT_ACTOR_KWARGS
)
save(thr)
resp = threat_actors()
assert len(resp) == 1
assert resp[0].id == constants.THREAT_ACTOR_ID
def test_workbench_get_all_tools():
tool = Tool(id=constants.TOOL_ID, **constants.TOOL_KWARGS)
save(tool)
resp = tools()
assert len(resp) == 1
assert resp[0].id == constants.TOOL_ID
def test_workbench_get_all_vulnerabilities():
vuln = Vulnerability(
id=constants.VULNERABILITY_ID, **constants.VULNERABILITY_KWARGS
)
save(vuln)
resp = vulnerabilities()
assert len(resp) == 1
assert resp[0].id == constants.VULNERABILITY_ID
def test_workbench_add_to_bundle():
vuln = Vulnerability(**constants.VULNERABILITY_KWARGS)
bundle = Bundle(vuln)
assert bundle.objects[0].name == 'Heartbleed'
def test_workbench_relationships():
rel = Relationship(
constants.INDICATOR_ID, 'indicates', constants.MALWARE_ID,
)
save(rel)
ind = get(constants.INDICATOR_ID)
resp = ind.relationships()
assert len(resp) == 1
assert resp[0].relationship_type == 'indicates'
assert resp[0].source_ref == constants.INDICATOR_ID
assert resp[0].target_ref == constants.MALWARE_ID
def test_workbench_created_by():
intset = IntrusionSet(
name="Breach 123", created_by_ref=constants.IDENTITY_ID,
)
save(intset)
creator = intset.created_by()
assert creator.id == constants.IDENTITY_ID
def test_workbench_related():
rel1 = Relationship(constants.MALWARE_ID, 'targets', constants.IDENTITY_ID)
rel2 = Relationship(constants.CAMPAIGN_ID, 'uses', constants.MALWARE_ID)
save([rel1, rel2])
resp = get(constants.MALWARE_ID).related()
assert len(resp) == 3
assert any(x['id'] == constants.CAMPAIGN_ID for x in resp)
assert any(x['id'] == constants.INDICATOR_ID for x in resp)
assert any(x['id'] == constants.IDENTITY_ID for x in resp)
resp = get(constants.MALWARE_ID).related(relationship_type='indicates')
assert len(resp) == 1
def test_workbench_related_with_filters():
malware = Malware(
labels=["ransomware"], name="CryptorBit", created_by_ref=constants.IDENTITY_ID,
)
rel = Relationship(malware.id, 'variant-of', constants.MALWARE_ID)
save([malware, rel])
filters = [Filter('created_by_ref', '=', constants.IDENTITY_ID)]
resp = get(constants.MALWARE_ID).related(filters=filters)
assert len(resp) == 1
assert resp[0].name == malware.name
assert resp[0].created_by_ref == constants.IDENTITY_ID
# filters arg can also be single filter
resp = get(constants.MALWARE_ID).related(filters=filters[0])
assert len(resp) == 1
def test_add_data_source():
fs = FileSystemSource(_STIX_DATA_PATH)
add_data_source(fs)
resp = tools()
assert len(resp) == 3
resp_ids = [tool.id for tool in resp]
assert constants.TOOL_ID in resp_ids
assert 'tool--03342581-f790-4f03-ba41-e82e67392e23' in resp_ids
assert 'tool--242f3da3-4425-4d11-8f5c-b842886da966' in resp_ids
def test_additional_filter():
resp = tools(Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5'))
assert len(resp) == 2
def test_additional_filters_list():
resp = tools([
Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5'),
Filter('name', '=', 'Windows Credential Editor'),
])
assert len(resp) == 1
def test_default_creator():
set_default_creator(constants.IDENTITY_ID)
campaign = Campaign(**constants.CAMPAIGN_KWARGS)
assert 'created_by_ref' not in constants.CAMPAIGN_KWARGS
assert campaign.created_by_ref == constants.IDENTITY_ID
# turn off side-effects to avoid affecting future tests
set_default_creator(None)
def test_default_created_timestamp():
timestamp = "2018-03-19T01:02:03.000Z"
set_default_created(timestamp)
campaign = Campaign(**constants.CAMPAIGN_KWARGS)
assert 'created' not in constants.CAMPAIGN_KWARGS
assert stix2.utils.format_datetime(campaign.created) == timestamp
assert stix2.utils.format_datetime(campaign.modified) == timestamp
# turn off side-effects to avoid affecting future tests
set_default_created(None)
def test_default_external_refs():
ext_ref = ExternalReference(
source_name="ACME Threat Intel",
description="Threat report",
)
set_default_external_refs(ext_ref)
campaign = Campaign(**constants.CAMPAIGN_KWARGS)
assert campaign.external_references[0].source_name == "ACME Threat Intel"
assert campaign.external_references[0].description == "Threat report"
# turn off side-effects to avoid affecting future tests
set_default_external_refs([])
def test_default_object_marking_refs():
stmt_marking = StatementMarking("Copyright 2016, Example Corp")
mark_def = MarkingDefinition(
definition_type="statement",
definition=stmt_marking,
)
set_default_object_marking_refs(mark_def)
campaign = Campaign(**constants.CAMPAIGN_KWARGS)
assert campaign.object_marking_refs[0] == mark_def.id
# turn off side-effects to avoid affecting future tests
set_default_object_marking_refs([])
def test_workbench_custom_property_object_in_observable_extension():
ntfs = NTFSExt(
allow_custom=True,
sid=1,
x_foo='bar',
)
artifact = File(
name='test',
extensions={'ntfs-ext': ntfs},
)
observed_data = ObservedData(
allow_custom=True,
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=1,
objects={"0": artifact},
)
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
assert '"x_foo": "bar"' in str(observed_data)
def test_workbench_custom_property_dict_in_observable_extension():
artifact = File(
allow_custom=True,
name='test',
extensions={
'ntfs-ext': {
'allow_custom': True,
'sid': 1,
'x_foo': 'bar',
},
},
)
observed_data = ObservedData(
allow_custom=True,
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=1,
objects={"0": artifact},
)
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
assert '"x_foo": "bar"' in str(observed_data)