code changes brought about by discussion of pull request of 'bug_fixes' branch

stix2.1
= 2017-09-29 11:24:19 -04:00
parent 55943847fa
commit ffa2242878
9 changed files with 294 additions and 329 deletions

View File

@ -1,7 +1,7 @@
import copy
from .core import parse as _parse
from .sources import CompositeDataSource, DataSource, DataStore
from .sources import CompositeDataSource, DataStore
class ObjectFactory(object):
@ -132,10 +132,15 @@ class Environment(object):
def add_filters(self, *args, **kwargs):
try:
return self.source.add_filters(*args, **kwargs)
return self.source.filters.update(*args, **kwargs)
except AttributeError:
raise AttributeError('Environment has no data source')
def add_filter(self, *args, **kwargs):
try:
return self.source.filters.add(*args, **kwargs)
except AttributeError:
raise AttributeError('Environment has no data source')
add_filters.__doc__ = DataSource.add_filters.__doc__
def add(self, *args, **kwargs):
try:

View File

@ -7,103 +7,20 @@ Classes:
DataSource
CompositeDataSource
Notes:
Q: We have add_filters() but no remove_filter()
"""
import uuid
from six import iteritems
from stix2.sources.filters import (FILTER_OPS, FILTER_VALUE_TYPES,
STIX_COMMON_FIELDS, STIX_COMMON_FILTERS_MAP)
from stix2.utils import deduplicate
def make_id():
return str(uuid.uuid4())
def apply_common_filters(stix_objs, query):
"""Evaluate filters against a set of STIX 2.0 objects.
Supports only STIX 2.0 common property fields
Args:
stix_objs (list): list of STIX objects to apply the query to
query (set): set of filters (combined form complete query)
Returns:
(list): list of STIX objects that successfully evaluate against
the query.
"""
filtered_stix_objs = []
# evaluate objects against filter
for stix_obj in stix_objs:
clean = True
for filter_ in query:
# skip filter as filter was identified (when added) as
# not a common filter
if filter_.field not in STIX_COMMON_FIELDS:
raise ValueError("Error, field: {0} is not supported for filtering on.".format(filter_.field))
# For properties like granular_markings and external_references
# need to break the first property from the string.
if "." in filter_.field:
field = filter_.field.split(".")[0]
else:
field = filter_.field
# check filter "field" is in STIX object - if cant be
# applied due to STIX object, STIX object is discarded
# (i.e. did not make it through the filter)
if field not in stix_obj.keys():
clean = False
break
match = STIX_COMMON_FILTERS_MAP[filter_.field.split('.')[0]](filter_, stix_obj)
if not match:
clean = False
break
elif match == -1:
raise ValueError("Error, filter operator: {0} not supported for specified field: {1}".format(filter_.op, filter_.field))
# if object unmarked after all filters, add it
if clean:
filtered_stix_objs.append(stix_obj)
return filtered_stix_objs
def deduplicate(stix_obj_list):
"""Deduplicate a list of STIX objects to a unique set
Reduces a set of STIX objects to unique set by looking
at 'id' and 'modified' fields - as a unique object version
is determined by the combination of those fields
Args:
stix_obj_list (list): list of STIX objects (dicts)
Returns:
A list with a unique set of the passed list of STIX objects.
"""
unique_objs = {}
for obj in stix_obj_list:
unique_objs[(obj['id'], obj['modified'])] = obj
return list(unique_objs.values())
class DataStore(object):
"""DataStore
An implementer will create a concrete subclass from
"""An implementer will create a concrete subclass from
this class for the specific DataStore.
Args:
@ -181,22 +98,9 @@ class DataStore(object):
"""
return self.sink.add(stix_objs)
def add_filters(self, filters):
"""add query filters (to DataSource component)
Translates add_filters() to appropriate DataSource call.
Args:
filters (list or Filter obj): Filters to be added to DataStore
"""
return self.source.add_filters(filters)
class DataSink(object):
"""DataSink
An implementer will create a concrete subclass from
"""An implementer will create a concrete subclass from
this class for the specific DataSink.
Attributes:
@ -221,9 +125,7 @@ class DataSink(object):
class DataSource(object):
"""DataSource
An implementer will create a concrete subclass from
"""An implementer will create a concrete subclass from
this class for the specific DataSource.
Attributes:
@ -234,7 +136,7 @@ class DataSource(object):
"""
def __init__(self):
self.id = make_id()
self._filters = set()
self.filters = set()
def get(self, stix_id, _composite_filters=None):
"""
@ -294,37 +196,9 @@ class DataSource(object):
"""
raise NotImplementedError()
def add_filters(self, filters):
"""Add a filter to be applied to all queries for STIX objects.
Args:
filters (list or Filter obj): filter(s) to add to the Data Source.
"""
if isinstance(filters, list) or isinstance(filters, set):
for filter_ in filters:
self.add_filters(filter_)
else:
filter_ = filters
# check filter field is a supported STIX 2.0 common field
if filter_.field not in STIX_COMMON_FIELDS:
raise ValueError("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported")
# check filter operator is supported
if filter_.op not in FILTER_OPS:
raise ValueError("Filter operation (from 'op' field) not supported")
# check filter value type is supported
if type(filter_.value) not in FILTER_VALUE_TYPES:
raise ValueError("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary")
self._filters.add(filter_)
class CompositeDataSource(DataSource):
"""CompostiteDataSource
Controller for all the attached DataSources.
"""Controller for all the attached DataSources.
A user can have a single CompositeDataSource as an interface
the a set of DataSources. When an API call is made to the
@ -335,7 +209,6 @@ class CompositeDataSource(DataSource):
of reasons, e.g. common filters, organization, less API calls.
Attributes:
name (str): The name that identifies this CompositeDataSource.
data_sources (dict): A dictionary of DataSource objects; to be
controlled and used by the Data Source Controller object.
@ -345,12 +218,10 @@ class CompositeDataSource(DataSource):
"""Create a new STIX Data Source.
Args:
name (str): A string containing the name to attach in the
CompositeDataSource instance.
"""
super(CompositeDataSource, self).__init__()
self.data_sources = {}
self.data_sources = []
def get(self, stix_id, _composite_filters=None):
"""Retrieve STIX object by STIX ID
@ -375,18 +246,18 @@ class CompositeDataSource(DataSource):
stix_obj: the STIX object to be returned.
"""
if not self.get_all_data_sources():
if not self.has_data_sources():
raise AttributeError('CompositeDataSource has no data sources')
all_data = []
all_filters = set()
all_filters.update(self._filters)
all_filters.update(self.filters)
if _composite_filters:
all_filters.update(_composite_filters)
# for every configured Data Source, call its retrieve handler
for ds_id, ds in iteritems(self.data_sources):
for ds in self.data_sources:
data = ds.get(stix_id=stix_id, _composite_filters=all_filters)
all_data.append(data)
@ -419,19 +290,19 @@ class CompositeDataSource(DataSource):
all_data (list): list of STIX objects that have the specified id
"""
if not self.get_all_data_sources():
if not self.has_data_sources():
raise AttributeError('CompositeDataSource has no data sources')
all_data = []
all_filters = set()
all_filters.update(self._filters)
all_filters.update(self.filters)
if _composite_filters:
all_filters.update(_composite_filters)
# retrieve STIX objects from all configured data sources
for ds_id, ds in iteritems(self.data_sources):
for ds in self.data_sources:
data = ds.all_versions(stix_id=stix_id, _composite_filters=all_filters)
all_data.extend(data)
@ -459,7 +330,7 @@ class CompositeDataSource(DataSource):
all_data (list): list of STIX objects to be returned
"""
if not self.get_all_data_sources():
if not self.has_data_sources():
raise AttributeError('CompositeDataSource has no data sources')
if not query:
@ -470,14 +341,14 @@ class CompositeDataSource(DataSource):
all_data = []
all_filters = set()
all_filters.update(self._filters)
all_filters.update(self.filters)
if _composite_filters:
all_filters.update(_composite_filters)
# federate query to all attached data sources,
# pass composite filters to id
for ds_id, ds in iteritems(self.data_sources):
for ds in self.data_sources:
data = ds.query(query=query, _composite_filters=all_filters)
all_data.extend(data)
@ -488,45 +359,61 @@ class CompositeDataSource(DataSource):
return all_data
def add_data_source(self, data_sources):
"""Attach a DataSource to the CompositeDataSource instance
def add_data_source(self, data_source):
"""Attach a DataSource to CompositeDataSource instance
Args:
data_sources (list): a list of DataSource(s) to attach
data_source (DataSource): a stix2.DataSource to attach
to the CompositeDataSource
"""
if not isinstance(data_sources, list):
data_sources = [data_sources]
for ds in data_sources:
if issubclass(ds.__class__, DataSource):
if ds.id in self.data_sources:
# DataSource already attached to CompositeDataSource
continue
# add DataSource to CompositeDataSource, its ID is used as key
self.data_sources[ds.id] = ds
if issubclass(data_source.__class__, DataSource):
if data_source.id not in [ds_.id for ds_ in self.data_sources]:
# check DataSource not already attached CompositeDataSource
self.data_sources.append(data_source)
else:
# the Data Source object not a subclass of DataSource
# TODO: maybe log error?
continue
raise TypeError("DataSource (to be added) is not of type stix2.DataSource. DataSource type is '%s'" % type(data_source))
return
def remove_data_source(self, data_source_ids):
def add_data_sources(self, data_sources):
"""Attach list of DataSources to CompositeDataSource instance
Args:
data_sources (list): stix2.DataSources to attach to
CompositeDataSource
"""
for ds in data_sources:
self.add_data_source(ds)
return
def remove_data_source(self, data_source_id):
"""Remove DataSource from the CompositeDataSource instance
Args:
data_source_ids (list): a list of Data Source id(s).
data_source_id (str): DataSource IDs.
"""
for id in data_source_ids:
if id in self.data_sources:
del self.data_sources[id]
else:
raise ValueError("DataSource 'id' not found in CompositeDataSource.data_sources ")
def _match(ds_id, candidate_ds_id):
return ds_id == candidate_ds_id
self.data_sources[:] = [ds for ds in self.data_sources if not _match(ds.id, data_source_id)]
return
def remove_data_sources(self, data_source_ids):
"""Remove DataSources from the CompositeDataSource instance
Args:
data_source_ids (list): DataSource IDs
"""
for ds_id in data_source_ids:
self.remove_data_source(ds_id)
return
def has_data_sources(self):
return len(self.data_sources)
def get_all_data_sources(self):
"""Return all attached DataSource(s)"""
return self.data_sources.values()
return self.data_sources

View File

@ -14,9 +14,9 @@ import os
from stix2.base import _STIXBase
from stix2.core import Bundle, parse
from stix2.sources import (DataSink, DataSource, DataStore,
apply_common_filters, deduplicate)
from stix2.sources.filters import Filter
from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter, apply_common_filters
from stix2.utils import deduplicate
class FileSystemStore(DataStore):
@ -35,7 +35,7 @@ class FileSystemStore(DataStore):
sink (FileSystemSink): FileSystemSink
"""
def __init__(self, stix_dir="stix_data"):
def __init__(self, stix_dir):
super(FileSystemStore, self).__init__()
self.source = FileSystemSource(stix_dir=stix_dir)
self.sink = FileSystemSink(stix_dir=stix_dir)
@ -54,12 +54,12 @@ class FileSystemSink(DataSink):
stix_dir (str): path to directory of STIX objects
"""
def __init__(self, stix_dir="stix_data"):
def __init__(self, stix_dir):
super(FileSystemSink, self).__init__()
self._stix_dir = os.path.abspath(stix_dir)
if not os.path.exists(self._stix_dir):
print("Error: directory path for STIX data does not exist")
raise ValueError("directory path for STIX data does not exist")
@property
def stix_dir(self):
@ -111,6 +111,7 @@ class FileSystemSink(DataSink):
# if list, recurse call on individual STIX objects
for stix_obj in stix_data:
self.add(stix_obj)
else:
raise ValueError("stix_data must be a STIX object(or list of, json formatted STIX(or list of) or a json formatted STIX bundle")
@ -128,7 +129,7 @@ class FileSystemSource(DataSource):
stix_dir (str): path to directory of STIX objects
"""
def __init__(self, stix_dir="stix_data"):
def __init__(self, stix_dir):
super(FileSystemSource, self).__init__()
self._stix_dir = os.path.abspath(stix_dir)
@ -213,8 +214,8 @@ class FileSystemSource(DataSource):
query = set(query)
# combine all query filters
if self._filters:
query.update(self._filters)
if self.filters:
query.update(self.filters)
if _composite_filters:
query.update(_composite_filters)
@ -278,11 +279,13 @@ class FileSystemSource(DataSource):
# since ID is specified in one of filters, can evaluate against filename first without loading
stix_obj = json.load(open(os.path.join(root, file_)))["objects"][0]
# check against other filters, add if match
all_data.extend(apply_common_filters([stix_obj], query))
matches = [stix_obj_ for stix_obj_ in apply_common_filters([stix_obj], query)]
all_data.extend(matches)
else:
# have to load into memory regardless to evaluate other filters
stix_obj = json.load(open(os.path.join(root, file_)))["objects"][0]
all_data.extend(apply_common_filters([stix_obj], query))
matches = [stix_obj_ for stix_obj_ in apply_common_filters([stix_obj], query)]
all_data.extend(matches)
all_data = deduplicate(all_data)

View File

@ -41,13 +41,35 @@ FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
STIX_COMMON_FILTERS_MAP = {}
class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
"""Filter
def _check_filter_components(field, op, value):
"""check filter meets minimum validity
STIX 2 filters that support the querying functionality of STIX 2
Note: Currently can create Filters that are not valid
STIX2 object common properties, as filter.field value
is not checked, only filter.op, filter.value are checked
here. They are just ignored when
applied within the DataSource API. For example, a user
can add a TAXII Filter, that is extracted and sent to
a TAXII endpoint within TAXIICollection and not applied
locally (within this API).
"""
if op not in FILTER_OPS:
# check filter operator is supported
raise ValueError("Filter operator '%s' not supported for specified field: '%s'" % (op, field))
if type(value) not in FILTER_VALUE_TYPES:
# check filter value type is supported
raise TypeError("Filter value type '%s' is not supported. The type must be a python immutable type or dictionary" % type(value))
return True
class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
"""STIX 2 filters that support the querying functionality of STIX 2
DataStores and DataSources.
Initialized in the manner of python tuples
Initialized like a python tuple
Args:
field (str): filter field name, corresponds to STIX 2 object property
@ -66,9 +88,77 @@ class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
# If value is a list, convert it to a tuple so it is hashable.
if isinstance(value, list):
value = tuple(value)
_check_filter_components(field, op, value)
self = super(Filter, cls).__new__(cls, field, op, value)
return self
@property
def common(self):
"""return whether Filter is valid STIX2 Object common property
Note: The Filter operator and Filter value type are checked when
the filter is created, thus only leaving the Filter field to be
checked to make sure a valid STIX2 Object common property.
Note: Filters that are not valid STIX2 Object common property
Filters are still allowed to be created for extended usage of
Filter. (e.g. TAXII specific filters can be created, which are
then extracted and sent to TAXII endpoint.)
"""
return self.field in STIX_COMMON_FIELDS
def apply_common_filters(stix_objs, query):
"""Evaluate filters against a set of STIX 2.0 objects.
Supports only STIX 2.0 common property fields
Args:
stix_objs (list): list of STIX objects to apply the query to
query (set): set of filters (combined form complete query)
Returns:
(generator): of STIX objects that successfully evaluate against
the query.
"""
for stix_obj in stix_objs:
clean = True
for filter_ in query:
if not filter_.common:
# skip filter as it is not a STIX2 Object common property filter
continue
if "." in filter_.field:
# For properties like granular_markings and external_references
# need to extract the first property from the string.
field = filter_.field.split(".")[0]
else:
field = filter_.field
if field not in stix_obj.keys():
# check filter "field" is in STIX object - if cant be
# applied to STIX object, STIX object is discarded
# (i.e. did not make it through the filter)
clean = False
break
match = STIX_COMMON_FILTERS_MAP[filter_.field.split('.')[0]](filter_, stix_obj)
if not match:
clean = False
break
elif match == -1:
raise ValueError("Error, filter operator: {0} not supported for specified field: {1}".format(filter_.op, filter_.field))
# if object unmarked after all filters, add it
if clean:
yield stix_obj
"""Base type filters"""

View File

@ -24,8 +24,8 @@ import os
from stix2.base import _STIXBase
from stix2.core import Bundle, parse
from stix2.sources import DataSink, DataSource, DataStore, apply_common_filters
from stix2.sources.filters import Filter
from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter, apply_common_filters
def _add(store, stix_data=None):
@ -65,14 +65,13 @@ def _add(store, stix_data=None):
# STIX objects are in a list- recurse on each object
for stix_obj in stix_data:
_add(store, stix_obj)
else:
raise ValueError("stix_data must be as STIX object(or list of),json formatted STIX (or list of), or a json formatted STIX bundle")
raise TypeError("stix_data must be as STIX object(or list of),json formatted STIX (or list of), or a json formatted STIX bundle")
class MemoryStore(DataStore):
"""MemoryStore
Provides an interface to an in-memory dictionary
"""Provides an interface to an in-memory dictionary
of STIX objects. MemoryStore is a wrapper around a paired
MemorySink and MemorySource
@ -110,9 +109,7 @@ class MemoryStore(DataStore):
class MemorySink(DataSink):
"""MemorySink
Provides an interface for adding/pushing STIX objects
"""Provides an interface for adding/pushing STIX objects
to an in-memory dictionary.
Designed to be paired with a MemorySource, together as the two
@ -164,9 +161,7 @@ class MemorySink(DataSink):
class MemorySource(DataSource):
"""MemorySource
Provides an interface for searching/retrieving
"""Provides an interface for searching/retrieving
STIX objects from an in-memory dictionary.
Designed to be paired with a MemorySink, together as the two
@ -280,13 +275,13 @@ class MemorySource(DataSource):
query = set(query)
# combine all query filters
if self._filters:
query.update(self._filters)
if self.filters:
query.update(self.filters)
if _composite_filters:
query.update(_composite_filters)
# Apply STIX common property filters.
all_data = apply_common_filters(self._data.values(), query)
all_data = [stix_obj for stix_obj in apply_common_filters(self._data.values(), query)]
return all_data

View File

@ -12,16 +12,15 @@ TODO: Test everything
from stix2.base import _STIXBase
from stix2.core import Bundle, parse
from stix2.sources import DataSink, DataSource, DataStore, make_id
from stix2.sources.filters import Filter
from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter, apply_common_filters
from stix2.utils import deduplicate
TAXII_FILTERS = ['added_after', 'id', 'type', 'version']
class TAXIICollectionStore(DataStore):
"""TAXIICollectionStore
Provides an interface to a local/remote TAXII Collection
"""Provides an interface to a local/remote TAXII Collection
of STIX data. TAXIICollectionStore is a wrapper
around a paired TAXIICollectionSink and TAXIICollectionSource.
@ -35,9 +34,7 @@ class TAXIICollectionStore(DataStore):
class TAXIICollectionSink(DataSink):
"""TAXIICollectionSink
Provides an interface for pushing STIX objects to a local/remote
"""Provides an interface for pushing STIX objects to a local/remote
TAXII Collection endpoint.
Args:
@ -73,6 +70,7 @@ class TAXIICollectionSink(DataSink):
# adding list of something - recurse on each
for obj in stix_data:
self.add(obj)
elif isinstance(stix_data, str):
# adding json encoded string of STIX content
stix_data = parse(stix_data)
@ -81,21 +79,14 @@ class TAXIICollectionSink(DataSink):
else:
bundle = dict(Bundle(stix_data))
self.collection.add_objects(bundle)
else:
raise TypeError("stix_data must be as STIX object(or list of),json formatted STIX (or list of), or a json formatted STIX bundle")
@staticmethod
def create_bundle(objects):
"""TODO: Remove?"""
return dict(id="bundle--%s" % make_id(),
objects=objects,
spec_version="2.0",
type="bundle")
self.collection.add_objects(bundle)
class TAXIICollectionSource(DataSource):
"""TAXIICollectionSource
Provides an interface for searching/retrieving STIX objects
"""Provides an interface for searching/retrieving STIX objects
from a local/remote TAXII Collection endpoint.
Args:
@ -125,8 +116,8 @@ class TAXIICollectionSource(DataSource):
"""
# combine all query filters
query = set()
if self._filters:
query.update(self._filters)
if self.filters:
query.update(self.filters)
if _composite_filters:
query.update(_composite_filters)
@ -135,9 +126,9 @@ class TAXIICollectionSource(DataSource):
stix_objs = self.collection.get_object(stix_id, taxii_filters)["objects"]
stix_obj = self.apply_common_filters(stix_objs, query)
stix_obj = [stix_obj for stix_obj in apply_common_filters(stix_objs, query)]
if len(stix_obj) > 0:
if len(stix_obj):
stix_obj = stix_obj[0]
else:
stix_obj = None
@ -198,8 +189,8 @@ class TAXIICollectionSource(DataSource):
query = set(query)
# combine all query filters
if self._filters:
query.update(self.filters.values())
if self.filters:
query.update(self.filters)
if _composite_filters:
query.update(_composite_filters)
@ -210,10 +201,10 @@ class TAXIICollectionSource(DataSource):
all_data = self.collection.get_objects(filters=taxii_filters)["objects"]
# deduplicate data (before filtering as reduces wasted filtering)
all_data = self.deduplicate(all_data)
all_data = deduplicate(all_data)
# apply local (CompositeDataSource, TAXIICollectionSource and query filters)
all_data = self.apply_common_filters(all_data, query)
all_data = [stix_obj for stix_obj in apply_common_filters(all_data, query)]
# parse python STIX objects from the STIX object dicts
stix_objs = [parse(stix_obj_dict) for stix_obj_dict in all_data]

View File

@ -2,10 +2,10 @@ import pytest
from taxii2client import Collection
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
DataStore, apply_common_filters, deduplicate,
make_id, taxii)
from stix2.sources.filters import Filter
DataStore, make_id, taxii)
from stix2.sources.filters import Filter, apply_common_filters
from stix2.sources.memory import MemorySource, MemoryStore
from stix2.utils import deduplicate
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
@ -207,47 +207,43 @@ def test_add_get_remove_filter(ds):
Filter('id', '!=', 'stix object id'),
Filter('labels', 'in', ["heartbleed", "malicious-activity"]),
]
invalid_filters = [
Filter('description', '=', 'not supported field - just place holder'),
Filter('modified', '*', 'not supported operator - just place holder'),
Filter('created', '=', object()),
]
assert len(ds._filters) == 0
# Invalid filters - wont pass creation
# these filters will not be allowed to be created
# check proper errors are raised when trying to create them
ds.add_filters(valid_filters[0])
assert len(ds._filters) == 1
with pytest.raises(ValueError) as excinfo:
# create Filter that has an operator that is not allowed
Filter('modified', '*', 'not supported operator - just place holder')
assert str(excinfo.value) == "Filter operator '*' not supported for specified field: 'modified'"
with pytest.raises(TypeError) as excinfo:
# create Filter that has a value type that is not allowed
Filter('created', '=', object())
assert str(excinfo.value) == "Filter value type '<type 'object'>' is not supported. The type must be a python immutable type or dictionary"
assert len(ds.filters) == 0
ds.filters.add(valid_filters[0])
assert len(ds.filters) == 1
# Addin the same filter again will have no effect since `filters` uses a set
ds.add_filters(valid_filters[0])
assert len(ds._filters) == 1
ds.filters.add(valid_filters[0])
assert len(ds.filters) == 1
ds.add_filters(valid_filters[1])
assert len(ds._filters) == 2
ds.add_filters(valid_filters[2])
assert len(ds._filters) == 3
ds.filters.add(valid_filters[1])
assert len(ds.filters) == 2
ds.filters.add(valid_filters[2])
assert len(ds.filters) == 3
# TODO: make better error messages
with pytest.raises(ValueError) as excinfo:
ds.add_filters(invalid_filters[0])
assert str(excinfo.value) == "Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported"
with pytest.raises(ValueError) as excinfo:
ds.add_filters(invalid_filters[1])
assert str(excinfo.value) == "Filter operation (from 'op' field) not supported"
with pytest.raises(ValueError) as excinfo:
ds.add_filters(invalid_filters[2])
assert str(excinfo.value) == "Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary"
assert set(valid_filters) == ds._filters
assert set(valid_filters) == ds.filters
# remove
ds._filters.remove(valid_filters[0])
ds.filters.remove(valid_filters[0])
assert len(ds._filters) == 2
assert len(ds.filters) == 2
ds.add_filters(valid_filters)
ds.filters.update(valid_filters)
def test_apply_common_filters(ds):
@ -321,7 +317,6 @@ def test_apply_common_filters(ds):
Filter("created", ">", "2015-01-01T01:00:00.000Z"),
Filter("revoked", "=", True),
Filter("revoked", "!=", True),
Filter("revoked", "?", False),
Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"),
Filter("granular_markings.selectors", "in", "relationship_type"),
Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
@ -333,7 +328,7 @@ def test_apply_common_filters(ds):
]
# "Return any object whose type is not relationship"
resp = apply_common_filters(stix_objs, [filters[0]])
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[0]])]
ids = [r['id'] for r in resp]
assert stix_objs[0]['id'] in ids
assert stix_objs[1]['id'] in ids
@ -341,136 +336,107 @@ def test_apply_common_filters(ds):
assert len(ids) == 3
# "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"
resp = apply_common_filters(stix_objs, [filters[1]])
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[1]])]
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
# "Return any object that contains remote-access-trojan in labels"
resp = apply_common_filters(stix_objs, [filters[2]])
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[2]])]
assert resp[0]['id'] == stix_objs[0]['id']
assert len(resp) == 1
# "Return any object created after 2015-01-01T01:00:00.000Z"
resp = apply_common_filters(stix_objs, [filters[3]])
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[3]])]
assert resp[0]['id'] == stix_objs[0]['id']
assert len(resp) == 2
# "Return any revoked object"
resp = apply_common_filters(stix_objs, [filters[4]])
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[4]])]
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
# "Return any object whose not revoked"
# Note that if 'revoked' property is not present in object.
# Currently we can't use such an expression to filter for... :(
resp = apply_common_filters(stix_objs, [filters[5]])
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[5]])]
assert len(resp) == 0
# Assert unknown operator for _boolean() raises exception.
with pytest.raises(ValueError) as excinfo:
apply_common_filters(stix_objs, [filters[6]])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}"
.format(filters[6].op, filters[6].field))
# "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs"
resp = apply_common_filters(stix_objs, [filters[7]])
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[6]])]
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
# "Return any object that contains relationship_type in their selectors AND
# also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref"
resp = apply_common_filters(stix_objs, [filters[8], filters[9]])
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[7], filters[8]])]
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
# "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id"
resp = apply_common_filters(stix_objs, [filters[10]])
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[9]])]
assert resp[0]['id'] == stix_objs[3]['id']
assert len(resp) == 1
# "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9"
resp = apply_common_filters(stix_objs, [filters[11]])
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[10]])]
assert len(resp) == 1
# "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None)
resp = apply_common_filters(stix_objs, [filters[12]])
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[11]])]
assert len(resp) == 0
# "Return any object that contains description in its selectors" (None)
resp = apply_common_filters(stix_objs, [filters[13]])
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[12]])]
assert len(resp) == 0
# "Return any object that object that matches CVE in source_name" (None, case sensitive)
resp = apply_common_filters(stix_objs, [filters[14]])
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[13]])]
assert len(resp) == 0
def test_filters0(ds):
# "Return any object modified before 2017-01-28T13:49:53.935Z"
resp = apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")])
resp = [stix_obj for stix_obj in apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")])]
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
def test_filters1(ds):
# "Return any object modified after 2017-01-28T13:49:53.935Z"
resp = apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")])
resp = [stix_obj for stix_obj in apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")])]
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
def test_filters2(ds):
# "Return any object modified after or on 2017-01-28T13:49:53.935Z"
resp = apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")])
resp = [stix_obj for stix_obj in apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")])]
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 3
def test_filters3(ds):
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
resp = apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")])
resp = [stix_obj for stix_obj in apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")])]
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
def test_filters4(ds):
fltr4 = Filter("modified", "?", "2017-01-27T13:49:53.935Z")
# Assert unknown operator for _all() raises exception.
# Assert invalid Filter cannot be created
with pytest.raises(ValueError) as excinfo:
apply_common_filters(STIX_OBJS2, [fltr4])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}").format(fltr4.op, fltr4.field)
Filter("modified", "?", "2017-01-27T13:49:53.935Z")
assert str(excinfo.value) == ("Filter operator '?' not supported "
"for specified field: 'modified'")
def test_filters5(ds):
# "Return any object whose id is not indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
resp = apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])
resp = [stix_obj for stix_obj in apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])]
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
def test_filters6(ds):
fltr6 = Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
# Assert unknown operator for _id() raises exception.
with pytest.raises(ValueError) as excinfo:
apply_common_filters(STIX_OBJS2, [fltr6])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}").format(fltr6.op, fltr6.field)
def test_filters7(ds):
fltr7 = Filter("notacommonproperty", "=", "bar")
# Assert unknown field raises exception.
with pytest.raises(ValueError) as excinfo:
apply_common_filters(STIX_OBJS2, [fltr7])
assert str(excinfo.value) == ("Error, field: {0} is not supported for "
"filtering on.").format(fltr7.field)
def test_deduplicate(ds):
unique = deduplicate(STIX_OBJS1)
@ -495,17 +461,19 @@ def test_add_remove_composite_datasource():
ds2 = DataSource()
ds3 = DataSink()
cds.add_data_source([ds1, ds2, ds1, ds3])
with pytest.raises(TypeError) as excinfo:
cds.add_data_sources([ds1, ds2, ds1, ds3])
assert str(excinfo.value) == ("DataSource (to be added) is not of type "
"stix2.DataSource. DataSource type is '<class 'stix2.sources.DataSink'>'")
cds.add_data_sources([ds1, ds2, ds1])
assert len(cds.get_all_data_sources()) == 2
cds.remove_data_source([ds1.id, ds2.id])
cds.remove_data_sources([ds1.id, ds2.id])
assert len(cds.get_all_data_sources()) == 0
with pytest.raises(ValueError):
cds.remove_data_source([ds3.id])
def test_composite_datasource_operations():
BUNDLE1 = dict(id="bundle--%s" % make_id(),
@ -516,7 +484,7 @@ def test_composite_datasource_operations():
ds1 = MemorySource(stix_data=BUNDLE1)
ds2 = MemorySource(stix_data=STIX_OBJS2)
cds.add_data_source([ds1, ds2])
cds.add_data_sources([ds1, ds2])
indicators = cds.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")

View File

@ -150,13 +150,11 @@ def test_environment_no_datastore():
env.query(INDICATOR_ID)
assert 'Environment has no data source' in str(excinfo.value)
with pytest.raises(AttributeError) as excinfo:
env.add_filters(INDICATOR_ID)
assert 'Environment has no data source' in str(excinfo.value)
with pytest.raises(AttributeError) as excinfo:
def test_environment_add_filters():
env = stix2.Environment(factory=stix2.ObjectFactory())
env.add_filters([INDICATOR_ID])
env.add_filter(INDICATOR_ID)
assert 'Environment has no data source' in str(excinfo.value)
def test_environment_datastore_and_no_object_factory():

View File

@ -33,6 +33,34 @@ class STIXdatetime(dt.datetime):
return "'%s'" % format_datetime(self)
def deduplicate(stix_obj_list):
"""Deduplicate a list of STIX objects to a unique set
Reduces a set of STIX objects to unique set by looking
at 'id' and 'modified' fields - as a unique object version
is determined by the combination of those fields
Note: Be aware, as can be seen in the implementation
of deduplicate(),that if the "stix_obj_list" argument has
multiple STIX objects of the same version, the last object
version found in the list will be the one that is returned.
()
Args:
stix_obj_list (list): list of STIX objects (dicts)
Returns:
A list with a unique set of the passed list of STIX objects.
"""
unique_objs = {}
for obj in stix_obj_list:
unique_objs[(obj['id'], obj['modified'])] = obj
return list(unique_objs.values())
def get_timestamp():
return STIXdatetime.now(tz=pytz.UTC)