Update Workbench for 2.1

pull/1/head
Chris Lenk 2020-06-18 11:37:52 -04:00
parent d62f5ee141
commit 3ef63d5e17
3 changed files with 172 additions and 9 deletions

View File

@ -4,12 +4,14 @@ import os
import stix2
from stix2.workbench import (
_STIX_VID, AttackPattern, Bundle, Campaign, CourseOfAction,
ExternalReference, File, FileSystemSource, Filter, Identity, Indicator,
IntrusionSet, Malware, MarkingDefinition, NTFSExt, ObservedData,
ExternalReference, File, FileSystemSource, Filter, Grouping, Identity,
Indicator, Infrastructure, IntrusionSet, Location, Malware,
MalwareAnalysis, MarkingDefinition, Note, NTFSExt, ObservedData, Opinion,
Relationship, Report, StatementMarking, ThreatActor, Tool, Vulnerability,
add_data_source, all_versions, attack_patterns, campaigns,
courses_of_action, create, get, identities, indicators, intrusion_sets,
malware, observed_data, query, reports, save, set_default_created,
courses_of_action, create, get, groupings, identities, indicators,
infrastructures, intrusion_sets, locations, malware, malware_analyses,
notes, observed_data, opinions, query, reports, save, set_default_created,
set_default_creator, set_default_external_refs,
set_default_object_marking_refs, threat_actors, tools, vulnerabilities,
)
@ -35,7 +37,7 @@ def test_workbench_environment():
save(ind)
resp = get(constants.INDICATOR_ID)
assert resp['labels'][0] == 'malicious-activity'
assert resp['indicator_types'][0] == 'malicious-activity'
resp = all_versions(constants.INDICATOR_ID)
assert len(resp) == 1
@ -77,6 +79,15 @@ def test_workbench_get_all_courses_of_action():
assert resp[0].id == constants.COURSE_OF_ACTION_ID
def test_workbench_get_all_groupings():
grup = Grouping(id=constants.GROUPING_ID, **constants.GROUPING_KWARGS)
save(grup)
resp = groupings()
assert len(resp) == 1
assert resp[0].id == constants.GROUPING_ID
def test_workbench_get_all_identities():
idty = Identity(id=constants.IDENTITY_ID, **constants.IDENTITY_KWARGS)
save(idty)
@ -92,6 +103,15 @@ def test_workbench_get_all_indicators():
assert resp[0].id == constants.INDICATOR_ID
def test_workbench_get_all_infrastructures():
inf = Infrastructure(id=constants.INFRASTRUCTURE_ID, **constants.INFRASTRUCTURE_KWARGS)
save(inf)
resp = infrastructures()
assert len(resp) == 1
assert resp[0].id == constants.INFRASTRUCTURE_ID
def test_workbench_get_all_intrusion_sets():
ins = IntrusionSet(
id=constants.INTRUSION_SET_ID, **constants.INTRUSION_SET_KWARGS
@ -103,6 +123,15 @@ def test_workbench_get_all_intrusion_sets():
assert resp[0].id == constants.INTRUSION_SET_ID
def test_workbench_get_all_locations():
loc = Location(id=constants.LOCATION_ID, **constants.LOCATION_KWARGS)
save(loc)
resp = locations()
assert len(resp) == 1
assert resp[0].id == constants.LOCATION_ID
def test_workbench_get_all_malware():
mal = Malware(id=constants.MALWARE_ID, **constants.MALWARE_KWARGS)
save(mal)
@ -112,6 +141,24 @@ def test_workbench_get_all_malware():
assert resp[0].id == constants.MALWARE_ID
def test_workbench_get_all_malware_analyses():
mal = MalwareAnalysis(id=constants.MALWARE_ANALYSIS_ID, **constants.MALWARE_ANALYSIS_KWARGS)
save(mal)
resp = malware_analyses()
assert len(resp) == 1
assert resp[0].id == constants.MALWARE_ANALYSIS_ID
def test_workbench_get_all_notes():
note = Note(id=constants.NOTE_ID, **constants.NOTE_KWARGS)
save(note)
resp = notes()
assert len(resp) == 1
assert resp[0].id == constants.NOTE_ID
def test_workbench_get_all_observed_data():
od = ObservedData(
id=constants.OBSERVED_DATA_ID, **constants.OBSERVED_DATA_KWARGS
@ -123,6 +170,15 @@ def test_workbench_get_all_observed_data():
assert resp[0].id == constants.OBSERVED_DATA_ID
def test_workbench_get_all_opinions():
op = Opinion(id=constants.OPINION_ID, **constants.OPINION_KWARGS)
save(op)
resp = opinions()
assert len(resp) == 1
assert resp[0].id == constants.OPINION_ID
def test_workbench_get_all_reports():
rep = Report(id=constants.REPORT_ID, **constants.REPORT_KWARGS)
save(rep)
@ -210,6 +266,7 @@ def test_workbench_related():
def test_workbench_related_with_filters():
malware = Malware(
labels=["ransomware"], name="CryptorBit", created_by_ref=constants.IDENTITY_ID,
is_family=False,
)
rel = Relationship(malware.id, 'variant-of', constants.MALWARE_ID)
save([malware, rel])

View File

@ -14,6 +14,7 @@ INFRASTRUCTURE_ID = "infrastructure--3000ae1b-784c-f03d-8abc-0a625b2ff018"
INTRUSION_SET_ID = "intrusion-set--4e78f46f-a023-4e5f-bc24-71b3ca22ec29"
LOCATION_ID = "location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64"
MALWARE_ID = "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e"
MALWARE_ANALYSIS_ID = "malware-analysis--b46ee0ad-9443-41c5-a8e3-0fa053262805"
MARKING_DEFINITION_ID = "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
NOTE_ID = "note--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061"
OBSERVED_DATA_ID = "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf"
@ -102,6 +103,10 @@ INTRUSION_SET_KWARGS = dict(
name="Bobcat Breakin",
)
LOCATION_KWARGS = dict(
region="africa",
)
MALWARE_KWARGS = dict(
malware_types=['ransomware'],
name="Cryptolocker",
@ -119,6 +124,16 @@ MALWARE_MORE_KWARGS = dict(
is_family=False,
)
MALWARE_ANALYSIS_KWARGS = dict(
product="microsoft",
result="malicious",
)
NOTE_KWARGS = dict(
content="Heartbleed",
object_refs=[CAMPAIGN_ID]
)
OBSERVED_DATA_KWARGS = dict(
first_observed=FAKE_TIME,
last_observed=FAKE_TIME,
@ -131,6 +146,11 @@ OBSERVED_DATA_KWARGS = dict(
},
)
OPINION_KWARGS = dict(
opinion="agree",
object_refs=[CAMPAIGN_ID]
)
REPORT_KWARGS = dict(
report_types=["campaign"],
name="Bad Cybercrime",

View File

@ -25,11 +25,17 @@ import stix2
from . import AttackPattern as _AttackPattern
from . import Campaign as _Campaign
from . import CourseOfAction as _CourseOfAction
from . import Grouping as _Grouping
from . import Identity as _Identity
from . import Indicator as _Indicator
from . import Infrastructure as _Infrastructure
from . import IntrusionSet as _IntrusionSet
from . import Location as _Location
from . import Malware as _Malware
from . import MalwareAnalysis as _MalwareAnalysis
from . import Note as _Note
from . import ObservedData as _ObservedData
from . import Opinion as _Opinion
from . import Report as _Report
from . import ThreatActor as _ThreatActor
from . import Tool as _Tool
@ -40,7 +46,7 @@ from . import ( # noqa: F401
Directory, DomainName, EmailAddress, EmailMessage,
EmailMIMEComponent, Environment, ExternalReference, File,
FileSystemSource, Filter, GranularMarking, HTTPRequestExt,
ICMPExt, IPv4Address, IPv6Address, KillChainPhase, MACAddress,
ICMPExt, IPv4Address, IPv6Address, KillChainPhase, LanguageContent, MACAddress,
MarkingDefinition, MemoryStore, Mutex, NetworkTraffic, NTFSExt,
parse_observable, PDFExt, Process, RasterImageExt, Relationship,
Sighting, SocketExt, Software, StatementMarking,
@ -56,6 +62,7 @@ from .datastore.filters import FilterSet
# Enable some adaptation to the current default supported STIX version.
_STIX_VID = "v" + stix2.DEFAULT_VERSION.replace(".", "")
print(_STIX_VID)
# Use an implicit MemoryStore
@ -84,12 +91,13 @@ add_data_sources = _environ.source.add_data_sources
STIX_OBJS = [
_AttackPattern, _Campaign, _CourseOfAction, _Identity,
_Indicator, _IntrusionSet, _Malware, _ObservedData, _Report,
_AttackPattern, _Campaign, _CourseOfAction, _Identity, _Grouping,
_Indicator, _Infrastructure, _IntrusionSet, _Location, _Malware,
_MalwareAnalysis, _Note, _ObservedData, _Opinion, _Report,
_ThreatActor, _Tool, _Vulnerability,
]
STIX_OBJ_DOCS = """
STIX_OBJ_DOCS = """s
.. method:: created_by(*args, **kwargs)
@ -202,6 +210,19 @@ def courses_of_action(filters=None):
return query(filter_list)
def groupings(filters=None):
"""Retrieve all Grouping objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'grouping'))
return query(filter_list)
def identities(filters=None):
"""Retrieve all Identity objects.
@ -228,6 +249,19 @@ def indicators(filters=None):
return query(filter_list)
def infrastructures(filters=None):
"""Retrieve all Infrastructure objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'infrastructure'))
return query(filter_list)
def intrusion_sets(filters=None):
"""Retrieve all Intrusion Set objects.
@ -241,6 +275,19 @@ def intrusion_sets(filters=None):
return query(filter_list)
def locations(filters=None):
"""Retrieve all Location objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'location'))
return query(filter_list)
def malware(filters=None):
"""Retrieve all Malware objects.
@ -254,6 +301,32 @@ def malware(filters=None):
return query(filter_list)
def malware_analyses(filters=None):
"""Retrieve all Malware Analysis objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'malware-analysis'))
return query(filter_list)
def notes(filters=None):
"""Retrieve all Note objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'note'))
return query(filter_list)
def observed_data(filters=None):
"""Retrieve all Observed Data objects.
@ -267,6 +340,19 @@ def observed_data(filters=None):
return query(filter_list)
def opinions(filters=None):
"""Retrieve all Opinion objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'opinion'))
return query(filter_list)
def reports(filters=None):
"""Retrieve all Report objects.