Changed how filters work, with respect to datetime objects.

Timestamp properties can now be checked against filter values
which are either strings or datetime objects, using datetime
semantics (previously, it reduced to a string compare).
If a stix object property is datetime-valued and the filter
value is a string, the string is parsed to a datetime object,
rather than the other way around.

Filtering in the filesystem store now parses JSON dicts to
_STIXBase objects before applying the filters.

Due to the parsing change, bad JSON content can produce a
different kind of error, so I had to change one of the tests.
master
Michael Chisholm 2018-11-27 17:36:17 -05:00
parent d8a775c60d
commit 3adf7800a8
3 changed files with 64 additions and 52 deletions

View File

@ -267,42 +267,46 @@ def _get_matching_dir_entries(parent_dir, auth_set, st_mode_test=None, ext=""):
return results
def _check_object_from_file(query, filepath):
def _check_object_from_file(query, filepath, allow_custom, version):
"""
Read a STIX object from the given file, and check it against the given
filters.
:param query: Iterable of filters
:param filepath: Path to file to read
:return: The STIX object, as a dict, if the object passes the filters. If
:param allow_custom: Whether to allow custom properties as well unknown
custom objects.
:param version: Which STIX2 version to use. (e.g. "2.0", "2.1"). If None,
use latest version.
:return: The (parsed) STIX object, if the object passes the filters. If
not, None is returned.
:raises TypeError: If the file had invalid content
:raises TypeError: If the file had invalid JSON
:raises IOError: If there are problems opening/reading the file
:raises stix2.exceptions.STIXError: If there were problems creating a STIX
object from the JSON
"""
try:
with open(filepath, "r") as f:
stix_obj = json.load(f)
stix_json = json.load(f)
if stix_obj["type"] == "bundle":
stix_obj = stix_obj["objects"][0]
# naive STIX type checking
stix_obj["type"]
stix_obj["id"]
except (ValueError, KeyError): # likely not a JSON file
except ValueError: # not a JSON file
raise TypeError(
"STIX JSON object at '{0}' could either not be parsed "
"to JSON or was not valid STIX JSON".format(
filepath))
stix_obj = parse(stix_json, allow_custom, version)
if stix_obj["type"] == "bundle":
stix_obj = stix_obj["objects"][0]
# check against other filters, add if match
result = next(apply_common_filters([stix_obj], query), None)
return result
def _search_versioned(query, type_path, auth_ids):
def _search_versioned(query, type_path, auth_ids, allow_custom, version):
"""
Searches the given directory, which contains data for STIX objects of a
particular versioned type (i.e. not markings), and return any which match
@ -311,8 +315,13 @@ def _search_versioned(query, type_path, auth_ids):
:param query: The query to match against
:param type_path: The directory with type-specific STIX object files
:param auth_ids: Search optimization based on object ID
:param allow_custom: Whether to allow custom properties as well unknown
custom objects.
:param version: Which STIX2 version to use. (e.g. "2.0", "2.1"). If None,
use latest version.
:return: A list of all matching objects
:raises TypeError: If any objects had invalid content
:raises TypeError, stix2.exceptions.STIXError: If any objects had invalid
content
:raises IOError, OSError: If there were any problems opening/reading files
"""
results = []
@ -330,7 +339,8 @@ def _search_versioned(query, type_path, auth_ids):
version_path = os.path.join(id_path, version_file)
try:
stix_obj = _check_object_from_file(query, version_path)
stix_obj = _check_object_from_file(query, version_path,
allow_custom, version)
if stix_obj:
results.append(stix_obj)
except IOError as e:
@ -346,7 +356,8 @@ def _search_versioned(query, type_path, auth_ids):
id_path = os.path.join(type_path, id_file)
try:
stix_obj = _check_object_from_file(query, id_path)
stix_obj = _check_object_from_file(query, id_path, allow_custom,
version)
if stix_obj:
results.append(stix_obj)
except IOError as e:
@ -357,7 +368,7 @@ def _search_versioned(query, type_path, auth_ids):
return results
def _search_markings(query, markings_path, auth_ids):
def _search_markings(query, markings_path, auth_ids, allow_custom, version):
"""
Searches the given directory, which contains markings data, and return any
which match the query.
@ -365,8 +376,13 @@ def _search_markings(query, markings_path, auth_ids):
:param query: The query to match against
:param markings_path: The directory with STIX markings files
:param auth_ids: Search optimization based on object ID
:param allow_custom: Whether to allow custom properties as well unknown
custom objects.
:param version: Which STIX2 version to use. (e.g. "2.0", "2.1"). If None,
use latest version.
:return: A list of all matching objects
:raises TypeError: If any objects had invalid content
:raises TypeError, stix2.exceptions.STIXError: If any objects had invalid
content
:raises IOError, OSError: If there were any problems opening/reading files
"""
results = []
@ -376,7 +392,8 @@ def _search_markings(query, markings_path, auth_ids):
id_path = os.path.join(markings_path, id_file)
try:
stix_obj = _check_object_from_file(query, id_path)
stix_obj = _check_object_from_file(query, id_path, allow_custom,
version)
if stix_obj:
results.append(stix_obj)
except IOError as e:
@ -627,17 +644,12 @@ class FileSystemSource(DataSource):
type_path = os.path.join(self._stix_dir, type_dir)
if type_dir == "marking-definition":
type_results = _search_markings(query, type_path, auth_ids)
type_results = _search_markings(query, type_path, auth_ids,
self.allow_custom, version)
else:
type_results = _search_versioned(query, type_path, auth_ids)
type_results = _search_versioned(query, type_path, auth_ids,
self.allow_custom, version)
all_data.extend(type_results)
# parse python STIX objects from the STIX object dicts
stix_objs = [
parse(stix_obj_dict, allow_custom=self.allow_custom,
version=version)
for stix_obj_dict in all_data
]
return stix_objs
return all_data

View File

@ -5,6 +5,8 @@ Filters for Python STIX 2.0 DataSources, DataSinks, DataStores
import collections
from datetime import datetime
import six
import stix2.utils
from stix2.utils import format_datetime
@ -12,7 +14,7 @@ from stix2.utils import format_datetime
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=', 'contains']
"""Supported filter value types"""
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple, datetime]
try:
FILTER_VALUE_TYPES.append(unicode)
except NameError:
@ -69,10 +71,6 @@ class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
if isinstance(value, list):
value = tuple(value)
if isinstance(value, datetime):
# if value is a datetime obj, convert to str
value = format_datetime(value)
_check_filter_components(prop, op, value)
self = super(Filter, cls).__new__(cls, prop, op, value)
@ -88,31 +86,33 @@ class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
True if property matches the filter,
False otherwise.
"""
if isinstance(stix_obj_property, datetime):
# if a datetime obj, convert to str format before comparison
# NOTE: this check seems like it should be done upstream
# but will put here for now
stix_obj_property = format_datetime(stix_obj_property)
# If filtering on a timestamp property and the filter value is a string,
# try to convert the filter value to a datetime instance.
if isinstance(stix_obj_property, datetime) and \
isinstance(self.value, six.string_types):
filter_value = stix2.utils.parse_into_datetime(stix_obj_property)
else:
filter_value = self.value
if self.op == "=":
return stix_obj_property == self.value
return stix_obj_property == filter_value
elif self.op == "!=":
return stix_obj_property != self.value
return stix_obj_property != filter_value
elif self.op == "in":
return stix_obj_property in self.value
return stix_obj_property in filter_value
elif self.op == "contains":
if isinstance(self.value, dict):
return self.value in stix_obj_property.values()
if isinstance(filter_value, dict):
return filter_value in stix_obj_property.values()
else:
return self.value in stix_obj_property
return filter_value in stix_obj_property
elif self.op == ">":
return stix_obj_property > self.value
return stix_obj_property > filter_value
elif self.op == "<":
return stix_obj_property < self.value
return stix_obj_property < filter_value
elif self.op == ">=":
return stix_obj_property >= self.value
return stix_obj_property >= filter_value
elif self.op == "<=":
return stix_obj_property <= self.value
return stix_obj_property <= filter_value
else:
raise ValueError("Filter operator: {0} not supported for specified property: {1}".format(self.op, self.property))

View File

@ -15,6 +15,7 @@ from stix2 import (Bundle, Campaign, CustomObject, FileSystemSink,
from stix2.datastore.filesystem import (AuthSet, _find_search_optimizations,
_get_matching_dir_entries,
_timestamp2filename)
from stix2.exceptions import STIXError
from stix2.test.constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID,
IDENTITY_KWARGS, INDICATOR_ID,
INDICATOR_KWARGS, MALWARE_ID, MALWARE_KWARGS,
@ -148,9 +149,8 @@ def test_filesystem_source_bad_stix_file(fs_source, bad_stix_files):
# this tests handling of bad STIX json object
try:
fs_source.get("intrusion-set--test-non-stix")
except TypeError as e:
assert "intrusion-set--test-non-stix" in str(e)
assert "could either not be parsed to JSON or was not valid STIX JSON" in str(e)
except STIXError as e:
assert "Can't parse object with no 'type' property" in str(e)
def test_filesystem_source_get_object(fs_source):