import importlib import os import stix2 from stix2.workbench import ( _STIX_VID, AttackPattern, Bundle, Campaign, CourseOfAction, ExternalReference, File, FileSystemSource, Filter, Identity, Indicator, IntrusionSet, Malware, MarkingDefinition, NTFSExt, ObservedData, Relationship, Report, StatementMarking, ThreatActor, Tool, Vulnerability, add_data_source, all_versions, attack_patterns, campaigns, courses_of_action, create, get, identities, indicators, intrusion_sets, malware, observed_data, query, reports, save, set_default_created, set_default_creator, set_default_external_refs, set_default_object_marking_refs, threat_actors, tools, vulnerabilities, ) # Auto-detect some settings based on the current default STIX version _STIX_DATA_PATH = os.path.join( os.path.dirname(os.path.realpath(__file__)), _STIX_VID, "stix2_data", ) _STIX_CONSTANTS_MODULE = "stix2.test." + _STIX_VID + ".constants" constants = importlib.import_module(_STIX_CONSTANTS_MODULE) def test_workbench_environment(): # Create a STIX object ind = create( Indicator, id=constants.INDICATOR_ID, **constants.INDICATOR_KWARGS ) save(ind) resp = get(constants.INDICATOR_ID) assert resp['labels'][0] == 'malicious-activity' resp = all_versions(constants.INDICATOR_ID) assert len(resp) == 1 # Search on something other than id q = [Filter('type', '=', 'vulnerability')] resp = query(q) assert len(resp) == 0 def test_workbench_get_all_attack_patterns(): mal = AttackPattern( id=constants.ATTACK_PATTERN_ID, **constants.ATTACK_PATTERN_KWARGS ) save(mal) resp = attack_patterns() assert len(resp) == 1 assert resp[0].id == constants.ATTACK_PATTERN_ID def test_workbench_get_all_campaigns(): cam = Campaign(id=constants.CAMPAIGN_ID, **constants.CAMPAIGN_KWARGS) save(cam) resp = campaigns() assert len(resp) == 1 assert resp[0].id == constants.CAMPAIGN_ID def test_workbench_get_all_courses_of_action(): coa = CourseOfAction( id=constants.COURSE_OF_ACTION_ID, **constants.COURSE_OF_ACTION_KWARGS ) save(coa) resp = courses_of_action() assert len(resp) == 1 assert resp[0].id == constants.COURSE_OF_ACTION_ID def test_workbench_get_all_identities(): idty = Identity(id=constants.IDENTITY_ID, **constants.IDENTITY_KWARGS) save(idty) resp = identities() assert len(resp) == 1 assert resp[0].id == constants.IDENTITY_ID def test_workbench_get_all_indicators(): resp = indicators() assert len(resp) == 1 assert resp[0].id == constants.INDICATOR_ID def test_workbench_get_all_intrusion_sets(): ins = IntrusionSet( id=constants.INTRUSION_SET_ID, **constants.INTRUSION_SET_KWARGS ) save(ins) resp = intrusion_sets() assert len(resp) == 1 assert resp[0].id == constants.INTRUSION_SET_ID def test_workbench_get_all_malware(): mal = Malware(id=constants.MALWARE_ID, **constants.MALWARE_KWARGS) save(mal) resp = malware() assert len(resp) == 1 assert resp[0].id == constants.MALWARE_ID def test_workbench_get_all_observed_data(): od = ObservedData( id=constants.OBSERVED_DATA_ID, **constants.OBSERVED_DATA_KWARGS ) save(od) resp = observed_data() assert len(resp) == 1 assert resp[0].id == constants.OBSERVED_DATA_ID def test_workbench_get_all_reports(): rep = Report(id=constants.REPORT_ID, **constants.REPORT_KWARGS) save(rep) resp = reports() assert len(resp) == 1 assert resp[0].id == constants.REPORT_ID def test_workbench_get_all_threat_actors(): thr = ThreatActor( id=constants.THREAT_ACTOR_ID, **constants.THREAT_ACTOR_KWARGS ) save(thr) resp = threat_actors() assert len(resp) == 1 assert resp[0].id == constants.THREAT_ACTOR_ID def test_workbench_get_all_tools(): tool = Tool(id=constants.TOOL_ID, **constants.TOOL_KWARGS) save(tool) resp = tools() assert len(resp) == 1 assert resp[0].id == constants.TOOL_ID def test_workbench_get_all_vulnerabilities(): vuln = Vulnerability( id=constants.VULNERABILITY_ID, **constants.VULNERABILITY_KWARGS ) save(vuln) resp = vulnerabilities() assert len(resp) == 1 assert resp[0].id == constants.VULNERABILITY_ID def test_workbench_add_to_bundle(): vuln = Vulnerability(**constants.VULNERABILITY_KWARGS) bundle = Bundle(vuln) assert bundle.objects[0].name == 'Heartbleed' def test_workbench_relationships(): rel = Relationship( constants.INDICATOR_ID, 'indicates', constants.MALWARE_ID, ) save(rel) ind = get(constants.INDICATOR_ID) resp = ind.relationships() assert len(resp) == 1 assert resp[0].relationship_type == 'indicates' assert resp[0].source_ref == constants.INDICATOR_ID assert resp[0].target_ref == constants.MALWARE_ID def test_workbench_created_by(): intset = IntrusionSet( name="Breach 123", created_by_ref=constants.IDENTITY_ID, ) save(intset) creator = intset.created_by() assert creator.id == constants.IDENTITY_ID def test_workbench_related(): rel1 = Relationship(constants.MALWARE_ID, 'targets', constants.IDENTITY_ID) rel2 = Relationship(constants.CAMPAIGN_ID, 'uses', constants.MALWARE_ID) save([rel1, rel2]) resp = get(constants.MALWARE_ID).related() assert len(resp) == 3 assert any(x['id'] == constants.CAMPAIGN_ID for x in resp) assert any(x['id'] == constants.INDICATOR_ID for x in resp) assert any(x['id'] == constants.IDENTITY_ID for x in resp) resp = get(constants.MALWARE_ID).related(relationship_type='indicates') assert len(resp) == 1 def test_workbench_related_with_filters(): malware = Malware( labels=["ransomware"], name="CryptorBit", created_by_ref=constants.IDENTITY_ID, ) rel = Relationship(malware.id, 'variant-of', constants.MALWARE_ID) save([malware, rel]) filters = [Filter('created_by_ref', '=', constants.IDENTITY_ID)] resp = get(constants.MALWARE_ID).related(filters=filters) assert len(resp) == 1 assert resp[0].name == malware.name assert resp[0].created_by_ref == constants.IDENTITY_ID # filters arg can also be single filter resp = get(constants.MALWARE_ID).related(filters=filters[0]) assert len(resp) == 1 def test_add_data_source(): fs = FileSystemSource(_STIX_DATA_PATH) add_data_source(fs) resp = tools() assert len(resp) == 3 resp_ids = [tool.id for tool in resp] assert constants.TOOL_ID in resp_ids assert 'tool--03342581-f790-4f03-ba41-e82e67392e23' in resp_ids assert 'tool--242f3da3-4425-4d11-8f5c-b842886da966' in resp_ids def test_additional_filter(): resp = tools(Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5')) assert len(resp) == 2 def test_additional_filters_list(): resp = tools([ Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5'), Filter('name', '=', 'Windows Credential Editor'), ]) assert len(resp) == 1 def test_default_creator(): set_default_creator(constants.IDENTITY_ID) campaign = Campaign(**constants.CAMPAIGN_KWARGS) assert 'created_by_ref' not in constants.CAMPAIGN_KWARGS assert campaign.created_by_ref == constants.IDENTITY_ID # turn off side-effects to avoid affecting future tests set_default_creator(None) def test_default_created_timestamp(): timestamp = "2018-03-19T01:02:03.000Z" set_default_created(timestamp) campaign = Campaign(**constants.CAMPAIGN_KWARGS) assert 'created' not in constants.CAMPAIGN_KWARGS assert stix2.utils.format_datetime(campaign.created) == timestamp assert stix2.utils.format_datetime(campaign.modified) == timestamp # turn off side-effects to avoid affecting future tests set_default_created(None) def test_default_external_refs(): ext_ref = ExternalReference( source_name="ACME Threat Intel", description="Threat report", ) set_default_external_refs(ext_ref) campaign = Campaign(**constants.CAMPAIGN_KWARGS) assert campaign.external_references[0].source_name == "ACME Threat Intel" assert campaign.external_references[0].description == "Threat report" # turn off side-effects to avoid affecting future tests set_default_external_refs([]) def test_default_object_marking_refs(): stmt_marking = StatementMarking("Copyright 2016, Example Corp") mark_def = MarkingDefinition( definition_type="statement", definition=stmt_marking, ) set_default_object_marking_refs(mark_def) campaign = Campaign(**constants.CAMPAIGN_KWARGS) assert campaign.object_marking_refs[0] == mark_def.id # turn off side-effects to avoid affecting future tests set_default_object_marking_refs([]) def test_workbench_custom_property_object_in_observable_extension(): ntfs = NTFSExt( allow_custom=True, sid=1, x_foo='bar', ) artifact = File( name='test', extensions={'ntfs-ext': ntfs}, ) observed_data = ObservedData( allow_custom=True, first_observed="2015-12-21T19:00:00Z", last_observed="2015-12-21T19:00:00Z", number_observed=1, objects={"0": artifact}, ) assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar" assert '"x_foo": "bar"' in str(observed_data) def test_workbench_custom_property_dict_in_observable_extension(): artifact = File( allow_custom=True, name='test', extensions={ 'ntfs-ext': { 'allow_custom': True, 'sid': 1, 'x_foo': 'bar', }, }, ) observed_data = ObservedData( allow_custom=True, first_observed="2015-12-21T19:00:00Z", last_observed="2015-12-21T19:00:00Z", number_observed=1, objects={"0": artifact}, ) assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar" assert '"x_foo": "bar"' in str(observed_data)