diff --git a/stix2/test/constants.py b/stix2/test/constants.py index 3db39d6..ab7fcf3 100644 --- a/stix2/test/constants.py +++ b/stix2/test/constants.py @@ -34,14 +34,18 @@ RELATIONSHIP_IDS = [ 'relationship--a0cbb21c-8daf-4a7f-96aa-7155a4ef8f70' ] -# All required args for a Campaign instance +# *_KWARGS contains all required arguments to create an instance of that STIX object +# *_MORE_KWARGS contains all the required arguments, plus some optional ones + +ATTACK_PATTERN_KWARGS = dict( + name="Phishing", +) + CAMPAIGN_KWARGS = dict( name="Green Group Attacks Against Finance", description="Campaign by Green Group against a series of targets in the financial services sector.", ) - -# All required args for a Campaign instance, plus some optional args CAMPAIGN_MORE_KWARGS = dict( type='campaign', id=CAMPAIGN_ID, @@ -52,25 +56,29 @@ CAMPAIGN_MORE_KWARGS = dict( description="Campaign by Green Group against a series of targets in the financial services sector.", ) -# Minimum required args for an Identity instance +COURSE_OF_ACTION_KWARGS = dict( + name="Block", +) + IDENTITY_KWARGS = dict( name="John Smith", identity_class="individual", ) -# Minimum required args for an Indicator instance INDICATOR_KWARGS = dict( labels=['malicious-activity'], pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']", ) -# Minimum required args for a Malware instance +INTRUSION_SET_KWARGS = dict( + name="Bobcat Breakin", +) + MALWARE_KWARGS = dict( labels=['ransomware'], name="Cryptolocker", ) -# All required args for a Malware instance, plus some optional args MALWARE_MORE_KWARGS = dict( type='malware', id=MALWARE_ID, @@ -81,14 +89,45 @@ MALWARE_MORE_KWARGS = dict( description="A ransomware related to ..." ) -# Minimum required args for a Relationship instance +OBSERVED_DATA_KWARGS = dict( + first_observed=FAKE_TIME, + last_observed=FAKE_TIME, + number_observed=1, + objects={ + "0": { + "type": "windows-registry-key", + "key": "HKEY_LOCAL_MACHINE\\System\\Foo\\Bar", + } + } +) + +REPORT_KWARGS = dict( + labels=["campaign"], + name="Bad Cybercrime", + published=FAKE_TIME, + object_refs=[INDICATOR_ID], +) + RELATIONSHIP_KWARGS = dict( relationship_type="indicates", source_ref=INDICATOR_ID, target_ref=MALWARE_ID, ) -# Minimum required args for a Sighting instance SIGHTING_KWARGS = dict( sighting_of_ref=INDICATOR_ID, ) + +THREAT_ACTOR_KWARGS = dict( + labels=["crime-syndicate"], + name="Evil Org", +) + +TOOL_KWARGS = dict( + labels=["remote-access"], + name="VNC", +) + +VULNERABILITY_KWARGS = dict( + name="Heartbleed", +) diff --git a/stix2/test/test_workbench.py b/stix2/test/test_workbench.py new file mode 100644 index 0000000..5e2809b --- /dev/null +++ b/stix2/test/test_workbench.py @@ -0,0 +1,139 @@ +import stix2 +from stix2.workbench import (add, all_versions, attack_patterns, campaigns, + courses_of_action, create, get, identities, + indicators, intrusion_sets, malware, + observed_data, query, reports, threat_actors, + tools, vulnerabilities) + +from .constants import (ATTACK_PATTERN_ID, ATTACK_PATTERN_KWARGS, CAMPAIGN_ID, + CAMPAIGN_KWARGS, COURSE_OF_ACTION_ID, + COURSE_OF_ACTION_KWARGS, IDENTITY_ID, IDENTITY_KWARGS, + INDICATOR_ID, INDICATOR_KWARGS, INTRUSION_SET_ID, + INTRUSION_SET_KWARGS, MALWARE_ID, MALWARE_KWARGS, + OBSERVED_DATA_ID, OBSERVED_DATA_KWARGS, REPORT_ID, + REPORT_KWARGS, THREAT_ACTOR_ID, THREAT_ACTOR_KWARGS, + TOOL_ID, TOOL_KWARGS, VULNERABILITY_ID, + VULNERABILITY_KWARGS) + + +def test_workbench_environment(): + + # Create a STIX object + ind = create(stix2.Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS) + add(ind) + + resp = get(INDICATOR_ID) + assert resp['labels'][0] == 'malicious-activity' + + resp = all_versions(INDICATOR_ID) + assert len(resp) == 1 + + # Search on something other than id + q = [stix2.Filter('type', '=', 'vulnerability')] + resp = query(q) + assert len(resp) == 0 + + +def test_workbench_get_all_attack_patterns(): + mal = stix2.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS) + add(mal) + + resp = attack_patterns() + assert len(resp) == 1 + assert resp[0].id == ATTACK_PATTERN_ID + + +def test_workbench_get_all_campaigns(): + cam = stix2.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS) + add(cam) + + resp = campaigns() + assert len(resp) == 1 + assert resp[0].id == CAMPAIGN_ID + + +def test_workbench_get_all_courses_of_action(): + coa = stix2.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS) + add(coa) + + resp = courses_of_action() + assert len(resp) == 1 + assert resp[0].id == COURSE_OF_ACTION_ID + + +def test_workbench_get_all_identities(): + idty = stix2.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS) + add(idty) + + resp = identities() + assert len(resp) == 1 + assert resp[0].id == IDENTITY_ID + + +def test_workbench_get_all_indicators(): + resp = indicators() + assert len(resp) == 1 + assert resp[0].id == INDICATOR_ID + + +def test_workbench_get_all_intrusion_sets(): + ins = stix2.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS) + add(ins) + + resp = intrusion_sets() + assert len(resp) == 1 + assert resp[0].id == INTRUSION_SET_ID + + +def test_workbench_get_all_malware(): + mal = stix2.Malware(id=MALWARE_ID, **MALWARE_KWARGS) + add(mal) + + resp = malware() + assert len(resp) == 1 + assert resp[0].id == MALWARE_ID + + +def test_workbench_get_all_observed_data(): + od = stix2.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS) + add(od) + + resp = observed_data() + assert len(resp) == 1 + assert resp[0].id == OBSERVED_DATA_ID + + +def test_workbench_get_all_reports(): + rep = stix2.Report(id=REPORT_ID, **REPORT_KWARGS) + add(rep) + + resp = reports() + assert len(resp) == 1 + assert resp[0].id == REPORT_ID + + +def test_workbench_get_all_threat_actors(): + thr = stix2.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS) + add(thr) + + resp = threat_actors() + assert len(resp) == 1 + assert resp[0].id == THREAT_ACTOR_ID + + +def test_workbench_get_all_tools(): + tool = stix2.Tool(id=TOOL_ID, **TOOL_KWARGS) + add(tool) + + resp = tools() + assert len(resp) == 1 + assert resp[0].id == TOOL_ID + + +def test_workbench_get_all_vulnerabilities(): + vuln = stix2.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS) + add(vuln) + + resp = vulnerabilities() + assert len(resp) == 1 + assert resp[0].id == VULNERABILITY_ID diff --git a/stix2/workbench.py b/stix2/workbench.py new file mode 100644 index 0000000..55c8009 --- /dev/null +++ b/stix2/workbench.py @@ -0,0 +1,72 @@ +"""Functions and class wrappers for interacting with STIX data at a high level. +""" + +from .environment import Environment +from .sources.filters import Filter +from .sources.memory import MemoryStore + +_environ = Environment(store=MemoryStore()) + +create = _environ.create +get = _environ.get +all_versions = _environ.all_versions +query = _environ.query +creator_of = _environ.creator_of +relationships = _environ.relationships +related_to = _environ.related_to +add = _environ.add +add_filters = _environ.add_filters +add_filter = _environ.add_filter +parse = _environ.parse +add_data_source = _environ.source.add_data_source + + +# Functions to get all objects of a specific type + + +def attack_patterns(): + return query(Filter('type', '=', 'attack-pattern')) + + +def campaigns(): + return query(Filter('type', '=', 'campaign')) + + +def courses_of_action(): + return query(Filter('type', '=', 'course-of-action')) + + +def identities(): + return query(Filter('type', '=', 'identity')) + + +def indicators(): + return query(Filter('type', '=', 'indicator')) + + +def intrusion_sets(): + return query(Filter('type', '=', 'intrusion-set')) + + +def malware(): + return query(Filter('type', '=', 'malware')) + + +def observed_data(): + return query(Filter('type', '=', 'observed-data')) + + +def reports(): + return query(Filter('type', '=', 'report')) + + +def threat_actors(): + return query(Filter('type', '=', 'threat-actor')) + + +def tools(): + return query(Filter('type', '=', 'tool')) + + +def vulnerabilities(): + return query(Filter('type', '=', 'vulnerability'))