409 lines
12 KiB
Python
409 lines
12 KiB
Python
import importlib
|
|
import os
|
|
|
|
import stix2
|
|
from stix2.workbench import (
|
|
_STIX_VID, AttackPattern, Bundle, Campaign, CourseOfAction,
|
|
ExternalReference, File, FileSystemSource, Filter, Grouping, Identity,
|
|
Indicator, Infrastructure, IntrusionSet, Location, Malware,
|
|
MalwareAnalysis, MarkingDefinition, Note, NTFSExt, ObservedData, Opinion,
|
|
Relationship, Report, StatementMarking, ThreatActor, Tool, Vulnerability,
|
|
add_data_source, all_versions, attack_patterns, campaigns,
|
|
courses_of_action, create, get, groupings, identities, indicators,
|
|
infrastructures, intrusion_sets, locations, malware, malware_analyses,
|
|
notes, observed_data, opinions, query, reports, save, set_default_created,
|
|
set_default_creator, set_default_external_refs,
|
|
set_default_object_marking_refs, threat_actors, tools, vulnerabilities,
|
|
)
|
|
|
|
# Auto-detect some settings based on the current default STIX version
|
|
_STIX_DATA_PATH = os.path.join(
|
|
os.path.dirname(os.path.realpath(__file__)),
|
|
_STIX_VID,
|
|
"stix2_data",
|
|
)
|
|
_STIX_CONSTANTS_MODULE = "stix2.test." + _STIX_VID + ".constants"
|
|
|
|
|
|
constants = importlib.import_module(_STIX_CONSTANTS_MODULE)
|
|
|
|
|
|
def test_workbench_environment():
|
|
|
|
# Create a STIX object
|
|
ind = create(
|
|
Indicator, id=constants.INDICATOR_ID, **constants.INDICATOR_KWARGS
|
|
)
|
|
save(ind)
|
|
|
|
resp = get(constants.INDICATOR_ID)
|
|
assert resp['indicator_types'][0] == 'malicious-activity'
|
|
|
|
resp = all_versions(constants.INDICATOR_ID)
|
|
assert len(resp) == 1
|
|
|
|
# Search on something other than id
|
|
q = [Filter('type', '=', 'vulnerability')]
|
|
resp = query(q)
|
|
assert len(resp) == 0
|
|
|
|
|
|
def test_workbench_get_all_attack_patterns():
|
|
mal = AttackPattern(
|
|
id=constants.ATTACK_PATTERN_ID, **constants.ATTACK_PATTERN_KWARGS
|
|
)
|
|
save(mal)
|
|
|
|
resp = attack_patterns()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.ATTACK_PATTERN_ID
|
|
|
|
|
|
def test_workbench_get_all_campaigns():
|
|
cam = Campaign(id=constants.CAMPAIGN_ID, **constants.CAMPAIGN_KWARGS)
|
|
save(cam)
|
|
|
|
resp = campaigns()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.CAMPAIGN_ID
|
|
|
|
|
|
def test_workbench_get_all_courses_of_action():
|
|
coa = CourseOfAction(
|
|
id=constants.COURSE_OF_ACTION_ID, **constants.COURSE_OF_ACTION_KWARGS
|
|
)
|
|
save(coa)
|
|
|
|
resp = courses_of_action()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.COURSE_OF_ACTION_ID
|
|
|
|
|
|
def test_workbench_get_all_groupings():
|
|
grup = Grouping(id=constants.GROUPING_ID, **constants.GROUPING_KWARGS)
|
|
save(grup)
|
|
|
|
resp = groupings()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.GROUPING_ID
|
|
|
|
|
|
def test_workbench_get_all_identities():
|
|
idty = Identity(id=constants.IDENTITY_ID, **constants.IDENTITY_KWARGS)
|
|
save(idty)
|
|
|
|
resp = identities()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.IDENTITY_ID
|
|
|
|
|
|
def test_workbench_get_all_indicators():
|
|
resp = indicators()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.INDICATOR_ID
|
|
|
|
|
|
def test_workbench_get_all_infrastructures():
|
|
inf = Infrastructure(id=constants.INFRASTRUCTURE_ID, **constants.INFRASTRUCTURE_KWARGS)
|
|
save(inf)
|
|
|
|
resp = infrastructures()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.INFRASTRUCTURE_ID
|
|
|
|
|
|
def test_workbench_get_all_intrusion_sets():
|
|
ins = IntrusionSet(
|
|
id=constants.INTRUSION_SET_ID, **constants.INTRUSION_SET_KWARGS
|
|
)
|
|
save(ins)
|
|
|
|
resp = intrusion_sets()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.INTRUSION_SET_ID
|
|
|
|
|
|
def test_workbench_get_all_locations():
|
|
loc = Location(id=constants.LOCATION_ID, **constants.LOCATION_KWARGS)
|
|
save(loc)
|
|
|
|
resp = locations()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.LOCATION_ID
|
|
|
|
|
|
def test_workbench_get_all_malware():
|
|
mal = Malware(id=constants.MALWARE_ID, **constants.MALWARE_KWARGS)
|
|
save(mal)
|
|
|
|
resp = malware()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.MALWARE_ID
|
|
|
|
|
|
def test_workbench_get_all_malware_analyses():
|
|
mal = MalwareAnalysis(id=constants.MALWARE_ANALYSIS_ID, **constants.MALWARE_ANALYSIS_KWARGS)
|
|
save(mal)
|
|
|
|
resp = malware_analyses()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.MALWARE_ANALYSIS_ID
|
|
|
|
|
|
def test_workbench_get_all_notes():
|
|
note = Note(id=constants.NOTE_ID, **constants.NOTE_KWARGS)
|
|
save(note)
|
|
|
|
resp = notes()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.NOTE_ID
|
|
|
|
|
|
def test_workbench_get_all_observed_data():
|
|
od = ObservedData(
|
|
id=constants.OBSERVED_DATA_ID, **constants.OBSERVED_DATA_KWARGS
|
|
)
|
|
save(od)
|
|
|
|
resp = observed_data()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.OBSERVED_DATA_ID
|
|
|
|
|
|
def test_workbench_get_all_opinions():
|
|
op = Opinion(id=constants.OPINION_ID, **constants.OPINION_KWARGS)
|
|
save(op)
|
|
|
|
resp = opinions()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.OPINION_ID
|
|
|
|
|
|
def test_workbench_get_all_reports():
|
|
rep = Report(id=constants.REPORT_ID, **constants.REPORT_KWARGS)
|
|
save(rep)
|
|
|
|
resp = reports()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.REPORT_ID
|
|
|
|
|
|
def test_workbench_get_all_threat_actors():
|
|
thr = ThreatActor(
|
|
id=constants.THREAT_ACTOR_ID, **constants.THREAT_ACTOR_KWARGS
|
|
)
|
|
save(thr)
|
|
|
|
resp = threat_actors()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.THREAT_ACTOR_ID
|
|
|
|
|
|
def test_workbench_get_all_tools():
|
|
tool = Tool(id=constants.TOOL_ID, **constants.TOOL_KWARGS)
|
|
save(tool)
|
|
|
|
resp = tools()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.TOOL_ID
|
|
|
|
|
|
def test_workbench_get_all_vulnerabilities():
|
|
vuln = Vulnerability(
|
|
id=constants.VULNERABILITY_ID, **constants.VULNERABILITY_KWARGS
|
|
)
|
|
save(vuln)
|
|
|
|
resp = vulnerabilities()
|
|
assert len(resp) == 1
|
|
assert resp[0].id == constants.VULNERABILITY_ID
|
|
|
|
|
|
def test_workbench_add_to_bundle():
|
|
vuln = Vulnerability(**constants.VULNERABILITY_KWARGS)
|
|
bundle = Bundle(vuln)
|
|
assert bundle.objects[0].name == 'Heartbleed'
|
|
|
|
|
|
def test_workbench_relationships():
|
|
rel = Relationship(
|
|
constants.INDICATOR_ID, 'indicates', constants.MALWARE_ID,
|
|
)
|
|
save(rel)
|
|
|
|
ind = get(constants.INDICATOR_ID)
|
|
resp = ind.relationships()
|
|
assert len(resp) == 1
|
|
assert resp[0].relationship_type == 'indicates'
|
|
assert resp[0].source_ref == constants.INDICATOR_ID
|
|
assert resp[0].target_ref == constants.MALWARE_ID
|
|
|
|
|
|
def test_workbench_created_by():
|
|
intset = IntrusionSet(
|
|
name="Breach 123", created_by_ref=constants.IDENTITY_ID,
|
|
)
|
|
save(intset)
|
|
creator = intset.created_by()
|
|
assert creator.id == constants.IDENTITY_ID
|
|
|
|
|
|
def test_workbench_related():
|
|
rel1 = Relationship(constants.MALWARE_ID, 'targets', constants.IDENTITY_ID)
|
|
rel2 = Relationship(constants.CAMPAIGN_ID, 'uses', constants.MALWARE_ID)
|
|
save([rel1, rel2])
|
|
|
|
resp = get(constants.MALWARE_ID).related()
|
|
assert len(resp) == 3
|
|
assert any(x['id'] == constants.CAMPAIGN_ID for x in resp)
|
|
assert any(x['id'] == constants.INDICATOR_ID for x in resp)
|
|
assert any(x['id'] == constants.IDENTITY_ID for x in resp)
|
|
|
|
resp = get(constants.MALWARE_ID).related(relationship_type='indicates')
|
|
assert len(resp) == 1
|
|
|
|
|
|
def test_workbench_related_with_filters():
|
|
malware = Malware(
|
|
labels=["ransomware"], name="CryptorBit", created_by_ref=constants.IDENTITY_ID,
|
|
is_family=False,
|
|
)
|
|
rel = Relationship(malware.id, 'variant-of', constants.MALWARE_ID)
|
|
save([malware, rel])
|
|
|
|
filters = [Filter('created_by_ref', '=', constants.IDENTITY_ID)]
|
|
resp = get(constants.MALWARE_ID).related(filters=filters)
|
|
|
|
assert len(resp) == 1
|
|
assert resp[0].name == malware.name
|
|
assert resp[0].created_by_ref == constants.IDENTITY_ID
|
|
|
|
# filters arg can also be single filter
|
|
resp = get(constants.MALWARE_ID).related(filters=filters[0])
|
|
assert len(resp) == 1
|
|
|
|
|
|
def test_add_data_source():
|
|
fs = FileSystemSource(_STIX_DATA_PATH)
|
|
add_data_source(fs)
|
|
|
|
resp = tools()
|
|
assert len(resp) == 3
|
|
resp_ids = [tool.id for tool in resp]
|
|
assert constants.TOOL_ID in resp_ids
|
|
assert 'tool--03342581-f790-4f03-ba41-e82e67392e23' in resp_ids
|
|
assert 'tool--242f3da3-4425-4d11-8f5c-b842886da966' in resp_ids
|
|
|
|
|
|
def test_additional_filter():
|
|
resp = tools(Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5'))
|
|
assert len(resp) == 2
|
|
|
|
|
|
def test_additional_filters_list():
|
|
resp = tools([
|
|
Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5'),
|
|
Filter('name', '=', 'Windows Credential Editor'),
|
|
])
|
|
assert len(resp) == 1
|
|
|
|
|
|
def test_default_creator():
|
|
set_default_creator(constants.IDENTITY_ID)
|
|
campaign = Campaign(**constants.CAMPAIGN_KWARGS)
|
|
|
|
assert 'created_by_ref' not in constants.CAMPAIGN_KWARGS
|
|
assert campaign.created_by_ref == constants.IDENTITY_ID
|
|
|
|
# turn off side-effects to avoid affecting future tests
|
|
set_default_creator(None)
|
|
|
|
|
|
def test_default_created_timestamp():
|
|
timestamp = "2018-03-19T01:02:03.000Z"
|
|
set_default_created(timestamp)
|
|
campaign = Campaign(**constants.CAMPAIGN_KWARGS)
|
|
|
|
assert 'created' not in constants.CAMPAIGN_KWARGS
|
|
assert stix2.utils.format_datetime(campaign.created) == timestamp
|
|
assert stix2.utils.format_datetime(campaign.modified) == timestamp
|
|
|
|
# turn off side-effects to avoid affecting future tests
|
|
set_default_created(None)
|
|
|
|
|
|
def test_default_external_refs():
|
|
ext_ref = ExternalReference(
|
|
source_name="ACME Threat Intel",
|
|
description="Threat report",
|
|
)
|
|
set_default_external_refs(ext_ref)
|
|
campaign = Campaign(**constants.CAMPAIGN_KWARGS)
|
|
|
|
assert campaign.external_references[0].source_name == "ACME Threat Intel"
|
|
assert campaign.external_references[0].description == "Threat report"
|
|
|
|
# turn off side-effects to avoid affecting future tests
|
|
set_default_external_refs([])
|
|
|
|
|
|
def test_default_object_marking_refs():
|
|
stmt_marking = StatementMarking("Copyright 2016, Example Corp")
|
|
mark_def = MarkingDefinition(
|
|
definition_type="statement",
|
|
definition=stmt_marking,
|
|
)
|
|
set_default_object_marking_refs(mark_def)
|
|
campaign = Campaign(**constants.CAMPAIGN_KWARGS)
|
|
|
|
assert campaign.object_marking_refs[0] == mark_def.id
|
|
|
|
# turn off side-effects to avoid affecting future tests
|
|
set_default_object_marking_refs([])
|
|
|
|
|
|
def test_workbench_custom_property_object_in_observable_extension():
|
|
ntfs = NTFSExt(
|
|
allow_custom=True,
|
|
sid=1,
|
|
x_foo='bar',
|
|
)
|
|
artifact = File(
|
|
name='test',
|
|
extensions={'ntfs-ext': ntfs},
|
|
)
|
|
observed_data = ObservedData(
|
|
allow_custom=True,
|
|
first_observed="2015-12-21T19:00:00Z",
|
|
last_observed="2015-12-21T19:00:00Z",
|
|
number_observed=1,
|
|
objects={"0": artifact},
|
|
)
|
|
|
|
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
|
|
assert '"x_foo": "bar"' in str(observed_data)
|
|
|
|
|
|
def test_workbench_custom_property_dict_in_observable_extension():
|
|
artifact = File(
|
|
allow_custom=True,
|
|
name='test',
|
|
extensions={
|
|
'ntfs-ext': {
|
|
'allow_custom': True,
|
|
'sid': 1,
|
|
'x_foo': 'bar',
|
|
},
|
|
},
|
|
)
|
|
observed_data = ObservedData(
|
|
allow_custom=True,
|
|
first_observed="2015-12-21T19:00:00Z",
|
|
last_observed="2015-12-21T19:00:00Z",
|
|
number_observed=1,
|
|
objects={"0": artifact},
|
|
)
|
|
|
|
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
|
|
assert '"x_foo": "bar"' in str(observed_data)
|