cti-python-stix2/stix2/workbench.py

323 lines
9.0 KiB
Python
Raw Normal View History

2018-11-28 22:51:00 +01:00
"""Functions and class wrappers for interacting with STIX2 data at a high level.
.. autofunction:: create
.. autofunction:: set_default_creator
.. autofunction:: set_default_created
.. autofunction:: set_default_external_refs
.. autofunction:: set_default_object_marking_refs
.. autofunction:: get
.. autofunction:: all_versions
.. autofunction:: query
.. autofunction:: creator_of
.. autofunction:: relationships
.. autofunction:: related_to
.. autofunction:: save
.. autofunction:: add_filters
.. autofunction:: add_filter
.. autofunction:: parse
.. autofunction:: add_data_source
.. autofunction:: add_data_sources
"""
import stix2
from . import AttackPattern as _AttackPattern
from . import Campaign as _Campaign
from . import CourseOfAction as _CourseOfAction
from . import Identity as _Identity
from . import Indicator as _Indicator
from . import IntrusionSet as _IntrusionSet
from . import Malware as _Malware
from . import ObservedData as _ObservedData
from . import Report as _Report
from . import ThreatActor as _ThreatActor
from . import Tool as _Tool
from . import Vulnerability as _Vulnerability
from . import ( # noqa: F401
AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem,
Bundle, CustomExtension, CustomMarking, CustomObservable,
Directory, DomainName, EmailAddress, EmailMessage,
EmailMIMEComponent, Environment, ExternalReference, File,
FileSystemSource, Filter, GranularMarking, HTTPRequestExt,
ICMPExt, IPv4Address, IPv6Address, KillChainPhase, MACAddress,
MarkingDefinition, MemoryStore, Mutex, NetworkTraffic, NTFSExt,
parse_observable, PDFExt, Process, RasterImageExt, Relationship,
Sighting, SocketExt, Software, StatementMarking,
TAXIICollectionSource, TCPExt, TLP_AMBER, TLP_GREEN, TLP_RED,
TLP_WHITE, TLPMarking, UNIXAccountExt, URL, UserAccount,
WindowsPEBinaryExt, WindowsPEOptionalHeaderType,
WindowsPESection, WindowsProcessExt, WindowsRegistryKey,
WindowsRegistryValueType, WindowsServiceExt, X509Certificate,
X509V3ExtenstionsType
)
2018-04-13 17:08:03 +02:00
from .datastore.filters import FilterSet
# Use an implicit MemoryStore
_environ = Environment(store=MemoryStore())
create = _environ.create
set_default_creator = _environ.set_default_creator
set_default_created = _environ.set_default_created
set_default_external_refs = _environ.set_default_external_refs
set_default_object_marking_refs = _environ.set_default_object_marking_refs
get = _environ.get
all_versions = _environ.all_versions
query = _environ.query
creator_of = _environ.creator_of
relationships = _environ.relationships
related_to = _environ.related_to
save = _environ.add
add_filters = _environ.add_filters
add_filter = _environ.add_filter
parse = _environ.parse
add_data_source = _environ.source.add_data_source
add_data_sources = _environ.source.add_data_sources
2017-11-29 20:12:54 +01:00
# Wrap SDOs with helper functions
STIX_OBJS = [
_AttackPattern, _Campaign, _CourseOfAction, _Identity,
_Indicator, _IntrusionSet, _Malware, _ObservedData, _Report,
_ThreatActor, _Tool, _Vulnerability,
]
STIX_OBJ_DOCS = """
.. method:: created_by(*args, **kwargs)
{}
.. method:: relationships(*args, **kwargs)
{}
.. method:: related(*args, **kwargs)
{}
""".format(
_environ.creator_of.__doc__,
_environ.relationships.__doc__,
_environ.related_to.__doc__
)
2018-03-21 18:56:50 +01:00
def _created_by_wrapper(self, *args, **kwargs):
2017-11-29 20:12:54 +01:00
return _environ.creator_of(self, *args, **kwargs)
2018-03-21 18:56:50 +01:00
def _relationships_wrapper(self, *args, **kwargs):
2017-11-29 20:12:54 +01:00
return _environ.relationships(self, *args, **kwargs)
2018-03-21 18:56:50 +01:00
def _related_wrapper(self, *args, **kwargs):
2017-11-29 20:12:54 +01:00
return _environ.related_to(self, *args, **kwargs)
def _observed_data_init(self, *args, **kwargs):
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].allow_custom = kwargs.get('allow_custom', False)
super(self.__class__, self).__init__(*args, **kwargs)
2018-03-21 18:56:50 +01:00
def _constructor_wrapper(obj_type):
# Use an intermediate wrapper class so the implicit environment will create objects that have our wrapper functions
class_dict = dict(
2018-03-21 18:56:50 +01:00
created_by=_created_by_wrapper,
relationships=_relationships_wrapper,
related=_related_wrapper,
**obj_type.__dict__
)
# Avoid TypeError about super() in ObservedData
if 'ObservedData' in obj_type.__name__:
class_dict['__init__'] = _observed_data_init
wrapped_type = type(obj_type.__name__, obj_type.__bases__, class_dict)
@staticmethod
def new_constructor(cls, *args, **kwargs):
x = _environ.create(wrapped_type, *args, **kwargs)
return x
return new_constructor
2017-11-29 20:12:54 +01:00
def _setup_workbench():
# Create wrapper classes whose constructors call the implicit environment's create()
for obj_type in STIX_OBJS:
new_class_dict = {
'__new__': _constructor_wrapper(obj_type),
'__doc__': 'Workbench wrapper around the `{0} <stix2.v20.sdo.rst#stix2.v20.sdo.{0}>`__ object. {1}'.format(obj_type.__name__, STIX_OBJ_DOCS),
}
new_class = type(obj_type.__name__, (), new_class_dict)
# Add our new class to this module's globals and to the library-wide mapping.
# This allows parse() to use the wrapped classes.
globals()[obj_type.__name__] = new_class
stix2.OBJ_MAP[obj_type._type] = new_class
new_class = None
_setup_workbench()
2017-11-29 20:12:54 +01:00
# Functions to get all objects of a specific type
2018-04-05 16:07:35 +02:00
def attack_patterns(filters=None):
"""Retrieve all Attack Pattern objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
2018-04-13 17:08:03 +02:00
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'attack-pattern'))
return query(filter_list)
2018-04-05 16:07:35 +02:00
def campaigns(filters=None):
"""Retrieve all Campaign objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
2018-04-13 17:08:03 +02:00
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'campaign'))
return query(filter_list)
2018-04-05 16:07:35 +02:00
def courses_of_action(filters=None):
"""Retrieve all Course of Action objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
2018-04-13 17:08:03 +02:00
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'course-of-action'))
return query(filter_list)
2018-04-05 16:07:35 +02:00
def identities(filters=None):
"""Retrieve all Identity objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
2018-04-13 17:08:03 +02:00
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'identity'))
return query(filter_list)
2018-04-05 16:07:35 +02:00
def indicators(filters=None):
"""Retrieve all Indicator objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
2018-04-13 17:08:03 +02:00
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'indicator'))
return query(filter_list)
2018-04-05 16:07:35 +02:00
def intrusion_sets(filters=None):
"""Retrieve all Intrusion Set objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
2018-04-13 17:08:03 +02:00
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'intrusion-set'))
return query(filter_list)
2018-04-05 16:07:35 +02:00
def malware(filters=None):
"""Retrieve all Malware objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
2018-04-13 17:08:03 +02:00
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'malware'))
return query(filter_list)
2018-04-05 16:07:35 +02:00
def observed_data(filters=None):
"""Retrieve all Observed Data objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
2018-04-13 17:08:03 +02:00
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'observed-data'))
return query(filter_list)
2018-04-05 16:07:35 +02:00
def reports(filters=None):
"""Retrieve all Report objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
2018-04-13 17:08:03 +02:00
filter_list = FilterSet(filters)
2018-04-13 20:21:44 +02:00
filter_list.add(Filter('type', '=', 'report'))
return query(filter_list)
2018-04-05 16:07:35 +02:00
def threat_actors(filters=None):
"""Retrieve all Threat Actor objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
2018-04-13 17:08:03 +02:00
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'threat-actor'))
return query(filter_list)
2018-04-05 16:07:35 +02:00
def tools(filters=None):
"""Retrieve all Tool objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
2018-04-13 17:08:03 +02:00
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'tool'))
return query(filter_list)
2018-04-05 16:07:35 +02:00
def vulnerabilities(filters=None):
"""Retrieve all Vulnerability objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
2018-04-13 17:08:03 +02:00
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'vulnerability'))
return query(filter_list)