"""Functions and class wrappers for interacting with STIX data at a high level. .. autofunction:: create .. autofunction:: set_default_creator .. autofunction:: set_default_created .. autofunction:: set_default_external_refs .. autofunction:: set_default_object_marking_refs .. autofunction:: get .. autofunction:: all_versions .. autofunction:: query .. autofunction:: creator_of .. autofunction:: relationships .. autofunction:: related_to .. autofunction:: save .. autofunction:: add_filters .. autofunction:: add_filter .. autofunction:: parse .. autofunction:: add_data_source .. autofunction:: add_data_sources """ import stix2 from . import AttackPattern as _AttackPattern from . import Campaign as _Campaign from . import CourseOfAction as _CourseOfAction from . import Identity as _Identity from . import Indicator as _Indicator from . import IntrusionSet as _IntrusionSet from . import Malware as _Malware from . import ObservedData as _ObservedData from . import Report as _Report from . import ThreatActor as _ThreatActor from . import Tool as _Tool from . import Vulnerability as _Vulnerability from . import (AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, # noqa: F401 Bundle, CustomExtension, CustomMarking, CustomObservable, Directory, DomainName, EmailAddress, EmailMessage, EmailMIMEComponent, Environment, ExtensionsProperty, ExternalReference, File, FileSystemSource, Filter, GranularMarking, HTTPRequestExt, ICMPExt, IPv4Address, IPv6Address, KillChainPhase, MACAddress, MarkingDefinition, MemoryStore, Mutex, NetworkTraffic, NTFSExt, parse_observable, PDFExt, Process, RasterImageExt, Relationship, Sighting, SocketExt, Software, StatementMarking, TAXIICollectionSource, TCPExt, TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, TLPMarking, UNIXAccountExt, URL, UserAccount, WindowsPEBinaryExt, WindowsPEOptionalHeaderType, WindowsPESection, WindowsProcessExt, WindowsRegistryKey, WindowsRegistryValueType, WindowsServiceExt, X509Certificate, X509V3ExtenstionsType) from .datastore.filters import FilterSet # Use an implicit MemoryStore _environ = Environment(store=MemoryStore()) create = _environ.create set_default_creator = _environ.set_default_creator set_default_created = _environ.set_default_created set_default_external_refs = _environ.set_default_external_refs set_default_object_marking_refs = _environ.set_default_object_marking_refs get = _environ.get all_versions = _environ.all_versions query = _environ.query creator_of = _environ.creator_of relationships = _environ.relationships related_to = _environ.related_to save = _environ.add add_filters = _environ.add_filters add_filter = _environ.add_filter parse = _environ.parse add_data_source = _environ.source.add_data_source add_data_sources = _environ.source.add_data_sources # Wrap SDOs with helper functions STIX_OBJS = [_AttackPattern, _Campaign, _CourseOfAction, _Identity, _Indicator, _IntrusionSet, _Malware, _ObservedData, _Report, _ThreatActor, _Tool, _Vulnerability] STIX_OBJ_DOCS = """ .. method:: created_by(*args, **kwargs) {} .. method:: relationships(*args, **kwargs) {} .. method:: related(*args, **kwargs) {} """.format(_environ.creator_of.__doc__, _environ.relationships.__doc__, _environ.related_to.__doc__) def _created_by_wrapper(self, *args, **kwargs): return _environ.creator_of(self, *args, **kwargs) def _relationships_wrapper(self, *args, **kwargs): return _environ.relationships(self, *args, **kwargs) def _related_wrapper(self, *args, **kwargs): return _environ.related_to(self, *args, **kwargs) def _observed_data_init(self, *args, **kwargs): self.__allow_custom = kwargs.get('allow_custom', False) self._properties['objects'].allow_custom = kwargs.get('allow_custom', False) super(self.__class__, self).__init__(*args, **kwargs) def _constructor_wrapper(obj_type): # Use an intermediate wrapper class so the implicit environment will create objects that have our wrapper functions class_dict = dict( created_by=_created_by_wrapper, relationships=_relationships_wrapper, related=_related_wrapper, **obj_type.__dict__ ) # Avoid TypeError about super() in ObservedData if 'ObservedData' in obj_type.__name__: class_dict['__init__'] = _observed_data_init wrapped_type = type(obj_type.__name__, obj_type.__bases__, class_dict) @staticmethod def new_constructor(cls, *args, **kwargs): x = _environ.create(wrapped_type, *args, **kwargs) return x return new_constructor def _setup_workbench(): # Create wrapper classes whose constructors call the implicit environment's create() for obj_type in STIX_OBJS: new_class_dict = { '__new__': _constructor_wrapper(obj_type), '__doc__': 'Workbench wrapper around the `{0} `__ object. {1}'.format(obj_type.__name__, STIX_OBJ_DOCS) } new_class = type(obj_type.__name__, (), new_class_dict) # Add our new class to this module's globals and to the library-wide mapping. # This allows parse() to use the wrapped classes. globals()[obj_type.__name__] = new_class stix2.OBJ_MAP[obj_type._type] = new_class new_class = None _setup_workbench() # Functions to get all objects of a specific type def attack_patterns(filters=None): """Retrieve all Attack Pattern objects. Args: filters (list, optional): A list of additional filters to apply to the query. """ filter_list = FilterSet(filters) filter_list.add(Filter('type', '=', 'attack-pattern')) return query(filter_list) def campaigns(filters=None): """Retrieve all Campaign objects. Args: filters (list, optional): A list of additional filters to apply to the query. """ filter_list = FilterSet(filters) filter_list.add(Filter('type', '=', 'campaign')) return query(filter_list) def courses_of_action(filters=None): """Retrieve all Course of Action objects. Args: filters (list, optional): A list of additional filters to apply to the query. """ filter_list = FilterSet(filters) filter_list.add(Filter('type', '=', 'course-of-action')) return query(filter_list) def identities(filters=None): """Retrieve all Identity objects. Args: filters (list, optional): A list of additional filters to apply to the query. """ filter_list = FilterSet(filters) filter_list.add(Filter('type', '=', 'identity')) return query(filter_list) def indicators(filters=None): """Retrieve all Indicator objects. Args: filters (list, optional): A list of additional filters to apply to the query. """ filter_list = FilterSet(filters) filter_list.add(Filter('type', '=', 'indicator')) return query(filter_list) def intrusion_sets(filters=None): """Retrieve all Intrusion Set objects. Args: filters (list, optional): A list of additional filters to apply to the query. """ filter_list = FilterSet(filters) filter_list.add(Filter('type', '=', 'intrusion-set')) return query(filter_list) def malware(filters=None): """Retrieve all Malware objects. Args: filters (list, optional): A list of additional filters to apply to the query. """ filter_list = FilterSet(filters) filter_list.add(Filter('type', '=', 'malware')) return query(filter_list) def observed_data(filters=None): """Retrieve all Observed Data objects. Args: filters (list, optional): A list of additional filters to apply to the query. """ filter_list = FilterSet(filters) filter_list.add(Filter('type', '=', 'observed-data')) return query(filter_list) def reports(filters=None): """Retrieve all Report objects. Args: filters (list, optional): A list of additional filters to apply to the query. """ filter_list = FilterSet(filters) filter_list.add(Filter('type', '=', 'report')) return query(filter_list) def threat_actors(filters=None): """Retrieve all Threat Actor objects. Args: filters (list, optional): A list of additional filters to apply to the query. """ filter_list = FilterSet(filters) filter_list.add(Filter('type', '=', 'threat-actor')) return query(filter_list) def tools(filters=None): """Retrieve all Tool objects. Args: filters (list, optional): A list of additional filters to apply to the query. """ filter_list = FilterSet(filters) filter_list.add(Filter('type', '=', 'tool')) return query(filter_list) def vulnerabilities(filters=None): """Retrieve all Vulnerability objects. Args: filters (list, optional): A list of additional filters to apply to the query. """ filter_list = FilterSet(filters) filter_list.add(Filter('type', '=', 'vulnerability')) return query(filter_list)