From 3adf7800a8a4fa7b2d8e84c35360ed255caaeaf7 Mon Sep 17 00:00:00 2001
From: Michael Chisholm
Date: Tue, 27 Nov 2018 17:36:17 -0500
Subject: [PATCH] Changed how filters work, with respect to datetime objects.

Timestamp properties can now be checked against filter values
which are either strings or datetime objects, using datetime
semantics (previously, it reduced to a string compare). If
a stix object property is datetime-valued and the filter value
is a string, the string is parsed to a datetime object, rather
than the other way around.

Filtering in the filesystem store now parses JSON dicts to
_STIXBase objects before applying the filters.

Due to the parsing change, bad JSON content can produce a
different kind of error, so I had to change one of the tests.
---
 stix2/datastore/filesystem.py           | 70 +++++++++++++++----------
 stix2/datastore/filters.py              | 40 +++++++-------
 stix2/test/test_datastore_filesystem.py |  6 +--
 3 files changed, 64 insertions(+), 52 deletions(-)

diff --git a/stix2/datastore/filesystem.py b/stix2/datastore/filesystem.py
index 3b518b7..ac1bedc 100644
--- a/stix2/datastore/filesystem.py
+++ b/stix2/datastore/filesystem.py
@@ -267,42 +267,46 @@ def _get_matching_dir_entries(parent_dir, auth_set, st_mode_test=None, ext=""):
     return results
 
 
-def _check_object_from_file(query, filepath):
+def _check_object_from_file(query, filepath, allow_custom, version):
     """
     Read a STIX object from the given file, and check it against the given
     filters.
 
     :param query: Iterable of filters
     :param filepath: Path to file to read
-    :return: The STIX object, as a dict, if the object passes the filters. If
+    :param allow_custom: Whether to allow custom properties as well as unknown
+        custom objects.
+    :param version: Which STIX2 version to use. (e.g. "2.0", "2.1"). If None,
+        use latest version.
+    :return: The (parsed) STIX object, if the object passes the filters. If
         not, None is returned.
-    :raises TypeError: If the file had invalid content
+    :raises TypeError: If the file had invalid JSON
     :raises IOError: If there are problems opening/reading the file
+    :raises stix2.exceptions.STIXError: If there were problems creating a STIX
+        object from the JSON
     """
     try:
         with open(filepath, "r") as f:
-            stix_obj = json.load(f)
+            stix_json = json.load(f)
 
-        if stix_obj["type"] == "bundle":
-            stix_obj = stix_obj["objects"][0]
-
-        # naive STIX type checking
-        stix_obj["type"]
-        stix_obj["id"]
-
-    except (ValueError, KeyError):  # likely not a JSON file
+    except ValueError:  # not a JSON file
         raise TypeError(
             "STIX JSON object at '{0}' could either not be parsed "
             "to JSON or was not valid STIX JSON".format(
                 filepath))
 
+    stix_obj = parse(stix_json, allow_custom, version)
+
+    if stix_obj["type"] == "bundle":
+        stix_obj = stix_obj["objects"][0]
+
     # check against other filters, add if match
     result = next(apply_common_filters([stix_obj], query), None)
 
     return result
 
 
-def _search_versioned(query, type_path, auth_ids):
+def _search_versioned(query, type_path, auth_ids, allow_custom, version):
     """
     Searches the given directory, which contains data for STIX objects of a
     particular versioned type (i.e. not markings), and return any which match
@@ -311,8 +315,13 @@ def _search_versioned(query, type_path, auth_ids):
     :param query: The query to match against
     :param type_path: The directory with type-specific STIX object files
     :param auth_ids: Search optimization based on object ID
+    :param allow_custom: Whether to allow custom properties as well as unknown
+        custom objects.
+    :param version: Which STIX2 version to use. (e.g. "2.0", "2.1"). If None,
+        use latest version.
     :return: A list of all matching objects
-    :raises TypeError: If any objects had invalid content
+    :raises TypeError, stix2.exceptions.STIXError: If any objects had invalid
+        content
     :raises IOError, OSError: If there were any problems opening/reading files
     """
     results = []
@@ -330,7 +339,8 @@ def _search_versioned(query, type_path, auth_ids):
             version_path = os.path.join(id_path, version_file)
 
             try:
-                stix_obj = _check_object_from_file(query, version_path)
+                stix_obj = _check_object_from_file(query, version_path,
+                                                   allow_custom, version)
                 if stix_obj:
                     results.append(stix_obj)
             except IOError as e:
@@ -346,7 +356,8 @@ def _search_versioned(query, type_path, auth_ids):
         id_path = os.path.join(type_path, id_file)
 
         try:
-            stix_obj = _check_object_from_file(query, id_path)
+            stix_obj = _check_object_from_file(query, id_path, allow_custom,
+                                               version)
             if stix_obj:
                 results.append(stix_obj)
         except IOError as e:
@@ -357,7 +368,7 @@ def _search_versioned(query, type_path, auth_ids):
     return results
 
 
-def _search_markings(query, markings_path, auth_ids):
+def _search_markings(query, markings_path, auth_ids, allow_custom, version):
     """
     Searches the given directory, which contains markings data, and return any
     which match the query.
@@ -365,8 +376,13 @@ def _search_markings(query, markings_path, auth_ids):
     :param query: The query to match against
     :param markings_path: The directory with STIX markings files
     :param auth_ids: Search optimization based on object ID
+    :param allow_custom: Whether to allow custom properties as well as unknown
+        custom objects.
+    :param version: Which STIX2 version to use. (e.g. "2.0", "2.1"). If None,
+        use latest version.
     :return: A list of all matching objects
-    :raises TypeError: If any objects had invalid content
+    :raises TypeError, stix2.exceptions.STIXError: If any objects had invalid
+        content
     :raises IOError, OSError: If there were any problems opening/reading files
     """
     results = []
@@ -376,7 +392,8 @@ def _search_markings(query, markings_path, auth_ids):
         id_path = os.path.join(markings_path, id_file)
 
         try:
-            stix_obj = _check_object_from_file(query, id_path)
+            stix_obj = _check_object_from_file(query, id_path, allow_custom,
+                                               version)
             if stix_obj:
                 results.append(stix_obj)
         except IOError as e:
@@ -627,17 +644,12 @@ class FileSystemSource(DataSource):
             type_path = os.path.join(self._stix_dir, type_dir)
 
             if type_dir == "marking-definition":
-                type_results = _search_markings(query, type_path, auth_ids)
+                type_results = _search_markings(query, type_path, auth_ids,
+                                                self.allow_custom, version)
             else:
-                type_results = _search_versioned(query, type_path, auth_ids)
+                type_results = _search_versioned(query, type_path, auth_ids,
+                                                 self.allow_custom, version)
 
             all_data.extend(type_results)
 
-        # parse python STIX objects from the STIX object dicts
-        stix_objs = [
-            parse(stix_obj_dict, allow_custom=self.allow_custom,
-                  version=version)
-            for stix_obj_dict in all_data
-        ]
-
-        return stix_objs
+        return all_data
diff --git a/stix2/datastore/filters.py b/stix2/datastore/filters.py
index 0172a50..219f99a 100644
--- a/stix2/datastore/filters.py
+++ b/stix2/datastore/filters.py
@@ -5,6 +5,8 @@ Filters for Python STIX 2.0 DataSources, DataSinks, DataStores
 
 import collections
 from datetime import datetime
+import six
+import stix2.utils
 
 from stix2.utils import format_datetime
 
@@ -12,7 +14,7 @@ from stix2.utils import format_datetime
 FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=', 'contains']
 
 """Supported filter value types"""
-FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
+FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple, datetime]
 try:
     FILTER_VALUE_TYPES.append(unicode)
 except NameError:
@@ -69,10 +71,6 @@ class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
         if isinstance(value, list):
             value = tuple(value)
 
-        if isinstance(value, datetime):
-            # if value is a datetime obj, convert to str
-            value = format_datetime(value)
-
         _check_filter_components(prop, op, value)
 
         self = super(Filter, cls).__new__(cls, prop, op, value)
@@ -88,31 +86,33 @@ class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
             True if property matches the filter,
             False otherwise.
         """
-        if isinstance(stix_obj_property, datetime):
-            # if a datetime obj, convert to str format before comparison
-            # NOTE: this check seems like it should be done upstream
-            # but will put here for now
-            stix_obj_property = format_datetime(stix_obj_property)
+        # If filtering on a timestamp property and the filter value is a string,
+        # try to convert the filter value to a datetime instance.
+        if isinstance(stix_obj_property, datetime) and \
+                isinstance(self.value, six.string_types):
+            filter_value = stix2.utils.parse_into_datetime(self.value)
+        else:
+            filter_value = self.value
 
         if self.op == "=":
-            return stix_obj_property == self.value
+            return stix_obj_property == filter_value
         elif self.op == "!=":
-            return stix_obj_property != self.value
+            return stix_obj_property != filter_value
         elif self.op == "in":
-            return stix_obj_property in self.value
+            return stix_obj_property in filter_value
         elif self.op == "contains":
-            if isinstance(self.value, dict):
-                return self.value in stix_obj_property.values()
+            if isinstance(filter_value, dict):
+                return filter_value in stix_obj_property.values()
             else:
-                return self.value in stix_obj_property
+                return filter_value in stix_obj_property
         elif self.op == ">":
-            return stix_obj_property > self.value
+            return stix_obj_property > filter_value
         elif self.op == "<":
-            return stix_obj_property < self.value
+            return stix_obj_property < filter_value
         elif self.op == ">=":
-            return stix_obj_property >= self.value
+            return stix_obj_property >= filter_value
         elif self.op == "<=":
-            return stix_obj_property <= self.value
+            return stix_obj_property <= filter_value
         else:
             raise ValueError("Filter operator: {0} not supported for specified property: {1}".format(self.op, self.property))
 
diff --git a/stix2/test/test_datastore_filesystem.py b/stix2/test/test_datastore_filesystem.py
index 7d29127..1c51fa1 100644
--- a/stix2/test/test_datastore_filesystem.py
+++ b/stix2/test/test_datastore_filesystem.py
@@ -15,6 +15,7 @@ from stix2 import (Bundle, Campaign, CustomObject, FileSystemSink,
 from stix2.datastore.filesystem import (AuthSet, _find_search_optimizations,
                                         _get_matching_dir_entries,
                                         _timestamp2filename)
+from stix2.exceptions import STIXError
 from stix2.test.constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID,
                                   IDENTITY_KWARGS, INDICATOR_ID,
                                   INDICATOR_KWARGS, MALWARE_ID, MALWARE_KWARGS,
@@ -148,9 +149,8 @@ def test_filesystem_source_bad_stix_file(fs_source, bad_stix_files):
     # this tests handling of bad STIX json object
     try:
         fs_source.get("intrusion-set--test-non-stix")
-    except TypeError as e:
-        assert "intrusion-set--test-non-stix" in str(e)
-        assert "could either not be parsed to JSON or was not valid STIX JSON" in str(e)
+    except STIXError as e:
+        assert "Can't parse object with no 'type' property" in str(e)
 
 
 def test_filesystem_source_get_object(fs_source):