Clean up filters

- Simplify an if statement, since split() with no matches returns a
  single-item list
- Rename _all_filter -> _check_property and make it a method on Filter
- Raise a ValueError for an unsupported filter operator instead of returning -1
- Rename "field" to "property" throughout (s/field/property), including the Filter attribute
stix2.0
Chris Lenk 2017-11-01 10:40:10 -04:00
parent 924c72e98a
commit c2d1e9777b
4 changed files with 85 additions and 109 deletions

View File

@ -225,14 +225,14 @@ class FileSystemSource(DataSource):
file_filters = self._parse_file_filters(query) file_filters = self._parse_file_filters(query)
# establish which subdirectories can be avoided in query # establish which subdirectories can be avoided in query
# by decluding as many as possible. A filter with "type" as the field # by decluding as many as possible. A filter with "type" as the property
# means that certain STIX object types can be ruled out, and thus # means that certain STIX object types can be ruled out, and thus
# the corresponding subdirectories as well # the corresponding subdirectories as well
include_paths = [] include_paths = []
declude_paths = [] declude_paths = []
if "type" in [filter.field for filter in file_filters]: if "type" in [filter.property for filter in file_filters]:
for filter in file_filters: for filter in file_filters:
if filter.field == "type": if filter.property == "type":
if filter.op == "=": if filter.op == "=":
include_paths.append(os.path.join(self._stix_dir, filter.value)) include_paths.append(os.path.join(self._stix_dir, filter.value))
elif filter.op == "!=": elif filter.op == "!=":
@ -259,9 +259,9 @@ class FileSystemSource(DataSource):
# grab stix object ID as well - if present in filters, as # grab stix object ID as well - if present in filters, as
# may forgo the loading of STIX content into memory # may forgo the loading of STIX content into memory
if "id" in [filter.field for filter in file_filters]: if "id" in [filter.property for filter in file_filters]:
for filter in file_filters: for filter in file_filters:
if filter.field == "id" and filter.op == "=": if filter.property == "id" and filter.op == "=":
id_ = filter.value id_ = filter.value
break break
else: else:
@ -296,7 +296,7 @@ class FileSystemSource(DataSource):
that can used to possibly speed up querying STIX objects that can used to possibly speed up querying STIX objects
from the file system from the file system
Extracts filters that are for the "id" and "type" field of Extracts filters that are for the "id" and "type" property of
a STIX object. As the file directory is organized by STIX a STIX object. As the file directory is organized by STIX
object type with filenames that are equivalent to the STIX object type with filenames that are equivalent to the STIX
object ID, these filters can be used first to reduce the object ID, these filters can be used first to reduce the
@ -304,6 +304,6 @@ class FileSystemSource(DataSource):
""" """
file_filters = set() file_filters = set()
for filter_ in query: for filter_ in query:
if filter_.field == "id" or filter_.field == "type": if filter_.property == "id" or filter_.property == "type":
file_filters.add(filter_) file_filters.add(filter_)
return file_filters return file_filters

View File

@ -5,28 +5,6 @@ Filters for Python STIX 2.0 DataSources, DataSinks, DataStores
import collections import collections
# Currently, only STIX 2.0 common SDO fields (that are not complex objects)
# are supported for filtering on
"""Supported STIX properties"""
STIX_COMMON_FIELDS = [
"created",
"created_by_ref",
"external_references.source_name",
"external_references.description",
"external_references.url",
"external_references.hashes",
"external_references.external_id",
"granular_markings.marking_ref",
"granular_markings.selectors",
"id",
"labels",
"modified",
"object_marking_refs",
"revoked",
"type"
]
"""Supported filter operations""" """Supported filter operations"""
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<='] FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
@ -34,40 +12,39 @@ FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple] FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
def _check_filter_components(field, op, value): def _check_filter_components(prop, op, value):
"""Check that filter meets minimum validity. """Check that filter meets minimum validity.
Note: Currently can create Filters that are not valid Note:
STIX2 object common properties, as filter.field value Currently can create Filters that are not valid STIX2 object common
is not checked, only filter.op, filter.value are checked properties, as filter.prop value is not checked, only filter.op,
here. They are just ignored when filter value are checked here. They are just ignored when applied
applied within the DataSource API. For example, a user within the DataSource API. For example, a user can add a TAXII Filter,
can add a TAXII Filter, that is extracted and sent to that is extracted and sent to a TAXII endpoint within TAXIICollection
a TAXII endpoint within TAXIICollection and not applied and not applied locally (within this API).
locally (within this API).
"""
if op not in FILTER_OPS: """
# check filter operator is supported if op not in FILTER_OPS:
raise ValueError("Filter operator '%s' not supported for specified field: '%s'" % (op, field)) # check filter operator is supported
raise ValueError("Filter operator '%s' not supported for specified property: '%s'" % (op, prop))
if type(value) not in FILTER_VALUE_TYPES: if type(value) not in FILTER_VALUE_TYPES:
# check filter value type is supported # check filter value type is supported
raise TypeError("Filter value type '%s' is not supported. The type must be a Python immutable type or dictionary" % type(value)) raise TypeError("Filter value type '%s' is not supported. The type must be a Python immutable type or dictionary" % type(value))
return True return True
class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])): class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
"""STIX 2 filters that support the querying functionality of STIX 2 """STIX 2 filters that support the querying functionality of STIX 2
DataStores and DataSources. DataStores and DataSources.
Initialized like a Python tuple. Initialized like a Python tuple.
Args: Args:
field (str): filter field name, corresponds to STIX 2 object property property (str): filter property name, corresponds to STIX 2 object property
op (str): operator of the filter op (str): operator of the filter
value (str): filter field value value (str): filter property value
Example: Example:
Filter("id", "=", "malware--0f862b01-99da-47cc-9bdb-db4a86a95bb1") Filter("id", "=", "malware--0f862b01-99da-47cc-9bdb-db4a86a95bb1")
@ -75,29 +52,55 @@ class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
""" """
__slots__ = () __slots__ = ()
def __new__(cls, field, op, value): def __new__(cls, prop, op, value):
# If value is a list, convert it to a tuple so it is hashable. # If value is a list, convert it to a tuple so it is hashable.
if isinstance(value, list): if isinstance(value, list):
value = tuple(value) value = tuple(value)
_check_filter_components(field, op, value) _check_filter_components(prop, op, value)
self = super(Filter, cls).__new__(cls, field, op, value) self = super(Filter, cls).__new__(cls, prop, op, value)
return self return self
def _check_property(self, stix_obj_property):
"""Check a property of a STIX Object against this filter.
Args:
stix_obj_property: value to check this filter against
Returns:
True if property matches the filter,
False otherwise.
"""
if self.op == "=":
return stix_obj_property == self.value
elif self.op == "!=":
return stix_obj_property != self.value
elif self.op == "in":
return stix_obj_property in self.value
elif self.op == ">":
return stix_obj_property > self.value
elif self.op == "<":
return stix_obj_property < self.value
elif self.op == ">=":
return stix_obj_property >= self.value
elif self.op == "<=":
return stix_obj_property <= self.value
else:
raise ValueError("Filter operator: {0} not supported for specified property: {1}".format(self.op, self.property))
def apply_common_filters(stix_objs, query): def apply_common_filters(stix_objs, query):
"""Evaluate filters against a set of STIX 2.0 objects. """Evaluate filters against a set of STIX 2.0 objects.
Supports only STIX 2.0 common property fields. Supports only STIX 2.0 common property properties.
Args: Args:
stix_objs (list): list of STIX objects to apply the query to stix_objs (list): list of STIX objects to apply the query to
query (set): set of filters (combined form complete query) query (set): set of filters (combined form complete query)
Returns: Yields:
(generator): of STIX objects that successfully evaluate against STIX objects that successfully evaluate against the query.
the query.
""" """
for stix_obj in stix_objs: for stix_obj in stix_objs:
@ -108,8 +111,6 @@ def apply_common_filters(stix_objs, query):
if not match: if not match:
clean = False clean = False
break break
elif match == -1:
raise ValueError("Error, filter operator: {0} not supported for specified field: {1}".format(filter_.op, filter_.field))
# if object unmarked after all filters, add it # if object unmarked after all filters, add it
if clean: if clean:
@ -128,58 +129,33 @@ def _check_filter(filter_, stix_obj):
False if not. False if not.
""" """
if "." in filter_.field: # For properties like granular_markings and external_references
# For properties like granular_markings and external_references # need to extract the first property from the string.
# need to extract the first property from the string. prop = filter_.property.split(".")[0]
field = filter_.field.split(".")[0]
else:
field = filter_.field
if field not in stix_obj.keys(): if prop not in stix_obj.keys():
# check filter "field" is in STIX object - if cant be # check filter "property" is in STIX object - if cant be
# applied to STIX object, STIX object is discarded # applied to STIX object, STIX object is discarded
# (i.e. did not make it through the filter) # (i.e. did not make it through the filter)
return False return False
if "." in filter_.field: if "." in filter_.property:
# Check embedded properties, from e.g. granular_markings or external_references # Check embedded properties, from e.g. granular_markings or external_references
sub_field = filter_.field.split(".", 1)[1] sub_property = filter_.property.split(".", 1)[1]
sub_filter = filter_._replace(field=sub_field) sub_filter = filter_._replace(property=sub_property)
if isinstance(stix_obj[field], list): if isinstance(stix_obj[prop], list):
for elem in stix_obj[field]: for elem in stix_obj[prop]:
r = _check_filter(sub_filter, elem) if _check_filter(sub_filter, elem) is True:
if r: return True
return r
return False return False
else: else:
return _check_filter(sub_filter, stix_obj[field]) return _check_filter(sub_filter, stix_obj[prop])
elif isinstance(stix_obj[field], list): elif isinstance(stix_obj[prop], list):
# Check each item in list property to see if it matches # Check each item in list property to see if it matches
for elem in stix_obj[field]: for elem in stix_obj[prop]:
r = _all_filter(filter_, elem) if filter_._check_property(elem) is True:
if r: return True
return r
return False return False
else: else:
# Check if property matches # Check if property matches
return _all_filter(filter_, stix_obj[field]) return filter_._check_property(stix_obj[prop])
def _all_filter(filter_, stix_obj_field):
"""all filter operations (for filters whose value type can be applied to any operation type)"""
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
elif filter_.op == "in":
return stix_obj_field in filter_.value
elif filter_.op == ">":
return stix_obj_field > filter_.value
elif filter_.op == "<":
return stix_obj_field < filter_.value
elif filter_.op == ">=":
return stix_obj_field >= filter_.value
elif filter_.op == "<=":
return stix_obj_field <= filter_.value
else:
return -1

View File

@ -229,10 +229,10 @@ class TAXIICollectionSource(DataSource):
params = {} params = {}
for filter_ in query: for filter_ in query:
if filter_.field in TAXII_FILTERS: if filter_.property in TAXII_FILTERS:
if filter_.field == "added_after": if filter_.property == "added_after":
params[filter_.field] = filter_.value params[filter_.property] = filter_.value
else: else:
taxii_field = "match[%s]" % filter_.field taxii_field = "match[%s]" % filter_.property
params[taxii_field] = filter_.value params[taxii_field] = filter_.value
return params return params

View File

@ -205,7 +205,7 @@ def test_parse_taxii_filters():
def test_add_get_remove_filter(ds): def test_add_get_remove_filter(ds):
# First 3 filters are valid, remaining fields are erroneous in some way # First 3 filters are valid, remaining properties are erroneous in some way
valid_filters = [ valid_filters = [
Filter('type', '=', 'malware'), Filter('type', '=', 'malware'),
Filter('id', '!=', 'stix object id'), Filter('id', '!=', 'stix object id'),
@ -219,7 +219,7 @@ def test_add_get_remove_filter(ds):
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
# create Filter that has an operator that is not allowed # create Filter that has an operator that is not allowed
Filter('modified', '*', 'not supported operator - just place holder') Filter('modified', '*', 'not supported operator - just place holder')
assert str(excinfo.value) == "Filter operator '*' not supported for specified field: 'modified'" assert str(excinfo.value) == "Filter operator '*' not supported for specified property: 'modified'"
with pytest.raises(TypeError) as excinfo: with pytest.raises(TypeError) as excinfo:
# create Filter that has a value type that is not allowed # create Filter that has a value type that is not allowed
@ -433,7 +433,7 @@ def test_filters4(ds):
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
Filter("modified", "?", "2017-01-27T13:49:53.935Z") Filter("modified", "?", "2017-01-27T13:49:53.935Z")
assert str(excinfo.value) == ("Filter operator '?' not supported " assert str(excinfo.value) == ("Filter operator '?' not supported "
"for specified field: 'modified'") "for specified property: 'modified'")
def test_filters5(ds): def test_filters5(ds):