cti-python-stix2/stix2/workbench.py

317 lines
9.2 KiB
Python
Raw Normal View History

"""Functions and class wrappers for interacting with STIX data at a high level.

.. autofunction:: create
.. autofunction:: set_default_creator
.. autofunction:: set_default_created
.. autofunction:: set_default_external_refs
.. autofunction:: set_default_object_marking_refs
.. autofunction:: get
.. autofunction:: all_versions
.. autofunction:: query
.. autofunction:: creator_of
.. autofunction:: relationships
.. autofunction:: related_to
.. autofunction:: save
.. autofunction:: add_filters
.. autofunction:: add_filter
.. autofunction:: parse
.. autofunction:: add_data_source
.. autofunction:: add_data_sources
"""
import stix2
from . import AttackPattern as _AttackPattern
from . import Campaign as _Campaign
from . import CourseOfAction as _CourseOfAction
from . import Identity as _Identity
from . import Indicator as _Indicator
from . import IntrusionSet as _IntrusionSet
from . import Malware as _Malware
from . import ObservedData as _ObservedData
from . import Report as _Report
from . import ThreatActor as _ThreatActor
from . import Tool as _Tool
from . import Vulnerability as _Vulnerability
from . import (AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, # noqa: F401
Bundle, CustomExtension, CustomMarking, CustomObservable,
Directory, DomainName, EmailAddress, EmailMessage,
EmailMIMEComponent, Environment, ExtensionsProperty,
ExternalReference, File, FileSystemSource, Filter,
GranularMarking, HTTPRequestExt, ICMPExt, IPv4Address,
IPv6Address, KillChainPhase, MACAddress, MarkingDefinition,
MemoryStore, Mutex, NetworkTraffic, NTFSExt, parse_observable,
PDFExt, Process, RasterImageExt, Relationship, Sighting,
SocketExt, Software, StatementMarking, TAXIICollectionSource,
TCPExt, TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, TLPMarking,
UNIXAccountExt, URL, UserAccount, WindowsPEBinaryExt,
WindowsPEOptionalHeaderType, WindowsPESection,
WindowsProcessExt, WindowsRegistryKey, WindowsRegistryValueType,
WindowsServiceExt, X509Certificate, X509V3ExtenstionsType)
2018-04-13 17:08:03 +02:00
from .datastore.filters import FilterSet
# Use an implicit MemoryStore
# A single module-level Environment backed by a MemoryStore; every alias
# below is bound to this one instance.
_environ = Environment(store=MemoryStore())
# Re-export the environment's bound methods as module-level functions.
create = _environ.create
set_default_creator = _environ.set_default_creator
set_default_created = _environ.set_default_created
set_default_external_refs = _environ.set_default_external_refs
set_default_object_marking_refs = _environ.set_default_object_marking_refs
get = _environ.get
all_versions = _environ.all_versions
query = _environ.query
creator_of = _environ.creator_of
relationships = _environ.relationships
related_to = _environ.related_to
# `save` is the workbench name for the environment's `add` method.
save = _environ.add
add_filters = _environ.add_filters
add_filter = _environ.add_filter
parse = _environ.parse
# Data-source registration goes through the environment's `source` attribute.
add_data_source = _environ.source.add_data_source
add_data_sources = _environ.source.add_data_sources
2017-11-29 20:12:54 +01:00
# Wrap SDOs with helper functions
# The classes listed here are the ones _setup_workbench() iterates over to
# build workbench wrapper classes.
STIX_OBJS = [_AttackPattern, _Campaign, _CourseOfAction, _Identity,
_Indicator, _IntrusionSet, _Malware, _ObservedData, _Report,
_ThreatActor, _Tool, _Vulnerability]
# reST fragment that _setup_workbench() appends to each wrapper class's
# __doc__. The three placeholders are filled, in order, with the docstrings
# of the environment's creator_of, relationships and related_to methods.
STIX_OBJ_DOCS = """
.. method:: created_by(*args, **kwargs)
{}
.. method:: relationships(*args, **kwargs)
{}
.. method:: related(*args, **kwargs)
{}
""".format(_environ.creator_of.__doc__,
_environ.relationships.__doc__,
_environ.related_to.__doc__)
2018-03-21 18:56:50 +01:00
def _created_by_wrapper(self, *args, **kwargs):
    """Forward to the implicit environment's ``creator_of`` for this object.

    Installed as the ``created_by`` method on wrapped classes. ``self`` is
    passed as the first argument, followed by any extra positional and
    keyword arguments unchanged.
    """
    creator = _environ.creator_of(self, *args, **kwargs)
    return creator
2018-03-21 18:56:50 +01:00
def _relationships_wrapper(self, *args, **kwargs):
    """Forward to the implicit environment's ``relationships`` for this object.

    Installed as the ``relationships`` method on wrapped classes. ``self``
    is passed as the first argument, followed by any extra positional and
    keyword arguments unchanged.
    """
    found = _environ.relationships(self, *args, **kwargs)
    return found
2018-03-21 18:56:50 +01:00
def _related_wrapper(self, *args, **kwargs):
    """Forward to the implicit environment's ``related_to`` for this object.

    Installed as the ``related`` method on wrapped classes. ``self`` is
    passed as the first argument, followed by any extra positional and
    keyword arguments unchanged.
    """
    related_objects = _environ.related_to(self, *args, **kwargs)
    return related_objects
def _observed_data_init(self, *args, **kwargs):
    """Replacement ``__init__`` used for the wrapped ObservedData class.

    Reads the ``allow_custom`` keyword (default ``False``), records it on the
    instance and on the ``objects`` property, then delegates to the parent
    class initializer with the original arguments.
    """
    allow_custom = kwargs.get('allow_custom', False)
    # This function lives outside a class body, so no name mangling happens:
    # the attribute is stored under the literal name ``__allow_custom``.
    self.__allow_custom = allow_custom
    self._properties['objects'].allow_custom = allow_custom
    # NOTE(review): super(self.__class__, self) resolves relative to the
    # runtime class, so it would recurse if the wrapped class were ever
    # subclassed -- confirm that never happens.
    super(self.__class__, self).__init__(*args, **kwargs)
2018-03-21 18:56:50 +01:00
def _constructor_wrapper(obj_type):
    """Build a ``__new__`` replacement that creates objects through the implicit environment.

    A new class is generated with the same name and bases as ``obj_type``.
    Its class dictionary is ``obj_type``'s own, extended with the
    ``created_by``, ``relationships`` and ``related`` helper methods.

    Args:
        obj_type: The class to wrap.

    Returns:
        staticmethod: A constructor that ignores the class it is called on
        and asks the implicit environment to create an instance of the
        generated class instead.
    """
    # Use an intermediate wrapper class so the implicit environment will create objects that have our wrapper functions
    members = dict(
        created_by=_created_by_wrapper,
        relationships=_relationships_wrapper,
        related=_related_wrapper,
        **obj_type.__dict__
    )

    # Avoid TypeError about super() in ObservedData
    if 'ObservedData' in obj_type.__name__:
        members['__init__'] = _observed_data_init

    wrapped_type = type(obj_type.__name__, obj_type.__bases__, members)

    def new_constructor(cls, *args, **kwargs):
        # ``cls`` is the outer workbench class; creation is delegated to the
        # environment using the generated class.
        return _environ.create(wrapped_type, *args, **kwargs)

    return staticmethod(new_constructor)
2017-11-29 20:12:54 +01:00
def _setup_workbench():
    """Create the workbench wrapper classes and register them.

    For every class in ``STIX_OBJS`` a new class of the same name is built
    whose ``__new__`` comes from ``_constructor_wrapper``. Each new class is
    stored in this module's globals and in ``stix2.OBJ_MAP`` under the
    original class's ``_type``.
    """
    doc_template = 'Workbench wrapper around the `{0} <stix2.v20.sdo.rst#stix2.v20.sdo.{0}>`__ object. {1}'

    # Create wrapper classes whose constructors call the implicit environment's create()
    for obj_type in STIX_OBJS:
        type_name = obj_type.__name__
        wrapper_cls = type(type_name, (), {
            '__new__': _constructor_wrapper(obj_type),
            '__doc__': doc_template.format(type_name, STIX_OBJ_DOCS),
        })

        # Add our new class to this module's globals and to the library-wide mapping.
        # This allows parse() to use the wrapped classes.
        globals()[type_name] = wrapper_cls
        stix2.OBJ_MAP[obj_type._type] = wrapper_cls


_setup_workbench()
2017-11-29 20:12:54 +01:00
# Functions to get all objects of a specific type
2018-04-05 16:07:35 +02:00
def attack_patterns(filters=None):
    """Query the workbench for all objects of type ``attack-pattern``.

    Args:
        filters (list, optional): Additional filters to apply to the query
            alongside the type filter.

    Returns:
        The result of ``query()`` for the combined filters.

    """
    type_constrained = FilterSet(filters)
    type_constrained.add(Filter('type', '=', 'attack-pattern'))
    return query(type_constrained)
2018-04-05 16:07:35 +02:00
def campaigns(filters=None):
    """Query the workbench for all objects of type ``campaign``.

    Args:
        filters (list, optional): Additional filters to apply to the query
            alongside the type filter.

    Returns:
        The result of ``query()`` for the combined filters.

    """
    type_constrained = FilterSet(filters)
    type_constrained.add(Filter('type', '=', 'campaign'))
    return query(type_constrained)
2018-04-05 16:07:35 +02:00
def courses_of_action(filters=None):
    """Query the workbench for all objects of type ``course-of-action``.

    Args:
        filters (list, optional): Additional filters to apply to the query
            alongside the type filter.

    Returns:
        The result of ``query()`` for the combined filters.

    """
    type_constrained = FilterSet(filters)
    type_constrained.add(Filter('type', '=', 'course-of-action'))
    return query(type_constrained)
2018-04-05 16:07:35 +02:00
def identities(filters=None):
    """Query the workbench for all objects of type ``identity``.

    Args:
        filters (list, optional): Additional filters to apply to the query
            alongside the type filter.

    Returns:
        The result of ``query()`` for the combined filters.

    """
    type_constrained = FilterSet(filters)
    type_constrained.add(Filter('type', '=', 'identity'))
    return query(type_constrained)
2018-04-05 16:07:35 +02:00
def indicators(filters=None):
    """Query the workbench for all objects of type ``indicator``.

    Args:
        filters (list, optional): Additional filters to apply to the query
            alongside the type filter.

    Returns:
        The result of ``query()`` for the combined filters.

    """
    type_constrained = FilterSet(filters)
    type_constrained.add(Filter('type', '=', 'indicator'))
    return query(type_constrained)
2018-04-05 16:07:35 +02:00
def intrusion_sets(filters=None):
    """Query the workbench for all objects of type ``intrusion-set``.

    Args:
        filters (list, optional): Additional filters to apply to the query
            alongside the type filter.

    Returns:
        The result of ``query()`` for the combined filters.

    """
    type_constrained = FilterSet(filters)
    type_constrained.add(Filter('type', '=', 'intrusion-set'))
    return query(type_constrained)
2018-04-05 16:07:35 +02:00
def malware(filters=None):
    """Query the workbench for all objects of type ``malware``.

    Args:
        filters (list, optional): Additional filters to apply to the query
            alongside the type filter.

    Returns:
        The result of ``query()`` for the combined filters.

    """
    type_constrained = FilterSet(filters)
    type_constrained.add(Filter('type', '=', 'malware'))
    return query(type_constrained)
2018-04-05 16:07:35 +02:00
def observed_data(filters=None):
    """Query the workbench for all objects of type ``observed-data``.

    Args:
        filters (list, optional): Additional filters to apply to the query
            alongside the type filter.

    Returns:
        The result of ``query()`` for the combined filters.

    """
    type_constrained = FilterSet(filters)
    type_constrained.add(Filter('type', '=', 'observed-data'))
    return query(type_constrained)
2018-04-05 16:07:35 +02:00
def reports(filters=None):
    """Query the workbench for all objects of type ``report``.

    Args:
        filters (list, optional): Additional filters to apply to the query
            alongside the type filter.

    Returns:
        The result of ``query()`` for the combined filters.

    """
    type_constrained = FilterSet(filters)
    type_constrained.add(Filter('type', '=', 'report'))
    return query(type_constrained)
2018-04-05 16:07:35 +02:00
def threat_actors(filters=None):
    """Query the workbench for all objects of type ``threat-actor``.

    Args:
        filters (list, optional): Additional filters to apply to the query
            alongside the type filter.

    Returns:
        The result of ``query()`` for the combined filters.

    """
    type_constrained = FilterSet(filters)
    type_constrained.add(Filter('type', '=', 'threat-actor'))
    return query(type_constrained)
2018-04-05 16:07:35 +02:00
def tools(filters=None):
    """Query the workbench for all objects of type ``tool``.

    Args:
        filters (list, optional): Additional filters to apply to the query
            alongside the type filter.

    Returns:
        The result of ``query()`` for the combined filters.

    """
    type_constrained = FilterSet(filters)
    type_constrained.add(Filter('type', '=', 'tool'))
    return query(type_constrained)
2018-04-05 16:07:35 +02:00
def vulnerabilities(filters=None):
    """Query the workbench for all objects of type ``vulnerability``.

    Args:
        filters (list, optional): Additional filters to apply to the query
            alongside the type filter.

    Returns:
        The result of ``query()`` for the combined filters.

    """
    type_constrained = FilterSet(filters)
    type_constrained.add(Filter('type', '=', 'vulnerability'))
    return query(type_constrained)