From 3ef63d5e175481ddf9860f759f2f48148ed3ea09 Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Thu, 18 Jun 2020 11:37:52 -0400 Subject: [PATCH] Update Workbench for 2.1 --- stix2/test/test_workbench.py | 67 +++++++++++++++++++++++-- stix2/test/v21/constants.py | 20 ++++++++ stix2/workbench.py | 94 ++++++++++++++++++++++++++++++++++-- 3 files changed, 172 insertions(+), 9 deletions(-) diff --git a/stix2/test/test_workbench.py b/stix2/test/test_workbench.py index d946547..433bf81 100644 --- a/stix2/test/test_workbench.py +++ b/stix2/test/test_workbench.py @@ -4,12 +4,14 @@ import os import stix2 from stix2.workbench import ( _STIX_VID, AttackPattern, Bundle, Campaign, CourseOfAction, - ExternalReference, File, FileSystemSource, Filter, Identity, Indicator, - IntrusionSet, Malware, MarkingDefinition, NTFSExt, ObservedData, + ExternalReference, File, FileSystemSource, Filter, Grouping, Identity, + Indicator, Infrastructure, IntrusionSet, Location, Malware, + MalwareAnalysis, MarkingDefinition, Note, NTFSExt, ObservedData, Opinion, Relationship, Report, StatementMarking, ThreatActor, Tool, Vulnerability, add_data_source, all_versions, attack_patterns, campaigns, - courses_of_action, create, get, identities, indicators, intrusion_sets, - malware, observed_data, query, reports, save, set_default_created, + courses_of_action, create, get, groupings, identities, indicators, + infrastructures, intrusion_sets, locations, malware, malware_analyses, + notes, observed_data, opinions, query, reports, save, set_default_created, set_default_creator, set_default_external_refs, set_default_object_marking_refs, threat_actors, tools, vulnerabilities, ) @@ -35,7 +37,7 @@ def test_workbench_environment(): save(ind) resp = get(constants.INDICATOR_ID) - assert resp['labels'][0] == 'malicious-activity' + assert resp['indicator_types'][0] == 'malicious-activity' resp = all_versions(constants.INDICATOR_ID) assert len(resp) == 1 @@ -77,6 +79,15 @@ def test_workbench_get_all_courses_of_action(): assert resp[0].id == constants.COURSE_OF_ACTION_ID +def test_workbench_get_all_groupings(): + grup = Grouping(id=constants.GROUPING_ID, **constants.GROUPING_KWARGS) + save(grup) + + resp = groupings() + assert len(resp) == 1 + assert resp[0].id == constants.GROUPING_ID + + def test_workbench_get_all_identities(): idty = Identity(id=constants.IDENTITY_ID, **constants.IDENTITY_KWARGS) save(idty) @@ -92,6 +103,15 @@ def test_workbench_get_all_indicators(): assert resp[0].id == constants.INDICATOR_ID +def test_workbench_get_all_infrastructures(): + inf = Infrastructure(id=constants.INFRASTRUCTURE_ID, **constants.INFRASTRUCTURE_KWARGS) + save(inf) + + resp = infrastructures() + assert len(resp) == 1 + assert resp[0].id == constants.INFRASTRUCTURE_ID + + def test_workbench_get_all_intrusion_sets(): ins = IntrusionSet( id=constants.INTRUSION_SET_ID, **constants.INTRUSION_SET_KWARGS @@ -103,6 +123,15 @@ def test_workbench_get_all_intrusion_sets(): assert resp[0].id == constants.INTRUSION_SET_ID +def test_workbench_get_all_locations(): + loc = Location(id=constants.LOCATION_ID, **constants.LOCATION_KWARGS) + save(loc) + + resp = locations() + assert len(resp) == 1 + assert resp[0].id == constants.LOCATION_ID + + def test_workbench_get_all_malware(): mal = Malware(id=constants.MALWARE_ID, **constants.MALWARE_KWARGS) save(mal) @@ -112,6 +141,24 @@ def test_workbench_get_all_malware(): assert resp[0].id == constants.MALWARE_ID +def test_workbench_get_all_malware_analyses(): + mal = MalwareAnalysis(id=constants.MALWARE_ANALYSIS_ID, **constants.MALWARE_ANALYSIS_KWARGS) + save(mal) + + resp = malware_analyses() + assert len(resp) == 1 + assert resp[0].id == constants.MALWARE_ANALYSIS_ID + + +def test_workbench_get_all_notes(): + note = Note(id=constants.NOTE_ID, **constants.NOTE_KWARGS) + save(note) + + resp = notes() + assert len(resp) == 1 + assert resp[0].id == constants.NOTE_ID + + def test_workbench_get_all_observed_data(): od = ObservedData( id=constants.OBSERVED_DATA_ID, **constants.OBSERVED_DATA_KWARGS @@ -123,6 +170,15 @@ def test_workbench_get_all_observed_data(): assert resp[0].id == constants.OBSERVED_DATA_ID +def test_workbench_get_all_opinions(): + op = Opinion(id=constants.OPINION_ID, **constants.OPINION_KWARGS) + save(op) + + resp = opinions() + assert len(resp) == 1 + assert resp[0].id == constants.OPINION_ID + + def test_workbench_get_all_reports(): rep = Report(id=constants.REPORT_ID, **constants.REPORT_KWARGS) save(rep) @@ -210,6 +266,7 @@ def test_workbench_related(): def test_workbench_related_with_filters(): malware = Malware( labels=["ransomware"], name="CryptorBit", created_by_ref=constants.IDENTITY_ID, + is_family=False, ) rel = Relationship(malware.id, 'variant-of', constants.MALWARE_ID) save([malware, rel]) diff --git a/stix2/test/v21/constants.py b/stix2/test/v21/constants.py index c3ce3c0..d5bef24 100644 --- a/stix2/test/v21/constants.py +++ b/stix2/test/v21/constants.py @@ -14,6 +14,7 @@ INFRASTRUCTURE_ID = "infrastructure--3000ae1b-784c-f03d-8abc-0a625b2ff018" INTRUSION_SET_ID = "intrusion-set--4e78f46f-a023-4e5f-bc24-71b3ca22ec29" LOCATION_ID = "location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64" MALWARE_ID = "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e" +MALWARE_ANALYSIS_ID = "malware-analysis--b46ee0ad-9443-41c5-a8e3-0fa053262805" MARKING_DEFINITION_ID = "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9" NOTE_ID = "note--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061" OBSERVED_DATA_ID = "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf" @@ -102,6 +103,10 @@ INTRUSION_SET_KWARGS = dict( name="Bobcat Breakin", ) +LOCATION_KWARGS = dict( + region="africa", +) + MALWARE_KWARGS = dict( malware_types=['ransomware'], name="Cryptolocker", @@ -119,6 +124,16 @@ MALWARE_MORE_KWARGS = dict( is_family=False, ) +MALWARE_ANALYSIS_KWARGS = dict( + product="microsoft", + result="malicious", +) + +NOTE_KWARGS = dict( + content="Heartbleed", + object_refs=[CAMPAIGN_ID] +) + OBSERVED_DATA_KWARGS = dict( first_observed=FAKE_TIME, last_observed=FAKE_TIME, @@ -131,6 +146,11 @@ OBSERVED_DATA_KWARGS = dict( }, ) +OPINION_KWARGS = dict( + opinion="agree", + object_refs=[CAMPAIGN_ID] +) + REPORT_KWARGS = dict( report_types=["campaign"], name="Bad Cybercrime", diff --git a/stix2/workbench.py b/stix2/workbench.py index 57474a9..7b3f46a 100644 --- a/stix2/workbench.py +++ b/stix2/workbench.py @@ -25,11 +25,17 @@ import stix2 from . import AttackPattern as _AttackPattern from . import Campaign as _Campaign from . import CourseOfAction as _CourseOfAction +from . import Grouping as _Grouping from . import Identity as _Identity from . import Indicator as _Indicator +from . import Infrastructure as _Infrastructure from . import IntrusionSet as _IntrusionSet +from . import Location as _Location from . import Malware as _Malware +from . import MalwareAnalysis as _MalwareAnalysis +from . import Note as _Note from . import ObservedData as _ObservedData +from . import Opinion as _Opinion from . import Report as _Report from . import ThreatActor as _ThreatActor from . import Tool as _Tool @@ -40,7 +46,7 @@ from . import ( # noqa: F401 Directory, DomainName, EmailAddress, EmailMessage, EmailMIMEComponent, Environment, ExternalReference, File, FileSystemSource, Filter, GranularMarking, HTTPRequestExt, - ICMPExt, IPv4Address, IPv6Address, KillChainPhase, MACAddress, + ICMPExt, IPv4Address, IPv6Address, KillChainPhase, LanguageContent, MACAddress, MarkingDefinition, MemoryStore, Mutex, NetworkTraffic, NTFSExt, parse_observable, PDFExt, Process, RasterImageExt, Relationship, Sighting, SocketExt, Software, StatementMarking, @@ -56,6 +62,7 @@ from .datastore.filters import FilterSet # Enable some adaptation to the current default supported STIX version. _STIX_VID = "v" + stix2.DEFAULT_VERSION.replace(".", "") +print(_STIX_VID) # Use an implicit MemoryStore @@ -84,12 +91,13 @@ add_data_sources = _environ.source.add_data_sources STIX_OBJS = [ - _AttackPattern, _Campaign, _CourseOfAction, _Identity, - _Indicator, _IntrusionSet, _Malware, _ObservedData, _Report, + _AttackPattern, _Campaign, _CourseOfAction, _Identity, _Grouping, + _Indicator, _Infrastructure, _IntrusionSet, _Location, _Malware, + _MalwareAnalysis, _Note, _ObservedData, _Opinion, _Report, _ThreatActor, _Tool, _Vulnerability, ] -STIX_OBJ_DOCS = """ +STIX_OBJ_DOCS = """s .. method:: created_by(*args, **kwargs) @@ -202,6 +210,19 @@ def courses_of_action(filters=None): return query(filter_list) +def groupings(filters=None): + """Retrieve all Grouping objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ + filter_list = FilterSet(filters) + filter_list.add(Filter('type', '=', 'grouping')) + return query(filter_list) + + def identities(filters=None): """Retrieve all Identity objects. @@ -228,6 +249,19 @@ def indicators(filters=None): return query(filter_list) +def infrastructures(filters=None): + """Retrieve all Infrastructure objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ + filter_list = FilterSet(filters) + filter_list.add(Filter('type', '=', 'infrastructure')) + return query(filter_list) + + def intrusion_sets(filters=None): """Retrieve all Intrusion Set objects. @@ -241,6 +275,19 @@ def intrusion_sets(filters=None): return query(filter_list) +def locations(filters=None): + """Retrieve all Location objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ + filter_list = FilterSet(filters) + filter_list.add(Filter('type', '=', 'location')) + return query(filter_list) + + def malware(filters=None): """Retrieve all Malware objects. @@ -254,6 +301,32 @@ def malware(filters=None): return query(filter_list) +def malware_analyses(filters=None): + """Retrieve all Malware Analysis objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ + filter_list = FilterSet(filters) + filter_list.add(Filter('type', '=', 'malware-analysis')) + return query(filter_list) + + +def notes(filters=None): + """Retrieve all Note objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ + filter_list = FilterSet(filters) + filter_list.add(Filter('type', '=', 'note')) + return query(filter_list) + + def observed_data(filters=None): """Retrieve all Observed Data objects. @@ -267,6 +340,19 @@ def observed_data(filters=None): return query(filter_list) +def opinions(filters=None): + """Retrieve all Opinion objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ + filter_list = FilterSet(filters) + filter_list.add(Filter('type', '=', 'opinion')) + return query(filter_list) + + def reports(filters=None): """Retrieve all Report objects.