Make DataStore a regular class, remove unwanted overrides, update tests. Remove CustomProperty since it is no longer needed

stix2.0
Emmanuelle Vargas-Gonzalez 2017-11-08 13:53:21 -05:00
parent 489e45ad1b
commit da66f10147
6 changed files with 31 additions and 275 deletions

View File

@ -393,15 +393,3 @@ class PatternProperty(StringProperty):
raise ValueError(str(errors[0]))
return self.string_type(value)
class CustomProperty(Property):
"""
The custom property class can be used as a base to extend further
functionality of a custom property.
Note:
This class is used internally to signal the use of any custom property
that is parsed by any object or `parse()` method and allow_custom=True.
"""
pass

View File

@ -23,9 +23,9 @@ def make_id():
return str(uuid.uuid4())
class DataStore(with_metaclass(ABCMeta)):
"""An implementer will create a concrete subclass from
this class for the specific DataStore.
class DataStore(object):
"""An implementer can subclass to create custom behavior from
this class for the specific DataStores.
Args:
source (DataSource): An existing DataSource to use
@ -45,8 +45,7 @@ class DataStore(with_metaclass(ABCMeta)):
self.source = source
self.sink = sink
@abstractmethod
def get(self, stix_id): # pragma: no cover
def get(self, *args, **kwargs):
"""Retrieve the most recent version of a single STIX object by ID.
Translate get() call to the appropriate DataSource call.
@ -59,14 +58,12 @@ class DataStore(with_metaclass(ABCMeta)):
object specified by the "id".
"""
return NotImplementedError()
return self.source.get(*args, **kwargs)
@abstractmethod
def all_versions(self, stix_id): # pragma: no cover
def all_versions(self, *args, **kwargs):
"""Retrieve all versions of a single STIX object by ID.
Implement: Define a function that performs any custom behavior before
calling the associated DataSource all_versions() method.
Translate all_versions() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
@ -75,16 +72,12 @@ class DataStore(with_metaclass(ABCMeta)):
stix_objs (list): a list of STIX objects
"""
return NotImplementedError()
return self.source.all_versions(*args, **kwargs)
@abstractmethod
def query(self, query=None): # pragma: no cover
def query(self, *args, **kwargs):
"""Retrieve STIX objects matching a set of filters.
Implement: Specific data source API calls, processing,
functionality required for retrieving query from the data source.
Define custom behavior before calling the associated DataSource query()
Translate query() call to the appropriate DataSource call.
Args:
query (list): a list of filters (which collectively are the query)
@ -94,10 +87,9 @@ class DataStore(with_metaclass(ABCMeta)):
stix_objs (list): a list of STIX objects
"""
return NotImplementedError()
return self.source.query(*args, **kwargs)
@abstractmethod
def add(self, stix_objs): # pragma: no cover
def add(self, *args, **kwargs):
"""Method for storing STIX objects.
Define custom behavior before storing STIX objects using the associated
@ -107,7 +99,7 @@ class DataStore(with_metaclass(ABCMeta)):
stix_objs (list): a list of STIX objects
"""
return NotImplementedError()
return self.sink.add(*args, **kwargs)
class DataSink(with_metaclass(ABCMeta)):
@ -123,7 +115,7 @@ class DataSink(with_metaclass(ABCMeta)):
self.id = make_id()
@abstractmethod
def add(self, stix_objs): # pragma: no cover
def add(self, stix_objs):
"""Method for storing STIX objects.
Implement: Specific data sink API calls, processing,
@ -134,7 +126,6 @@ class DataSink(with_metaclass(ABCMeta)):
STIX object)
"""
raise NotImplementedError()
class DataSource(with_metaclass(ABCMeta)):
@ -152,7 +143,7 @@ class DataSource(with_metaclass(ABCMeta)):
self.filters = set()
@abstractmethod
def get(self, stix_id): # pragma: no cover
def get(self, stix_id):
"""
Implement: Specific data source API calls, processing,
functionality required for retrieving data from the data source
@ -166,10 +157,9 @@ class DataSource(with_metaclass(ABCMeta)):
stix_obj: the STIX object
"""
raise NotImplementedError()
@abstractmethod
def all_versions(self, stix_id): # pragma: no cover
def all_versions(self, stix_id):
"""
Implement: Similar to get() except returns list of all object versions
of the specified "id". In addition, implement the specific data
@ -185,10 +175,9 @@ class DataSource(with_metaclass(ABCMeta)):
stix_objs (list): a list of STIX objects
"""
raise NotImplementedError()
@abstractmethod
def query(self, query=None): # pragma: no cover
def query(self, query=None):
"""
Implement: The specific data source API calls, processing,
functionality required for retrieving query from the data source
@ -201,7 +190,6 @@ class DataSource(with_metaclass(ABCMeta)):
stix_objs (list): a list of STIX objects
"""
raise NotImplementedError()
class CompositeDataSource(DataSource):

View File

@ -31,88 +31,10 @@ class FileSystemStore(DataStore):
"""
def __init__(self, stix_dir, bundlify=False):
super(FileSystemStore, self).__init__()
self.source = FileSystemSource(stix_dir=stix_dir)
self.sink = FileSystemSink(stix_dir=stix_dir, bundlify=bundlify)
def get(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve the most recent version of a single STIX object by ID.
Translate get() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
stix_obj: the single most recent version of the STIX
object specified by the "id".
"""
return self.source.get(stix_id, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)
def all_versions(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve all versions of a single STIX object by ID.
Implement: Translate all_versions() call to the appropriate DataSource
call.
Args:
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.all_versions(stix_id, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)
def query(self, query=None, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve STIX objects matching a set of filters.
Implement: Specific data source API calls, processing,
functionality required for retrieving query from the data source.
Args:
query (list): a list of filters (which collectively are the query)
to conduct search on.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.query(query=query, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)
def add(self, stix_objs, allow_custom=False, version=None):
"""Store STIX objects.
Translates add() to the appropriate DataSink call.
Args:
stix_objs (list): a list of STIX objects
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
return self.sink.add(stix_objs, allow_custom=allow_custom, version=version)
super(FileSystemStore, self).__init__(
source=FileSystemSource(stix_dir=stix_dir),
sink=FileSystemSink(stix_dir=stix_dir, bundlify=bundlify)
)
class FileSystemSink(DataSink):

View File

@ -93,14 +93,15 @@ class MemoryStore(DataStore):
"""
def __init__(self, stix_data=None, allow_custom=False, version=None):
super(MemoryStore, self).__init__()
self._data = {}
if stix_data:
_add(self, stix_data, allow_custom=allow_custom, version=version)
self.source = MemorySource(stix_data=self._data, allow_custom=allow_custom, version=version, _store=True)
self.sink = MemorySink(stix_data=self._data, allow_custom=allow_custom, version=version, _store=True)
super(MemoryStore, self).__init__(
source=MemorySource(stix_data=self._data, allow_custom=allow_custom, version=version, _store=True),
sink=MemorySink(stix_data=self._data, allow_custom=allow_custom, version=version, _store=True)
)
def save_to_file(self, file_path, allow_custom=False):
"""Write SITX objects from in-memory dictionary to JSON file, as a STIX
@ -130,67 +131,6 @@ class MemoryStore(DataStore):
"""
return self.source.load_from_file(file_path=file_path, allow_custom=allow_custom, version=version)
def get(self, stix_id, _composite_filters=None):
"""Retrieve the most recent version of a single STIX object by ID.
Translate get() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
Returns:
stix_obj: the single most recent version of the STIX
object specified by the "id".
"""
return self.source.get(stix_id, _composite_filters=_composite_filters)
def all_versions(self, stix_id, _composite_filters=None):
"""Retrieve all versions of a single STIX object by ID.
Translate all_versions() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.all_versions(stix_id, _composite_filters=_composite_filters)
def query(self, query=None, _composite_filters=None):
"""Retrieve STIX objects matching a set of filters.
Translates query() to appropriate DataStore call.
Args:
query (list): a list of filters (which collectively are the query)
to conduct search on.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.query(query=query, _composite_filters=_composite_filters)
def add(self, stix_objs, allow_custom=False, version=None):
"""Store STIX objects.
Translates add() to the appropriate DataSink call.
Args:
stix_objs (list): a list of STIX objects
"""
return self.sink.add(stix_objs, allow_custom=allow_custom, version=version)
class MemorySink(DataSink):
"""Interface for adding/pushing STIX objects to an in-memory dictionary.

View File

@ -20,86 +20,10 @@ class TAXIICollectionStore(DataStore):
collection (taxii2.Collection): TAXII Collection instance
"""
def __init__(self, collection):
super(TAXIICollectionStore, self).__init__()
self.source = TAXIICollectionSource(collection)
self.sink = TAXIICollectionSink(collection)
def get(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve the most recent version of a single STIX object by ID.
Translate get() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
stix_obj: the single most recent version of the STIX
object specified by the "id".
"""
return self.source.get(stix_id, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)
def all_versions(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve all versions of a single STIX object by ID.
Translate all_versions() to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.all_versions(stix_id, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)
def query(self, query=None, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve STIX objects matching a set of filters.
Translate query() to the appropriate DataSource call.
Args:
query (list): a list of filters (which collectively are the query)
to conduct search on.
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.query(query=query, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)
def add(self, stix_objs, allow_custom=False, version=None):
"""Store STIX objects.
Translate add() to the appropriate DataSink call.
Args:
stix_objs (list): a list of STIX objects
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
return self.sink.add(stix_objs, allow_custom=allow_custom, version=version)
super(TAXIICollectionStore, self).__init__(
source=TAXIICollectionSource(collection),
sink=TAXIICollectionSink(collection)
)
class TAXIICollectionSink(DataSink):

View File

@ -2,8 +2,8 @@ import pytest
from taxii2client import Collection
from stix2 import Filter, MemorySink, MemorySource
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
DataStore, make_id, taxii)
from stix2.sources import (CompositeDataSource, DataSink, DataSource, make_id,
taxii)
from stix2.sources.filters import apply_common_filters
from stix2.utils import deduplicate
@ -122,12 +122,6 @@ STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
def test_ds_abstract_class_smoke():
with pytest.raises(TypeError):
DataStore()
with pytest.raises(TypeError):
DataStore.get()
with pytest.raises(TypeError):
DataSource()