ABC for DataSink, DataStore and DataSource. Fixes across the concrete objects

stix2.0
Emmanuelle Vargas-Gonzalez 2017-11-02 21:29:25 -04:00
parent 37e9049536
commit 1e591a827d
4 changed files with 336 additions and 138 deletions

View File

@ -11,8 +11,11 @@
|
"""
from abc import ABCMeta, abstractmethod
import uuid
from six import with_metaclass
from stix2.utils import deduplicate
@ -20,95 +23,94 @@ def make_id():
return str(uuid.uuid4())
class DataStore(object):
class DataStore(with_metaclass(ABCMeta)):
"""An implementer will create a concrete subclass from
this class for the specific DataStore.
Args:
source (DataSource): An existing DataSource to use
as this DataStore's DataSource component
sink (DataSink): An existing DataSink to use
as this DataStore's DataSink component
Attributes:
id (str): A unique UUIDv4 to identify this DataStore.
source (DataSource): An object that implements DataSource class.
sink (DataSink): An object that implements DataSink class.
"""
def __init__(self, source=None, sink=None):
super(DataStore, self).__init__()
self.id = make_id()
self.source = source
self.sink = sink
def get(self, stix_id, allow_custom=False):
@abstractmethod
def get(self, stix_id): # pragma: no cover
"""Retrieve the most recent version of a single STIX object by ID.
Translate get() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_obj: the single most recent version of the STIX
object specified by the "id".
"""
return self.source.get(stix_id, allow_custom=allow_custom)
return NotImplementedError()
def all_versions(self, stix_id, allow_custom=False):
@abstractmethod
def all_versions(self, stix_id): # pragma: no cover
"""Retrieve all versions of a single STIX object by ID.
Implement: Translate all_versions() call to the appropriate DataSource call
Implement: Define a function that performs any custom behavior before
calling the associated DataSource all_versions() method.
Args:
stix_id (str): the id of the STIX object to retrieve.
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.all_versions(stix_id, allow_custom=allow_custom)
return NotImplementedError()
def query(self, query=None, allow_custom=False):
@abstractmethod
def query(self, query=None): # pragma: no cover
"""Retrieve STIX objects matching a set of filters.
Implement: Specific data source API calls, processing,
functionality required for retrieving query from the data source.
Define custom behavior before calling the associated DataSource query()
Args:
query (list): a list of filters (which collectively are the query)
to conduct search on.
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.query(query=query)
return NotImplementedError()
def add(self, stix_objs, allow_custom=False):
"""Store STIX objects.
@abstractmethod
def add(self, stix_objs): # pragma: no cover
"""Method for storing STIX objects.
Translates add() to the appropriate DataSink call.
Define custom behavior before storing STIX objects using the associated
DataSink. Translates add() to the appropriate DataSink call.
Args:
stix_objs (list): a list of STIX objects
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
"""
return self.sink.add(stix_objs, allow_custom=allow_custom)
return NotImplementedError()
class DataSink(object):
class DataSink(with_metaclass(ABCMeta)):
"""An implementer will create a concrete subclass from
this class for the specific DataSink.
@ -117,10 +119,12 @@ class DataSink(object):
"""
def __init__(self):
super(DataSink, self).__init__()
self.id = make_id()
def add(self, stix_objs, allow_custom=False):
"""Store STIX objects.
@abstractmethod
def add(self, stix_objs): # pragma: no cover
"""Method for storing STIX objects.
Implement: Specific data sink API calls, processing,
functionality required for adding data to the sink
@ -128,28 +132,27 @@ class DataSink(object):
Args:
stix_objs (list): a list of STIX objects (where each object is a
STIX object)
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
"""
raise NotImplementedError()
class DataSource(object):
class DataSource(with_metaclass(ABCMeta)):
"""An implementer will create a concrete subclass from
this class for the specific DataSource.
Attributes:
id (str): A unique UUIDv4 to identify this DataSource.
_filters (set): A collection of filters attached to this DataSource.
filters (set): A collection of filters attached to this DataSource.
"""
def __init__(self):
super(DataSource, self).__init__()
self.id = make_id()
self.filters = set()
def get(self, stix_id, _composite_filters=None, allow_custom=False):
@abstractmethod
def get(self, stix_id): # pragma: no cover
"""
Implement: Specific data source API calls, processing,
functionality required for retrieving data from the data source
@ -158,10 +161,6 @@ class DataSource(object):
stix_id (str): the id of the STIX 2.0 object to retrieve. Should
return a single object, the most recent version of the object
specified by the "id".
_composite_filters (set): set of filters passed from the parent
the CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_obj: the STIX object
@ -169,10 +168,11 @@ class DataSource(object):
"""
raise NotImplementedError()
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
@abstractmethod
def all_versions(self, stix_id): # pragma: no cover
"""
Implement: Similar to get() except returns list of all object versions of
the specified "id". In addition, implement the specific data
Implement: Similar to get() except returns list of all object versions
of the specified "id". In addition, implement the specific data
source API calls, processing, functionality required for retrieving
data from the data source.
@ -180,10 +180,6 @@ class DataSource(object):
stix_id (str): The id of the STIX 2.0 object to retrieve. Should
return a list of objects, all the versions of the object
specified by the "id".
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_objs (list): a list of STIX objects
@ -191,18 +187,15 @@ class DataSource(object):
"""
raise NotImplementedError()
def query(self, query=None, _composite_filters=None, allow_custom=False):
@abstractmethod
def query(self, query=None): # pragma: no cover
"""
Implement:Implement the specific data source API calls, processing,
Implement: The specific data source API calls, processing,
functionality required for retrieving query from the data source
Args:
query (list): a list of filters (which collectively are the query)
to conduct search on
_composite_filters (set): a set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
to conduct search on.
Returns:
stix_objs (list): a list of STIX objects
@ -224,7 +217,7 @@ class CompositeDataSource(DataSource):
Attributes:
data_sources (dict): A dictionary of DataSource objects; to be
data_sources (list): A dictionary of DataSource objects; to be
controlled and used by the Data Source Controller object.
"""
@ -237,7 +230,7 @@ class CompositeDataSource(DataSource):
super(CompositeDataSource, self).__init__()
self.data_sources = []
def get(self, stix_id, _composite_filters=None, allow_custom=False):
def get(self, stix_id, _composite_filters=None):
"""Retrieve STIX object by STIX ID
Federated retrieve method, iterates through all DataSources
@ -253,9 +246,7 @@ class CompositeDataSource(DataSource):
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (list): a list of filters passed from a
CompositeDataSource (i.e. if this CompositeDataSource is attached
to another parent CompositeDataSource), not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
to another parent CompositeDataSource), not user supplied.
Returns:
stix_obj: the STIX object to be returned.
@ -273,7 +264,7 @@ class CompositeDataSource(DataSource):
# for every configured Data Source, call its retrieve handler
for ds in self.data_sources:
data = ds.get(stix_id=stix_id, _composite_filters=all_filters, allow_custom=allow_custom)
data = ds.get(stix_id=stix_id, _composite_filters=all_filters)
if data:
all_data.append(data)
@ -288,22 +279,20 @@ class CompositeDataSource(DataSource):
return stix_obj
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
"""Retrieve STIX objects by STIX ID
def all_versions(self, stix_id, _composite_filters=None):
"""Retrieve all versions of a STIX object by STIX ID.
Federated all_versions retrieve method - iterates through all DataSources
defined in "data_sources"
Federated all_versions retrieve method - iterates through all
DataSources defined in "data_sources".
A composite data source will pass its attached filters to
each configured data source, pushing filtering to them to handle
each configured data source, pushing filtering to them to handle.
Args:
stix_id (str): id of the STIX objects to retrieve
stix_id (str): id of the STIX objects to retrieve.
_composite_filters (list): a list of filters passed from a
CompositeDataSource (i.e. if this CompositeDataSource is attached
to a parent CompositeDataSource), not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
CompositeDataSource (i.e. if this CompositeDataSource is
attached to a parent CompositeDataSource), not user supplied.
Returns:
all_data (list): list of STIX objects that have the specified id
@ -322,7 +311,7 @@ class CompositeDataSource(DataSource):
# retrieve STIX objects from all configured data sources
for ds in self.data_sources:
data = ds.all_versions(stix_id=stix_id, _composite_filters=all_filters, allow_custom=allow_custom)
data = ds.all_versions(stix_id=stix_id, _composite_filters=all_filters)
all_data.extend(data)
# remove exact duplicates (where duplicates are STIX 2.0 objects
@ -332,19 +321,17 @@ class CompositeDataSource(DataSource):
return all_data
def query(self, query=None, _composite_filters=None, allow_custom=False):
"""Retrieve STIX objects that match query
def query(self, query=None, _composite_filters=None):
"""Retrieve STIX objects that match a query.
Federate the query to all DataSources attached to the
Composite Data Source.
Args:
query (list): list of filters to search on
query (list): list of filters to search on.
_composite_filters (list): a list of filters passed from a
CompositeDataSource (i.e. if this CompositeDataSource is attached
to a parent CompositeDataSource), not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
CompositeDataSource (i.e. if this CompositeDataSource is
attached to a parent CompositeDataSource), not user supplied.
Returns:
all_data (list): list of STIX objects to be returned
@ -354,7 +341,7 @@ class CompositeDataSource(DataSource):
raise AttributeError('CompositeDataSource has no data sources')
if not query:
# dont mess with the query (i.e. convert to a set, as thats done
# don't mess with the query (i.e. convert to a set, as that's done
# within the specific DataSources that are called)
query = []
@ -369,7 +356,7 @@ class CompositeDataSource(DataSource):
# federate query to all attached data sources,
# pass composite filters to id
for ds in self.data_sources:
data = ds.query(query=query, _composite_filters=all_filters, allow_custom=allow_custom)
data = ds.query(query=query, _composite_filters=all_filters)
all_data.extend(data)
# remove exact duplicates (where duplicates are STIX 2.0

View File

@ -26,7 +26,7 @@ class FileSystemStore(DataStore):
Default: False.
Attributes:
source (FileSystemSource): FuleSystemSource
source (FileSystemSource): FileSystemSource
sink (FileSystemSink): FileSystemSink
"""
@ -35,6 +35,85 @@ class FileSystemStore(DataStore):
self.source = FileSystemSource(stix_dir=stix_dir)
self.sink = FileSystemSink(stix_dir=stix_dir, bundlify=bundlify)
def get(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve the most recent version of a single STIX object by ID.
Translate get() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
stix_obj: the single most recent version of the STIX
object specified by the "id".
"""
return self.source.get(stix_id, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)
def all_versions(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve all versions of a single STIX object by ID.
Implement: Translate all_versions() call to the appropriate DataSource
call.
Args:
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.all_versions(stix_id, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)
def query(self, query=None, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve STIX objects matching a set of filters.
Implement: Specific data source API calls, processing,
functionality required for retrieving query from the data source.
Args:
query (list): a list of filters (which collectively are the query)
to conduct search on.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.query(query=query, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)
def add(self, stix_objs, allow_custom=False, version=None):
"""Store STIX objects.
Translates add() to the appropriate DataSink call.
Args:
stix_objs (list): a list of STIX objects
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
return self.sink.add(stix_objs, allow_custom=allow_custom, version=version)
class FileSystemSink(DataSink):
"""Interface for adding/pushing STIX objects to file directory of STIX
@ -99,11 +178,11 @@ class FileSystemSink(DataSink):
self._check_path_and_write(stix_data)
elif isinstance(stix_data, (str, dict)):
stix_data = parse(stix_data, allow_custom, version)
stix_data = parse(stix_data, allow_custom=allow_custom, version=version)
if stix_data["type"] == "bundle":
# extract STIX objects
for stix_obj in stix_data.get("objects", []):
self.add(stix_obj)
self.add(stix_obj, allow_custom=allow_custom, version=version)
else:
# adding json-formatted STIX
self._check_path_and_write(stix_data)
@ -111,12 +190,12 @@ class FileSystemSink(DataSink):
elif isinstance(stix_data, Bundle):
# recursively add individual STIX objects
for stix_obj in stix_data.get("objects", []):
self.add(stix_obj)
self.add(stix_obj, allow_custom=allow_custom, version=version)
elif isinstance(stix_data, list):
# recursively add individual STIX objects
for stix_obj in stix_data:
self.add(stix_obj)
self.add(stix_obj, allow_custom=allow_custom, version=version)
else:
raise TypeError("stix_data must be a STIX object (or list of), "
@ -146,7 +225,7 @@ class FileSystemSource(DataSource):
def stix_dir(self):
return self._stix_dir
def get(self, stix_id, _composite_filters=None, allow_custom=False, version=None):
def get(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve STIX object from file directory via STIX ID.
Args:
@ -166,8 +245,7 @@ class FileSystemSource(DataSource):
"""
query = [Filter("id", "=", stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters,
allow_custom=allow_custom, version=version)
all_data = self.query(query=query, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)
if all_data:
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
@ -176,7 +254,7 @@ class FileSystemSource(DataSource):
return stix_obj
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False, version=None):
def all_versions(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve STIX object from file directory via STIX ID, all versions.
Note: Since FileSystem sources/sinks don't handle multiple versions
@ -197,10 +275,9 @@ class FileSystemSource(DataSource):
a python STIX objects and then returned
"""
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters,
allow_custom=allow_custom, version=version)]
return [self.get(stix_id=stix_id, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)]
def query(self, query=None, _composite_filters=None, allow_custom=False, version=None):
def query(self, query=None, allow_custom=False, version=None, _composite_filters=None):
"""Search and retrieve STIX objects based on the complete query.
A "complete query" includes the filters from the query, the filters
@ -305,7 +382,7 @@ class FileSystemSource(DataSource):
all_data = deduplicate(all_data)
# parse python STIX objects from the STIX object dicts
stix_objs = [parse(stix_obj_dict, allow_custom, version) for stix_obj_dict in all_data]
stix_objs = [parse(stix_obj_dict, allow_custom=allow_custom, version=version) for stix_obj_dict in all_data]
return stix_objs

View File

@ -24,7 +24,7 @@ from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter, apply_common_filters
def _add(store, stix_data=None, allow_custom=False):
def _add(store, stix_data=None, allow_custom=False, version=None):
"""Add STIX objects to MemoryStore/Sink.
Adds STIX objects to an in-memory dictionary for fast lookup.
@ -34,6 +34,8 @@ def _add(store, stix_data=None, allow_custom=False):
stix_data (list OR dict OR STIX object): STIX objects to be added
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
if isinstance(stix_data, _STIXBase):
@ -44,25 +46,25 @@ def _add(store, stix_data=None, allow_custom=False):
if stix_data["type"] == "bundle":
# adding a json bundle - so just grab STIX objects
for stix_obj in stix_data.get("objects", []):
_add(store, stix_obj, allow_custom=allow_custom)
_add(store, stix_obj, allow_custom=allow_custom, version=version)
else:
# adding a json STIX object
store._data[stix_data["id"]] = stix_data
elif isinstance(stix_data, str):
# adding json encoded string of STIX content
stix_data = parse(stix_data, allow_custom=allow_custom)
stix_data = parse(stix_data, allow_custom=allow_custom, version=version)
if stix_data["type"] == "bundle":
# recurse on each STIX object in bundle
for stix_obj in stix_data.get("objects", []):
_add(store, stix_obj, allow_custom=allow_custom)
_add(store, stix_obj, allow_custom=allow_custom, version=version)
else:
_add(store, stix_data)
_add(store, stix_data, allow_custom=allow_custom, version=version)
elif isinstance(stix_data, list):
# STIX objects are in a list- recurse on each object
for stix_obj in stix_data:
_add(store, stix_obj, allow_custom=allow_custom)
_add(store, stix_obj, allow_custom=allow_custom, version=version)
else:
raise TypeError("stix_data must be a STIX object (or list of), JSON formatted STIX (or list of), or a JSON formatted STIX bundle")
@ -81,6 +83,8 @@ class MemoryStore(DataStore):
stix_data (list OR dict OR STIX object): STIX content to be added
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Attributes:
_data (dict): the in-memory dict that holds STIX objects
@ -88,15 +92,15 @@ class MemoryStore(DataStore):
sink (MemorySink): MemorySink
"""
def __init__(self, stix_data=None, allow_custom=False):
def __init__(self, stix_data=None, allow_custom=False, version=None):
super(MemoryStore, self).__init__()
self._data = {}
if stix_data:
_add(self, stix_data, allow_custom=allow_custom)
_add(self, stix_data, allow_custom=allow_custom, version=version)
self.source = MemorySource(stix_data=self._data, _store=True, allow_custom=allow_custom)
self.sink = MemorySink(stix_data=self._data, _store=True, allow_custom=allow_custom)
self.source = MemorySource(stix_data=self._data, allow_custom=allow_custom, version=version, _store=True)
self.sink = MemorySink(stix_data=self._data, allow_custom=allow_custom, version=version, _store=True)
def save_to_file(self, file_path, allow_custom=False):
"""Write SITX objects from in-memory dictionary to JSON file, as a STIX
@ -110,7 +114,7 @@ class MemoryStore(DataStore):
"""
return self.sink.save_to_file(file_path=file_path, allow_custom=allow_custom)
def load_from_file(self, file_path, allow_custom=False):
def load_from_file(self, file_path, allow_custom=False, version=None):
"""Load STIX data from JSON file.
File format is expected to be a single JSON
@ -120,9 +124,72 @@ class MemoryStore(DataStore):
file_path (str): file path to load STIX data from
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
return self.source.load_from_file(file_path=file_path, allow_custom=allow_custom)
return self.source.load_from_file(file_path=file_path, allow_custom=allow_custom, version=version)
def get(self, stix_id, _composite_filters=None):
"""Retrieve the most recent version of a single STIX object by ID.
Translate get() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
Returns:
stix_obj: the single most recent version of the STIX
object specified by the "id".
"""
return self.source.get(stix_id, _composite_filters=_composite_filters)
def all_versions(self, stix_id, _composite_filters=None):
"""Retrieve all versions of a single STIX object by ID.
Translate all_versions() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.all_versions(stix_id, _composite_filters=_composite_filters)
def query(self, query=None, _composite_filters=None):
"""Retrieve STIX objects matching a set of filters.
Translates query() to appropriate DataStore call.
Args:
query (list): a list of filters (which collectively are the query)
to conduct search on.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.query(query=query, _composite_filters=_composite_filters)
def add(self, stix_objs, allow_custom=False, version=None):
"""Store STIX objects.
Translates add() to the appropriate DataSink call.
Args:
stix_objs (list): a list of STIX objects
"""
return self.sink.add(stix_objs, allow_custom=allow_custom, version=version)
class MemorySink(DataSink):
@ -146,17 +213,17 @@ class MemorySink(DataSink):
a MemorySource
"""
def __init__(self, stix_data=None, _store=False, allow_custom=False):
def __init__(self, stix_data=None, allow_custom=False, version=None, _store=False):
super(MemorySink, self).__init__()
self._data = {}
if _store:
self._data = stix_data
elif stix_data:
_add(self, stix_data, allow_custom=allow_custom)
_add(self, stix_data, allow_custom=allow_custom, version=version)
def add(self, stix_data, allow_custom=False):
_add(self, stix_data, allow_custom=allow_custom)
def add(self, stix_data, allow_custom=False, version=None):
_add(self, stix_data, allow_custom=allow_custom, version=version)
add.__doc__ = _add.__doc__
def save_to_file(self, file_path, allow_custom=False):
@ -190,24 +257,22 @@ class MemorySource(DataSource):
a MemorySink
"""
def __init__(self, stix_data=None, _store=False, allow_custom=False):
def __init__(self, stix_data=None, allow_custom=False, version=None, _store=False):
super(MemorySource, self).__init__()
self._data = {}
if _store:
self._data = stix_data
elif stix_data:
_add(self, stix_data, allow_custom=allow_custom)
_add(self, stix_data, allow_custom=allow_custom, version=version)
def get(self, stix_id, _composite_filters=None, allow_custom=False):
def get(self, stix_id, _composite_filters=None):
"""Retrieve STIX object from in-memory dict via STIX ID.
Args:
stix_id (str): The STIX ID of the STIX object to be retrieved.
composite_filters (set): set of filters passed from the parent
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(dict OR STIX object): STIX object that has the supplied
@ -227,7 +292,7 @@ class MemorySource(DataSource):
# if there are filters from the composite level, process full query
query = [Filter("id", "=", stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom)
all_data = self.query(query=query, _composite_filters=_composite_filters)
if all_data:
# reduce to most recent version
@ -237,7 +302,7 @@ class MemorySource(DataSource):
else:
return None
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
def all_versions(self, stix_id, _composite_filters=None):
"""Retrieve STIX objects from in-memory dict via STIX ID, all versions of it
Note: Since Memory sources/sinks don't handle multiple versions of a
@ -245,10 +310,8 @@ class MemorySource(DataSource):
Args:
stix_id (str): The STIX ID of the STIX 2 object to retrieve.
composite_filters (set): set of filters passed from the parent
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(list): list of STIX objects that has the supplied ID. As the
@ -257,9 +320,9 @@ class MemorySource(DataSource):
is returned in the same form as it as added
"""
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters, allow_custom=allow_custom)]
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
def query(self, query=None, _composite_filters=None, allow_custom=False):
def query(self, query=None, _composite_filters=None):
"""Search and retrieve STIX objects based on the complete query.
A "complete query" includes the filters from the query, the filters
@ -268,10 +331,8 @@ class MemorySource(DataSource):
Args:
query (list): list of filters to search on
composite_filters (set): set of filters passed from the
_composite_filters (set): set of filters passed from the
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(list): list of STIX objects that matches the supplied
@ -284,7 +345,7 @@ class MemorySource(DataSource):
query = set()
else:
if not isinstance(query, list):
# make sure dont make set from a Filter object,
# make sure don't make set from a Filter object,
# need to make a set from a list of Filter objects (even if just one Filter)
query = [query]
query = set(query)
@ -300,8 +361,8 @@ class MemorySource(DataSource):
return all_data
def load_from_file(self, file_path, allow_custom=False):
def load_from_file(self, file_path, allow_custom=False, version=None):
file_path = os.path.abspath(file_path)
stix_data = json.load(open(file_path, "r"))
_add(self, stix_data, allow_custom=allow_custom)
_add(self, stix_data, allow_custom=allow_custom, version=version)
load_from_file.__doc__ = MemoryStore.load_from_file.__doc__

View File

@ -1,5 +1,5 @@
"""
Python STIX 2.x TaxiiCollectionStore
Python STIX 2.x TAXIICollectionStore
"""
from stix2.base import _STIXBase
@ -24,6 +24,71 @@ class TAXIICollectionStore(DataStore):
self.source = TAXIICollectionSource(collection)
self.sink = TAXIICollectionSink(collection)
def get(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve the most recent version of a single STIX object by ID.
Translate get() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
stix_obj: the single most recent version of the STIX
object specified by the "id".
"""
return self.source.get(stix_id, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)
def all_versions(self, stix_id):
"""Retrieve all versions of a single STIX object by ID.
Translate all_versions() to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.all_versions(stix_id)
def query(self, query=None):
"""Retrieve STIX objects matching a set of filters.
Translate query() to the appropriate DataSource call.
Args:
query (list): a list of filters (which collectively are the query)
to conduct search on.
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.query(query=query)
def add(self, stix_objs, allow_custom=False, version=None):
"""Store STIX objects.
Translate add() to the appropriate DataSink call.
Args:
stix_objs (list): a list of STIX objects
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
return self.sink.add(stix_objs, allow_custom=allow_custom, version=version)
class TAXIICollectionSink(DataSink):
"""Provides an interface for pushing STIX objects to a local/remote
@ -37,7 +102,7 @@ class TAXIICollectionSink(DataSink):
super(TAXIICollectionSink, self).__init__()
self.collection = collection
def add(self, stix_data, allow_custom=False):
def add(self, stix_data, allow_custom=False, version=None):
"""Add/push STIX content to TAXII Collection endpoint
Args:
@ -46,6 +111,8 @@ class TAXIICollectionSink(DataSink):
json encoded string, or list of any of the following
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
if isinstance(stix_data, _STIXBase):
@ -62,11 +129,11 @@ class TAXIICollectionSink(DataSink):
elif isinstance(stix_data, list):
# adding list of something - recurse on each
for obj in stix_data:
self.add(obj, allow_custom=allow_custom)
self.add(obj, allow_custom=allow_custom, version=version)
elif isinstance(stix_data, str):
# adding json encoded string of STIX content
stix_data = parse(stix_data, allow_custom=allow_custom)
stix_data = parse(stix_data, allow_custom=allow_custom, version=version)
if stix_data["type"] == "bundle":
bundle = dict(stix_data)
else:
@ -90,16 +157,18 @@ class TAXIICollectionSource(DataSource):
super(TAXIICollectionSource, self).__init__()
self.collection = collection
def get(self, stix_id, _composite_filters=None, allow_custom=False):
def get(self, stix_id, _composite_filters=None, allow_custom=False, version=None):
"""Retrieve STIX object from local/remote STIX Collection
endpoint.
Args:
stix_id (str): The STIX ID of the STIX object to be retrieved.
composite_filters (set): set of filters passed from the parent
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
(STIX object): STIX object that has the supplied STIX ID.
@ -121,7 +190,7 @@ class TAXIICollectionSource(DataSource):
stix_obj = list(apply_common_filters(stix_objs, query))
if len(stix_obj):
stix_obj = parse(stix_obj[0], allow_custom=allow_custom)
stix_obj = parse(stix_obj[0], allow_custom=allow_custom, version=version)
if stix_obj.id != stix_id:
# check - was added to handle erroneous TAXII servers
stix_obj = None
@ -130,16 +199,18 @@ class TAXIICollectionSource(DataSource):
return stix_obj
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
def all_versions(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve STIX object from local/remote TAXII Collection
endpoint, all versions of it
Args:
stix_id (str): The STIX ID of the STIX objects to be retrieved.
composite_filters (set): set of filters passed from the parent
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
(see query() as all_versions() is just a wrapper)
@ -154,14 +225,14 @@ class TAXIICollectionSource(DataSource):
all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom)
# parse STIX objects from TAXII returned json
all_data = [parse(stix_obj) for stix_obj in all_data]
all_data = [parse(stix_obj, allow_custom=allow_custom, version=version) for stix_obj in all_data]
# check - was added to handle erroneous TAXII servers
all_data_clean = [stix_obj for stix_obj in all_data if stix_obj.id == stix_id]
return all_data_clean
def query(self, query=None, _composite_filters=None, allow_custom=False):
def query(self, query=None, allow_custom=False, version=None, _composite_filters=None):
"""Search and retreive STIX objects based on the complete query
A "complete query" includes the filters from the query, the filters
@ -170,10 +241,12 @@ class TAXIICollectionSource(DataSource):
Args:
query (list): list of filters to search on
composite_filters (set): set of filters passed from the
_composite_filters (set): set of filters passed from the
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
(list): list of STIX objects that matches the supplied
@ -200,7 +273,7 @@ class TAXIICollectionSource(DataSource):
taxii_filters = self._parse_taxii_filters(query)
# query TAXII collection
all_data = self.collection.get_objects(filters=taxii_filters, allow_custom=allow_custom)["objects"]
all_data = self.collection.get_objects(filters=taxii_filters)["objects"]
# deduplicate data (before filtering as reduces wasted filtering)
all_data = deduplicate(all_data)
@ -209,7 +282,7 @@ class TAXIICollectionSource(DataSource):
all_data = list(apply_common_filters(all_data, query))
# parse python STIX objects from the STIX object dicts
stix_objs = [parse(stix_obj_dict, allow_custom=allow_custom) for stix_obj_dict in all_data]
stix_objs = [parse(stix_obj_dict, allow_custom=allow_custom, version=version) for stix_obj_dict in all_data]
return stix_objs