Create Workbench layer

Contains a default implicit Environment and functions to get all objects
a specific type.
stix2.0
Chris Lenk 2017-11-29 08:58:01 -05:00
parent e92db2417a
commit aeff8f4bc0
3 changed files with 259 additions and 9 deletions

View File

@ -34,14 +34,18 @@ RELATIONSHIP_IDS = [
'relationship--a0cbb21c-8daf-4a7f-96aa-7155a4ef8f70'
]
# All required args for a Campaign instance
# *_KWARGS contains all required arguments to create an instance of that STIX object
# *_MORE_KWARGS contains all the required arguments, plus some optional ones
ATTACK_PATTERN_KWARGS = dict(
name="Phishing",
)
CAMPAIGN_KWARGS = dict(
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector.",
)
# All required args for a Campaign instance, plus some optional args
CAMPAIGN_MORE_KWARGS = dict(
type='campaign',
id=CAMPAIGN_ID,
@ -52,25 +56,29 @@ CAMPAIGN_MORE_KWARGS = dict(
description="Campaign by Green Group against a series of targets in the financial services sector.",
)
# Minimum required args for an Identity instance
COURSE_OF_ACTION_KWARGS = dict(
name="Block",
)
IDENTITY_KWARGS = dict(
name="John Smith",
identity_class="individual",
)
# Minimum required args for an Indicator instance
INDICATOR_KWARGS = dict(
labels=['malicious-activity'],
pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
)
# Minimum required args for a Malware instance
INTRUSION_SET_KWARGS = dict(
name="Bobcat Breakin",
)
MALWARE_KWARGS = dict(
labels=['ransomware'],
name="Cryptolocker",
)
# All required args for a Malware instance, plus some optional args
MALWARE_MORE_KWARGS = dict(
type='malware',
id=MALWARE_ID,
@ -81,14 +89,45 @@ MALWARE_MORE_KWARGS = dict(
description="A ransomware related to ..."
)
# Minimum required args for a Relationship instance
OBSERVED_DATA_KWARGS = dict(
first_observed=FAKE_TIME,
last_observed=FAKE_TIME,
number_observed=1,
objects={
"0": {
"type": "windows-registry-key",
"key": "HKEY_LOCAL_MACHINE\\System\\Foo\\Bar",
}
}
)
REPORT_KWARGS = dict(
labels=["campaign"],
name="Bad Cybercrime",
published=FAKE_TIME,
object_refs=[INDICATOR_ID],
)
RELATIONSHIP_KWARGS = dict(
relationship_type="indicates",
source_ref=INDICATOR_ID,
target_ref=MALWARE_ID,
)
# Minimum required args for a Sighting instance
SIGHTING_KWARGS = dict(
sighting_of_ref=INDICATOR_ID,
)
THREAT_ACTOR_KWARGS = dict(
labels=["crime-syndicate"],
name="Evil Org",
)
TOOL_KWARGS = dict(
labels=["remote-access"],
name="VNC",
)
VULNERABILITY_KWARGS = dict(
name="Heartbleed",
)

View File

@ -0,0 +1,139 @@
import stix2
from stix2.workbench import (add, all_versions, attack_patterns, campaigns,
courses_of_action, create, get, identities,
indicators, intrusion_sets, malware,
observed_data, query, reports, threat_actors,
tools, vulnerabilities)
from .constants import (ATTACK_PATTERN_ID, ATTACK_PATTERN_KWARGS, CAMPAIGN_ID,
CAMPAIGN_KWARGS, COURSE_OF_ACTION_ID,
COURSE_OF_ACTION_KWARGS, IDENTITY_ID, IDENTITY_KWARGS,
INDICATOR_ID, INDICATOR_KWARGS, INTRUSION_SET_ID,
INTRUSION_SET_KWARGS, MALWARE_ID, MALWARE_KWARGS,
OBSERVED_DATA_ID, OBSERVED_DATA_KWARGS, REPORT_ID,
REPORT_KWARGS, THREAT_ACTOR_ID, THREAT_ACTOR_KWARGS,
TOOL_ID, TOOL_KWARGS, VULNERABILITY_ID,
VULNERABILITY_KWARGS)
def test_workbench_environment():
# Create a STIX object
ind = create(stix2.Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS)
add(ind)
resp = get(INDICATOR_ID)
assert resp['labels'][0] == 'malicious-activity'
resp = all_versions(INDICATOR_ID)
assert len(resp) == 1
# Search on something other than id
q = [stix2.Filter('type', '=', 'vulnerability')]
resp = query(q)
assert len(resp) == 0
def test_workbench_get_all_attack_patterns():
mal = stix2.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
add(mal)
resp = attack_patterns()
assert len(resp) == 1
assert resp[0].id == ATTACK_PATTERN_ID
def test_workbench_get_all_campaigns():
cam = stix2.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
add(cam)
resp = campaigns()
assert len(resp) == 1
assert resp[0].id == CAMPAIGN_ID
def test_workbench_get_all_courses_of_action():
coa = stix2.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS)
add(coa)
resp = courses_of_action()
assert len(resp) == 1
assert resp[0].id == COURSE_OF_ACTION_ID
def test_workbench_get_all_identities():
idty = stix2.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
add(idty)
resp = identities()
assert len(resp) == 1
assert resp[0].id == IDENTITY_ID
def test_workbench_get_all_indicators():
resp = indicators()
assert len(resp) == 1
assert resp[0].id == INDICATOR_ID
def test_workbench_get_all_intrusion_sets():
ins = stix2.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS)
add(ins)
resp = intrusion_sets()
assert len(resp) == 1
assert resp[0].id == INTRUSION_SET_ID
def test_workbench_get_all_malware():
mal = stix2.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
add(mal)
resp = malware()
assert len(resp) == 1
assert resp[0].id == MALWARE_ID
def test_workbench_get_all_observed_data():
od = stix2.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS)
add(od)
resp = observed_data()
assert len(resp) == 1
assert resp[0].id == OBSERVED_DATA_ID
def test_workbench_get_all_reports():
rep = stix2.Report(id=REPORT_ID, **REPORT_KWARGS)
add(rep)
resp = reports()
assert len(resp) == 1
assert resp[0].id == REPORT_ID
def test_workbench_get_all_threat_actors():
thr = stix2.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
add(thr)
resp = threat_actors()
assert len(resp) == 1
assert resp[0].id == THREAT_ACTOR_ID
def test_workbench_get_all_tools():
tool = stix2.Tool(id=TOOL_ID, **TOOL_KWARGS)
add(tool)
resp = tools()
assert len(resp) == 1
assert resp[0].id == TOOL_ID
def test_workbench_get_all_vulnerabilities():
vuln = stix2.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
add(vuln)
resp = vulnerabilities()
assert len(resp) == 1
assert resp[0].id == VULNERABILITY_ID

72
stix2/workbench.py Normal file
View File

@ -0,0 +1,72 @@
"""Functions and class wrappers for interacting with STIX data at a high level.
"""
from .environment import Environment
from .sources.filters import Filter
from .sources.memory import MemoryStore
_environ = Environment(store=MemoryStore())
create = _environ.create
get = _environ.get
all_versions = _environ.all_versions
query = _environ.query
creator_of = _environ.creator_of
relationships = _environ.relationships
related_to = _environ.related_to
add = _environ.add
add_filters = _environ.add_filters
add_filter = _environ.add_filter
parse = _environ.parse
add_data_source = _environ.source.add_data_source
# Functions to get all objects of a specific type
def attack_patterns():
return query(Filter('type', '=', 'attack-pattern'))
def campaigns():
return query(Filter('type', '=', 'campaign'))
def courses_of_action():
return query(Filter('type', '=', 'course-of-action'))
def identities():
return query(Filter('type', '=', 'identity'))
def indicators():
return query(Filter('type', '=', 'indicator'))
def intrusion_sets():
return query(Filter('type', '=', 'intrusion-set'))
def malware():
return query(Filter('type', '=', 'malware'))
def observed_data():
return query(Filter('type', '=', 'observed-data'))
def reports():
return query(Filter('type', '=', 'report'))
def threat_actors():
return query(Filter('type', '=', 'threat-actor'))
def tools():
return query(Filter('type', '=', 'tool'))
def vulnerabilities():
return query(Filter('type', '=', 'vulnerability'))