Merge pull request #95 from oasis-open/filters
Change filters to allow filtering all propertiesstix2.0
commit
f029a01f9a
|
@ -238,14 +238,14 @@ class FileSystemSource(DataSource):
|
||||||
file_filters = self._parse_file_filters(query)
|
file_filters = self._parse_file_filters(query)
|
||||||
|
|
||||||
# establish which subdirectories can be avoided in query
|
# establish which subdirectories can be avoided in query
|
||||||
# by decluding as many as possible. A filter with "type" as the field
|
# by decluding as many as possible. A filter with "type" as the property
|
||||||
# means that certain STIX object types can be ruled out, and thus
|
# means that certain STIX object types can be ruled out, and thus
|
||||||
# the corresponding subdirectories as well
|
# the corresponding subdirectories as well
|
||||||
include_paths = []
|
include_paths = []
|
||||||
declude_paths = []
|
declude_paths = []
|
||||||
if "type" in [filter.field for filter in file_filters]:
|
if "type" in [filter.property for filter in file_filters]:
|
||||||
for filter in file_filters:
|
for filter in file_filters:
|
||||||
if filter.field == "type":
|
if filter.property == "type":
|
||||||
if filter.op == "=":
|
if filter.op == "=":
|
||||||
include_paths.append(os.path.join(self._stix_dir, filter.value))
|
include_paths.append(os.path.join(self._stix_dir, filter.value))
|
||||||
elif filter.op == "!=":
|
elif filter.op == "!=":
|
||||||
|
@ -272,9 +272,9 @@ class FileSystemSource(DataSource):
|
||||||
|
|
||||||
# grab stix object ID as well - if present in filters, as
|
# grab stix object ID as well - if present in filters, as
|
||||||
# may forgo the loading of STIX content into memory
|
# may forgo the loading of STIX content into memory
|
||||||
if "id" in [filter.field for filter in file_filters]:
|
if "id" in [filter.property for filter in file_filters]:
|
||||||
for filter in file_filters:
|
for filter in file_filters:
|
||||||
if filter.field == "id" and filter.op == "=":
|
if filter.property == "id" and filter.op == "=":
|
||||||
id_ = filter.value
|
id_ = filter.value
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
|
@ -306,7 +306,7 @@ class FileSystemSource(DataSource):
|
||||||
|
|
||||||
Possibly speeds up querying STIX objects from the file system.
|
Possibly speeds up querying STIX objects from the file system.
|
||||||
|
|
||||||
Extracts filters that are for the "id" and "type" field of
|
Extracts filters that are for the "id" and "type" property of
|
||||||
a STIX object. As the file directory is organized by STIX
|
a STIX object. As the file directory is organized by STIX
|
||||||
object type with filenames that are equivalent to the STIX
|
object type with filenames that are equivalent to the STIX
|
||||||
object ID, these filters can be used first to reduce the
|
object ID, these filters can be used first to reduce the
|
||||||
|
@ -315,6 +315,6 @@ class FileSystemSource(DataSource):
|
||||||
"""
|
"""
|
||||||
file_filters = set()
|
file_filters = set()
|
||||||
for filter_ in query:
|
for filter_ in query:
|
||||||
if filter_.field == "id" or filter_.field == "type":
|
if filter_.property == "id" or filter_.property == "type":
|
||||||
file_filters.add(filter_)
|
file_filters.add(filter_)
|
||||||
return file_filters
|
return file_filters
|
||||||
|
|
|
@ -4,29 +4,6 @@ Filters for Python STIX 2.0 DataSources, DataSinks, DataStores
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
import types
|
|
||||||
|
|
||||||
# Currently, only STIX 2.0 common SDO fields (that are not complex objects)
|
|
||||||
# are supported for filtering on
|
|
||||||
|
|
||||||
"""Supported STIX properties"""
|
|
||||||
STIX_COMMON_FIELDS = [
|
|
||||||
"created",
|
|
||||||
"created_by_ref",
|
|
||||||
"external_references.source_name",
|
|
||||||
"external_references.description",
|
|
||||||
"external_references.url",
|
|
||||||
"external_references.hashes",
|
|
||||||
"external_references.external_id",
|
|
||||||
"granular_markings.marking_ref",
|
|
||||||
"granular_markings.selectors",
|
|
||||||
"id",
|
|
||||||
"labels",
|
|
||||||
"modified",
|
|
||||||
"object_marking_refs",
|
|
||||||
"revoked",
|
|
||||||
"type"
|
|
||||||
]
|
|
||||||
|
|
||||||
"""Supported filter operations"""
|
"""Supported filter operations"""
|
||||||
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
|
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
|
||||||
|
@ -34,46 +11,40 @@ FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
|
||||||
"""Supported filter value types"""
|
"""Supported filter value types"""
|
||||||
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
|
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
|
||||||
|
|
||||||
# filter lookup map - STIX 2 common fields -> filter method
|
|
||||||
STIX_COMMON_FILTERS_MAP = {}
|
|
||||||
|
|
||||||
|
def _check_filter_components(prop, op, value):
|
||||||
|
"""Check that filter meets minimum validity.
|
||||||
|
|
||||||
def _check_filter_components(field, op, value):
|
Note:
|
||||||
"""check filter meets minimum validity
|
Currently can create Filters that are not valid STIX2 object common
|
||||||
|
properties, as filter.prop value is not checked, only filter.op,
|
||||||
|
filter value are checked here. They are just ignored when applied
|
||||||
|
within the DataSource API. For example, a user can add a TAXII Filter,
|
||||||
|
that is extracted and sent to a TAXII endpoint within TAXIICollection
|
||||||
|
and not applied locally (within this API).
|
||||||
|
|
||||||
Note: Currently can create Filters that are not valid
|
|
||||||
STIX2 object common properties, as filter.field value
|
|
||||||
is not checked, only filter.op, filter.value are checked
|
|
||||||
here. They are just ignored when
|
|
||||||
applied within the DataSource API. For example, a user
|
|
||||||
can add a TAXII Filter, that is extracted and sent to
|
|
||||||
a TAXII endpoint within TAXIICollection and not applied
|
|
||||||
locally (within this API).
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if op not in FILTER_OPS:
|
if op not in FILTER_OPS:
|
||||||
# check filter operator is supported
|
# check filter operator is supported
|
||||||
raise ValueError("Filter operator '%s' not supported for specified field: '%s'" % (op, field))
|
raise ValueError("Filter operator '%s' not supported for specified property: '%s'" % (op, prop))
|
||||||
|
|
||||||
if type(value) not in FILTER_VALUE_TYPES:
|
if type(value) not in FILTER_VALUE_TYPES:
|
||||||
# check filter value type is supported
|
# check filter value type is supported
|
||||||
raise TypeError("Filter value type '%s' is not supported. The type must be a python immutable type or dictionary" % type(value))
|
raise TypeError("Filter value type '%s' is not supported. The type must be a Python immutable type or dictionary" % type(value))
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
|
class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
|
||||||
"""STIX 2 filters that support the querying functionality of STIX 2
|
"""STIX 2 filters that support the querying functionality of STIX 2
|
||||||
DataStores and DataSources.
|
DataStores and DataSources.
|
||||||
|
|
||||||
Initialized like a python tuple
|
Initialized like a Python tuple.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
field (str): filter field name, corresponds to STIX 2 object property
|
property (str): filter property name, corresponds to STIX 2 object property
|
||||||
|
|
||||||
op (str): operator of the filter
|
op (str): operator of the filter
|
||||||
|
value (str): filter property value
|
||||||
value (str): filter field value
|
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
Filter("id", "=", "malware--0f862b01-99da-47cc-9bdb-db4a86a95bb1")
|
Filter("id", "=", "malware--0f862b01-99da-47cc-9bdb-db4a86a95bb1")
|
||||||
|
@ -81,235 +52,110 @@ class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
|
||||||
"""
|
"""
|
||||||
__slots__ = ()
|
__slots__ = ()
|
||||||
|
|
||||||
def __new__(cls, field, op, value):
|
def __new__(cls, prop, op, value):
|
||||||
# If value is a list, convert it to a tuple so it is hashable.
|
# If value is a list, convert it to a tuple so it is hashable.
|
||||||
if isinstance(value, list):
|
if isinstance(value, list):
|
||||||
value = tuple(value)
|
value = tuple(value)
|
||||||
|
|
||||||
_check_filter_components(field, op, value)
|
_check_filter_components(prop, op, value)
|
||||||
|
|
||||||
self = super(Filter, cls).__new__(cls, field, op, value)
|
self = super(Filter, cls).__new__(cls, prop, op, value)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@property
|
def _check_property(self, stix_obj_property):
|
||||||
def common(self):
|
"""Check a property of a STIX Object against this filter.
|
||||||
"""return whether Filter is valid STIX2 Object common property
|
|
||||||
|
|
||||||
Note: The Filter operator and Filter value type are checked when
|
Args:
|
||||||
the filter is created, thus only leaving the Filter field to be
|
stix_obj_property: value to check this filter against
|
||||||
checked to make sure a valid STIX2 Object common property.
|
|
||||||
|
|
||||||
Note: Filters that are not valid STIX2 Object common property
|
Returns:
|
||||||
Filters are still allowed to be created for extended usage of
|
True if property matches the filter,
|
||||||
Filter. (e.g. TAXII specific filters can be created, which are
|
False otherwise.
|
||||||
then extracted and sent to TAXII endpoint.)
|
|
||||||
"""
|
"""
|
||||||
return self.field in STIX_COMMON_FIELDS
|
if self.op == "=":
|
||||||
|
return stix_obj_property == self.value
|
||||||
|
elif self.op == "!=":
|
||||||
|
return stix_obj_property != self.value
|
||||||
|
elif self.op == "in":
|
||||||
|
return stix_obj_property in self.value
|
||||||
|
elif self.op == ">":
|
||||||
|
return stix_obj_property > self.value
|
||||||
|
elif self.op == "<":
|
||||||
|
return stix_obj_property < self.value
|
||||||
|
elif self.op == ">=":
|
||||||
|
return stix_obj_property >= self.value
|
||||||
|
elif self.op == "<=":
|
||||||
|
return stix_obj_property <= self.value
|
||||||
|
else:
|
||||||
|
raise ValueError("Filter operator: {0} not supported for specified property: {1}".format(self.op, self.property))
|
||||||
|
|
||||||
|
|
||||||
def apply_common_filters(stix_objs, query):
|
def apply_common_filters(stix_objs, query):
|
||||||
"""Evaluate filters against a set of STIX 2.0 objects.
|
"""Evaluate filters against a set of STIX 2.0 objects.
|
||||||
|
|
||||||
Supports only STIX 2.0 common property fields
|
Supports only STIX 2.0 common property properties.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_objs (list): list of STIX objects to apply the query to
|
stix_objs (list): list of STIX objects to apply the query to
|
||||||
query (set): set of filters (combined form complete query)
|
query (set): set of filters (combined form complete query)
|
||||||
|
|
||||||
Returns:
|
Yields:
|
||||||
(generator): of STIX objects that successfully evaluate against
|
STIX objects that successfully evaluate against the query.
|
||||||
the query.
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
for stix_obj in stix_objs:
|
for stix_obj in stix_objs:
|
||||||
clean = True
|
clean = True
|
||||||
for filter_ in query:
|
for filter_ in query:
|
||||||
if not filter_.common:
|
match = _check_filter(filter_, stix_obj)
|
||||||
# skip filter as it is not a STIX2 Object common property filter
|
|
||||||
continue
|
|
||||||
|
|
||||||
if "." in filter_.field:
|
|
||||||
# For properties like granular_markings and external_references
|
|
||||||
# need to extract the first property from the string.
|
|
||||||
field = filter_.field.split(".")[0]
|
|
||||||
else:
|
|
||||||
field = filter_.field
|
|
||||||
|
|
||||||
if field not in stix_obj.keys():
|
|
||||||
# check filter "field" is in STIX object - if cant be
|
|
||||||
# applied to STIX object, STIX object is discarded
|
|
||||||
# (i.e. did not make it through the filter)
|
|
||||||
clean = False
|
|
||||||
break
|
|
||||||
|
|
||||||
match = STIX_COMMON_FILTERS_MAP[filter_.field.split('.')[0]](filter_, stix_obj)
|
|
||||||
|
|
||||||
if not match:
|
if not match:
|
||||||
clean = False
|
clean = False
|
||||||
break
|
break
|
||||||
elif match == -1:
|
|
||||||
raise ValueError("Error, filter operator: {0} not supported for specified field: {1}".format(filter_.op, filter_.field))
|
|
||||||
|
|
||||||
# if object unmarked after all filters, add it
|
# if object unmarked after all filters, add it
|
||||||
if clean:
|
if clean:
|
||||||
yield stix_obj
|
yield stix_obj
|
||||||
|
|
||||||
|
|
||||||
"""Base type filters"""
|
def _check_filter(filter_, stix_obj):
|
||||||
|
"""Evaluate a single filter against a single STIX 2.0 object.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filter_ (Filter): filter to match against
|
||||||
|
stix_obj: STIX object to apply the filter to
|
||||||
|
|
||||||
def _all_filter(filter_, stix_obj_field):
|
Returns:
|
||||||
"""all filter operations (for filters whose value type can be applied to any operation type)"""
|
True if the stix_obj matches the filter,
|
||||||
if filter_.op == "=":
|
False if not.
|
||||||
return stix_obj_field == filter_.value
|
|
||||||
elif filter_.op == "!=":
|
"""
|
||||||
return stix_obj_field != filter_.value
|
# For properties like granular_markings and external_references
|
||||||
elif filter_.op == "in":
|
# need to extract the first property from the string.
|
||||||
return stix_obj_field in filter_.value
|
prop = filter_.property.split(".")[0]
|
||||||
elif filter_.op == ">":
|
|
||||||
return stix_obj_field > filter_.value
|
if prop not in stix_obj.keys():
|
||||||
elif filter_.op == "<":
|
# check filter "property" is in STIX object - if cant be
|
||||||
return stix_obj_field < filter_.value
|
# applied to STIX object, STIX object is discarded
|
||||||
elif filter_.op == ">=":
|
# (i.e. did not make it through the filter)
|
||||||
return stix_obj_field >= filter_.value
|
return False
|
||||||
elif filter_.op == "<=":
|
|
||||||
return stix_obj_field <= filter_.value
|
if "." in filter_.property:
|
||||||
|
# Check embedded properties, from e.g. granular_markings or external_references
|
||||||
|
sub_property = filter_.property.split(".", 1)[1]
|
||||||
|
sub_filter = filter_._replace(property=sub_property)
|
||||||
|
if isinstance(stix_obj[prop], list):
|
||||||
|
for elem in stix_obj[prop]:
|
||||||
|
if _check_filter(sub_filter, elem) is True:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
else:
|
else:
|
||||||
return -1
|
return _check_filter(sub_filter, stix_obj[prop])
|
||||||
|
elif isinstance(stix_obj[prop], list):
|
||||||
|
# Check each item in list property to see if it matches
|
||||||
def _id_filter(filter_, stix_obj_id):
|
for elem in stix_obj[prop]:
|
||||||
"""base STIX id filter"""
|
if filter_._check_property(elem) is True:
|
||||||
if filter_.op == "=":
|
return True
|
||||||
return stix_obj_id == filter_.value
|
return False
|
||||||
elif filter_.op == "!=":
|
|
||||||
return stix_obj_id != filter_.value
|
|
||||||
else:
|
else:
|
||||||
return -1
|
# Check if property matches
|
||||||
|
return filter_._check_property(stix_obj[prop])
|
||||||
|
|
||||||
def _boolean_filter(filter_, stix_obj_field):
|
|
||||||
"""base boolean filter"""
|
|
||||||
if filter_.op == "=":
|
|
||||||
return stix_obj_field == filter_.value
|
|
||||||
elif filter_.op == "!=":
|
|
||||||
return stix_obj_field != filter_.value
|
|
||||||
else:
|
|
||||||
return -1
|
|
||||||
|
|
||||||
|
|
||||||
def _string_filter(filter_, stix_obj_field):
|
|
||||||
"""base string filter"""
|
|
||||||
return _all_filter(filter_, stix_obj_field)
|
|
||||||
|
|
||||||
|
|
||||||
def _timestamp_filter(filter_, stix_obj_timestamp):
|
|
||||||
"""base STIX 2 timestamp filter"""
|
|
||||||
return _all_filter(filter_, stix_obj_timestamp)
|
|
||||||
|
|
||||||
|
|
||||||
"""STIX 2.0 Common Property Filters
|
|
||||||
|
|
||||||
The naming of these functions is important as
|
|
||||||
they are used to index a mapping dictionary from
|
|
||||||
STIX common field names to these filter functions.
|
|
||||||
|
|
||||||
REQUIRED naming scheme:
|
|
||||||
"check_<STIX field name>_filter"
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
def check_created_filter(filter_, stix_obj):
|
|
||||||
return _timestamp_filter(filter_, stix_obj["created"])
|
|
||||||
|
|
||||||
|
|
||||||
def check_created_by_ref_filter(filter_, stix_obj):
|
|
||||||
return _id_filter(filter_, stix_obj["created_by_ref"])
|
|
||||||
|
|
||||||
|
|
||||||
def check_external_references_filter(filter_, stix_obj):
|
|
||||||
"""
|
|
||||||
STIX object's can have a list of external references
|
|
||||||
|
|
||||||
external_references properties supported:
|
|
||||||
external_references.source_name (string)
|
|
||||||
external_references.description (string)
|
|
||||||
external_references.url (string)
|
|
||||||
external_references.external_id (string)
|
|
||||||
|
|
||||||
external_references properties not supported:
|
|
||||||
external_references.hashes
|
|
||||||
|
|
||||||
"""
|
|
||||||
for er in stix_obj["external_references"]:
|
|
||||||
# grab er property name from filter field
|
|
||||||
filter_field = filter_.field.split(".")[1]
|
|
||||||
if filter_field in er:
|
|
||||||
r = _string_filter(filter_, er[filter_field])
|
|
||||||
if r:
|
|
||||||
return r
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def check_granular_markings_filter(filter_, stix_obj):
|
|
||||||
"""
|
|
||||||
STIX object's can have a list of granular marking references
|
|
||||||
|
|
||||||
granular_markings properties:
|
|
||||||
granular_markings.marking_ref (id)
|
|
||||||
granular_markings.selectors (string)
|
|
||||||
|
|
||||||
"""
|
|
||||||
for gm in stix_obj["granular_markings"]:
|
|
||||||
# grab gm property name from filter field
|
|
||||||
filter_field = filter_.field.split(".")[1]
|
|
||||||
|
|
||||||
if filter_field == "marking_ref":
|
|
||||||
return _id_filter(filter_, gm[filter_field])
|
|
||||||
|
|
||||||
elif filter_field == "selectors":
|
|
||||||
for selector in gm[filter_field]:
|
|
||||||
r = _string_filter(filter_, selector)
|
|
||||||
if r:
|
|
||||||
return r
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def check_id_filter(filter_, stix_obj):
|
|
||||||
return _id_filter(filter_, stix_obj["id"])
|
|
||||||
|
|
||||||
|
|
||||||
def check_labels_filter(filter_, stix_obj):
|
|
||||||
for label in stix_obj["labels"]:
|
|
||||||
r = _string_filter(filter_, label)
|
|
||||||
if r:
|
|
||||||
return r
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def check_modified_filter(filter_, stix_obj):
|
|
||||||
return _timestamp_filter(filter_, stix_obj["modified"])
|
|
||||||
|
|
||||||
|
|
||||||
def check_object_marking_refs_filter(filter_, stix_obj):
|
|
||||||
for marking_id in stix_obj["object_marking_refs"]:
|
|
||||||
r = _id_filter(filter_, marking_id)
|
|
||||||
if r:
|
|
||||||
return r
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def check_revoked_filter(filter_, stix_obj):
|
|
||||||
return _boolean_filter(filter_, stix_obj["revoked"])
|
|
||||||
|
|
||||||
|
|
||||||
def check_type_filter(filter_, stix_obj):
|
|
||||||
return _string_filter(filter_, stix_obj["type"])
|
|
||||||
|
|
||||||
|
|
||||||
# Create mapping of field names to filter functions
|
|
||||||
for name, obj in dict(globals()).items():
|
|
||||||
if "check_" in name and isinstance(obj, types.FunctionType):
|
|
||||||
field_name = "_".join(name.split("_")[1:-1])
|
|
||||||
STIX_COMMON_FILTERS_MAP[field_name] = obj
|
|
||||||
|
|
|
@ -234,10 +234,10 @@ class TAXIICollectionSource(DataSource):
|
||||||
params = {}
|
params = {}
|
||||||
|
|
||||||
for filter_ in query:
|
for filter_ in query:
|
||||||
if filter_.field in TAXII_FILTERS:
|
if filter_.property in TAXII_FILTERS:
|
||||||
if filter_.field == "added_after":
|
if filter_.property == "added_after":
|
||||||
params[filter_.field] = filter_.value
|
params[filter_.property] = filter_.value
|
||||||
else:
|
else:
|
||||||
taxii_field = "match[%s]" % filter_.field
|
taxii_field = "match[%s]" % filter_.property
|
||||||
params[taxii_field] = filter_.value
|
params[taxii_field] = filter_.value
|
||||||
return params
|
return params
|
||||||
|
|
|
@ -179,7 +179,7 @@ def test_parse_taxii_filters():
|
||||||
|
|
||||||
def test_add_get_remove_filter(ds):
|
def test_add_get_remove_filter(ds):
|
||||||
|
|
||||||
# First 3 filters are valid, remaining fields are erroneous in some way
|
# First 3 filters are valid, remaining properties are erroneous in some way
|
||||||
valid_filters = [
|
valid_filters = [
|
||||||
Filter('type', '=', 'malware'),
|
Filter('type', '=', 'malware'),
|
||||||
Filter('id', '!=', 'stix object id'),
|
Filter('id', '!=', 'stix object id'),
|
||||||
|
@ -193,14 +193,14 @@ def test_add_get_remove_filter(ds):
|
||||||
with pytest.raises(ValueError) as excinfo:
|
with pytest.raises(ValueError) as excinfo:
|
||||||
# create Filter that has an operator that is not allowed
|
# create Filter that has an operator that is not allowed
|
||||||
Filter('modified', '*', 'not supported operator - just place holder')
|
Filter('modified', '*', 'not supported operator - just place holder')
|
||||||
assert str(excinfo.value) == "Filter operator '*' not supported for specified field: 'modified'"
|
assert str(excinfo.value) == "Filter operator '*' not supported for specified property: 'modified'"
|
||||||
|
|
||||||
with pytest.raises(TypeError) as excinfo:
|
with pytest.raises(TypeError) as excinfo:
|
||||||
# create Filter that has a value type that is not allowed
|
# create Filter that has a value type that is not allowed
|
||||||
Filter('created', '=', object())
|
Filter('created', '=', object())
|
||||||
# On Python 2, the type of object() is `<type 'object'>` On Python 3, it's `<class 'object'>`.
|
# On Python 2, the type of object() is `<type 'object'>` On Python 3, it's `<class 'object'>`.
|
||||||
assert str(excinfo.value).startswith("Filter value type")
|
assert str(excinfo.value).startswith("Filter value type")
|
||||||
assert str(excinfo.value).endswith("is not supported. The type must be a python immutable type or dictionary")
|
assert str(excinfo.value).endswith("is not supported. The type must be a Python immutable type or dictionary")
|
||||||
|
|
||||||
assert len(ds.filters) == 0
|
assert len(ds.filters) == 0
|
||||||
|
|
||||||
|
@ -407,7 +407,7 @@ def test_filters4(ds):
|
||||||
with pytest.raises(ValueError) as excinfo:
|
with pytest.raises(ValueError) as excinfo:
|
||||||
Filter("modified", "?", "2017-01-27T13:49:53.935Z")
|
Filter("modified", "?", "2017-01-27T13:49:53.935Z")
|
||||||
assert str(excinfo.value) == ("Filter operator '?' not supported "
|
assert str(excinfo.value) == ("Filter operator '?' not supported "
|
||||||
"for specified field: 'modified'")
|
"for specified property: 'modified'")
|
||||||
|
|
||||||
|
|
||||||
def test_filters5(ds):
|
def test_filters5(ds):
|
||||||
|
@ -417,6 +417,52 @@ def test_filters5(ds):
|
||||||
assert len(resp) == 1
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_filters6(ds):
|
||||||
|
# Test filtering on non-common property
|
||||||
|
resp = list(apply_common_filters(STIX_OBJS2, [Filter("name", "=", "Malicious site hosting downloader")]))
|
||||||
|
assert resp[0]['id'] == STIX_OBJS2[0]['id']
|
||||||
|
assert len(resp) == 3
|
||||||
|
|
||||||
|
|
||||||
|
def test_filters7(ds):
|
||||||
|
# Test filtering on embedded property
|
||||||
|
stix_objects = list(STIX_OBJS2) + [{
|
||||||
|
"type": "observed-data",
|
||||||
|
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
|
||||||
|
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
|
||||||
|
"created": "2016-04-06T19:58:16.000Z",
|
||||||
|
"modified": "2016-04-06T19:58:16.000Z",
|
||||||
|
"first_observed": "2015-12-21T19:00:00Z",
|
||||||
|
"last_observed": "2015-12-21T19:00:00Z",
|
||||||
|
"number_observed": 50,
|
||||||
|
"objects": {
|
||||||
|
"0": {
|
||||||
|
"type": "file",
|
||||||
|
"hashes": {
|
||||||
|
"SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f"
|
||||||
|
},
|
||||||
|
"extensions": {
|
||||||
|
"pdf-ext": {
|
||||||
|
"version": "1.7",
|
||||||
|
"document_info_dict": {
|
||||||
|
"Title": "Sample document",
|
||||||
|
"Author": "Adobe Systems Incorporated",
|
||||||
|
"Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh",
|
||||||
|
"Producer": "Acrobat Distiller 3.01 for Power Macintosh",
|
||||||
|
"CreationDate": "20070412090123-02"
|
||||||
|
},
|
||||||
|
"pdfid0": "DFCE52BD827ECF765649852119D",
|
||||||
|
"pdfid1": "57A1E0F9ED2AE523E313C"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
resp = list(apply_common_filters(stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")]))
|
||||||
|
assert resp[0]['id'] == stix_objects[3]['id']
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
|
||||||
def test_deduplicate(ds):
|
def test_deduplicate(ds):
|
||||||
unique = deduplicate(STIX_OBJS1)
|
unique = deduplicate(STIX_OBJS1)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue