diff --git a/.isort.cfg b/.isort.cfg index 0fadb83..cca9d19 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -1,5 +1,6 @@ [settings] not_skip = __init__.py +skip = workbench.py known_third_party = dateutil, ordereddict, diff --git a/docs/api/stix2.workbench.rst b/docs/api/stix2.workbench.rst new file mode 100644 index 0000000..19345f0 --- /dev/null +++ b/docs/api/stix2.workbench.rst @@ -0,0 +1,5 @@ +workbench +=============== + +.. automodule:: stix2.workbench + :members: \ No newline at end of file diff --git a/stix2/__init__.py b/stix2/__init__.py index 401d44b..89043ec 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -11,6 +11,7 @@ patterns properties utils + workbench v20.common v20.observables v20.sdo diff --git a/stix2/test/test_workbench.py b/stix2/test/test_workbench.py index 13aad23..a2016cc 100644 --- a/stix2/test/test_workbench.py +++ b/stix2/test/test_workbench.py @@ -1,14 +1,16 @@ import os import stix2 -from stix2.workbench import (AttackPattern, Campaign, CourseOfAction, Identity, - Indicator, IntrusionSet, Malware, ObservedData, - Report, ThreatActor, Tool, Vulnerability, add, - add_data_source, all_versions, attack_patterns, - campaigns, courses_of_action, create, get, - identities, indicators, intrusion_sets, malware, - observed_data, query, reports, - set_default_created, set_default_creator, +from stix2.workbench import (AttackPattern, Campaign, CourseOfAction, + ExternalReference, FileSystemSource, Filter, + Identity, Indicator, IntrusionSet, Malware, + MarkingDefinition, ObservedData, Relationship, + Report, StatementMarking, ThreatActor, Tool, + Vulnerability, add, add_data_source, all_versions, + attack_patterns, campaigns, courses_of_action, + create, get, identities, indicators, + intrusion_sets, malware, observed_data, query, + reports, set_default_created, set_default_creator, set_default_external_refs, set_default_object_marking_refs, threat_actors, tools, vulnerabilities) @@ -37,7 +39,7 @@ def test_workbench_environment(): assert len(resp) == 1 # Search on something other than id - q = [stix2.Filter('type', '=', 'vulnerability')] + q = [Filter('type', '=', 'vulnerability')] resp = query(q) assert len(resp) == 0 @@ -148,7 +150,7 @@ def test_workbench_get_all_vulnerabilities(): def test_workbench_relationships(): - rel = stix2.Relationship(INDICATOR_ID, 'indicates', MALWARE_ID) + rel = Relationship(INDICATOR_ID, 'indicates', MALWARE_ID) add(rel) ind = get(INDICATOR_ID) @@ -167,8 +169,8 @@ def test_workbench_created_by(): def test_workbench_related(): - rel1 = stix2.Relationship(MALWARE_ID, 'targets', IDENTITY_ID) - rel2 = stix2.Relationship(CAMPAIGN_ID, 'uses', MALWARE_ID) + rel1 = Relationship(MALWARE_ID, 'targets', IDENTITY_ID) + rel2 = Relationship(CAMPAIGN_ID, 'uses', MALWARE_ID) add([rel1, rel2]) resp = get(MALWARE_ID).related() @@ -183,10 +185,10 @@ def test_workbench_related(): def test_workbench_related_with_filters(): malware = Malware(labels=["ransomware"], name="CryptorBit", created_by_ref=IDENTITY_ID) - rel = stix2.Relationship(malware.id, 'variant-of', MALWARE_ID) + rel = Relationship(malware.id, 'variant-of', MALWARE_ID) add([malware, rel]) - filters = [stix2.Filter('created_by_ref', '=', IDENTITY_ID)] + filters = [Filter('created_by_ref', '=', IDENTITY_ID)] resp = get(MALWARE_ID).related(filters=filters) assert len(resp) == 1 @@ -200,7 +202,7 @@ def test_workbench_related_with_filters(): def test_add_data_source(): fs_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data") - fs = stix2.FileSystemSource(fs_path) + fs = FileSystemSource(fs_path) add_data_source(fs) resp = tools() @@ -212,13 +214,13 @@ def test_add_data_source(): def test_additional_filter(): - resp = tools(stix2.Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5')) + resp = tools(Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5')) assert len(resp) == 2 def test_additional_filters_list(): - resp = tools([stix2.Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5'), - stix2.Filter('name', '=', 'Windows Credential Editor')]) + resp = tools([Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5'), + Filter('name', '=', 'Windows Credential Editor')]) assert len(resp) == 1 @@ -241,8 +243,8 @@ def test_default_created_timestamp(): def test_default_external_refs(): - ext_ref = stix2.ExternalReference(source_name="ACME Threat Intel", - description="Threat report") + ext_ref = ExternalReference(source_name="ACME Threat Intel", + description="Threat report") set_default_external_refs(ext_ref) campaign = Campaign(**CAMPAIGN_KWARGS) @@ -251,9 +253,9 @@ def test_default_external_refs(): def test_default_object_marking_refs(): - stmt_marking = stix2.StatementMarking("Copyright 2016, Example Corp") - mark_def = stix2.MarkingDefinition(definition_type="statement", - definition=stmt_marking) + stmt_marking = StatementMarking("Copyright 2016, Example Corp") + mark_def = MarkingDefinition(definition_type="statement", + definition=stmt_marking) set_default_object_marking_refs(mark_def) campaign = Campaign(**CAMPAIGN_KWARGS) diff --git a/stix2/workbench.py b/stix2/workbench.py index 3a26a64..91d626d 100644 --- a/stix2/workbench.py +++ b/stix2/workbench.py @@ -1,4 +1,24 @@ """Functions and class wrappers for interacting with STIX data at a high level. + +.. autofunction:: create +.. autofunction:: set_default_creator +.. autofunction:: set_default_created +.. autofunction:: set_default_external_refs +.. autofunction:: set_default_object_marking_refs +.. autofunction:: get +.. autofunction:: all_versions +.. autofunction:: query +.. autofunction:: query_by_type +.. autofunction:: creator_of +.. autofunction:: relationships +.. autofunction:: related_to +.. autofunction:: add +.. autofunction:: add_filters +.. autofunction:: add_filter +.. autofunction:: parse +.. autofunction:: add_data_source +.. autofunction:: add_data_sources + """ from . import AttackPattern as _AttackPattern @@ -13,8 +33,21 @@ from . import Report as _Report from . import ThreatActor as _ThreatActor from . import Tool as _Tool from . import Vulnerability as _Vulnerability -from .datastore.memory import MemoryStore -from .environment import Environment +from . import (AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, # noqa: F401 + Bundle, CustomExtension, CustomMarking, CustomObservable, + Directory, DomainName, EmailAddress, EmailMessage, + EmailMIMEComponent, Environment, ExtensionsProperty, + ExternalReference, File, FileSystemSource, Filter, + GranularMarking, HTTPRequestExt, ICMPExt, IPv4Address, + IPv6Address, KillChainPhase, MACAddress, MarkingDefinition, + MemoryStore, Mutex, NetworkTraffic, NTFSExt, parse_observable, + PDFExt, Process, RasterImageExt, Relationship, Sighting, + SocketExt, Software, StatementMarking, TAXIICollectionSource, + TCPExt, TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, TLPMarking, + UNIXAccountExt, URL, UserAccount, WindowsPEBinaryExt, + WindowsPEOptionalHeaderType, WindowsPESection, + WindowsProcessExt, WindowsRegistryKey, WindowsRegistryValueType, + WindowsServiceExt, X509Certificate, X509V3ExtenstionsType) # Use an implicit MemoryStore _environ = Environment(store=MemoryStore()) @@ -46,6 +79,24 @@ STIX_OBJS = [_AttackPattern, _Campaign, _CourseOfAction, _Identity, _Indicator, _IntrusionSet, _Malware, _ObservedData, _Report, _ThreatActor, _Tool, _Vulnerability] +STIX_OBJ_DOCS = """ + +.. method:: created_by(*args, **kwargs) + + {} + +.. method:: relationships(*args, **kwargs) + + {} + +.. method:: related(*args, **kwargs) + + {} + +""".format(_environ.creator_of.__doc__, + _environ.relationships.__doc__, + _environ.related_to.__doc__) + def _created_by_wrapper(self, *args, **kwargs): return _environ.creator_of(self, *args, **kwargs) @@ -76,58 +127,146 @@ def _constructor_wrapper(obj_type): # Create wrapper classes whose constructors call the implicit environment's create() for obj_type in STIX_OBJS: - new_class = type(obj_type.__name__, (), {}) - new_class.__new__ = _constructor_wrapper(obj_type) - new_class.__doc__ = ':autodoc-skip:' + new_class_dict = { + '__new__': _constructor_wrapper(obj_type), + '__doc__': 'Workbench wrapper around the `{0} `__. object. {1}'.format(obj_type.__name__, STIX_OBJ_DOCS) + } + new_class = type(obj_type.__name__, (), new_class_dict) + globals()[obj_type.__name__] = new_class + new_class = None # Functions to get all objects of a specific type def attack_patterns(filters=None): + """Retrieve all Attack Pattern objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ return query_by_type('attack-pattern', filters) def campaigns(filters=None): + """Retrieve all Campaign objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ return query_by_type('campaign', filters) def courses_of_action(filters=None): + """Retrieve all Course of Action objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ return query_by_type('course-of-action', filters) def identities(filters=None): + """Retrieve all Identity objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ return query_by_type('identity', filters) def indicators(filters=None): + """Retrieve all Indicator objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ return query_by_type('indicator', filters) def intrusion_sets(filters=None): + """Retrieve all Intrusion Set objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ return query_by_type('intrusion-set', filters) def malware(filters=None): + """Retrieve all Malware objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ return query_by_type('malware', filters) def observed_data(filters=None): + """Retrieve all Observed Data objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ return query_by_type('observed-data', filters) def reports(filters=None): + """Retrieve all Report objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ return query_by_type('report', filters) def threat_actors(filters=None): + """Retrieve all Threat Actor objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ return query_by_type('threat-actor', filters) def tools(filters=None): + """Retrieve all Tool objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ return query_by_type('tool', filters) def vulnerabilities(filters=None): + """Retrieve all Vulnerability objects. + + Args: + filters (list, optional): A list of additional filters to apply to + the query. + + """ return query_by_type('vulnerability', filters)