diff --git a/stix2/datastore/filesystem.py b/stix2/datastore/filesystem.py index 94978b0..07011f7 100644 --- a/stix2/datastore/filesystem.py +++ b/stix2/datastore/filesystem.py @@ -562,140 +562,6 @@ class FileSystemSource(DataSource): query = [Filter("id", "=", stix_id)] return self.query(query, version=version, _composite_filters=_composite_filters) - def query2(self, query=None, version=None, _composite_filters=None): - """Search and retrieve STIX objects based on the complete query. - - A "complete query" includes the filters from the query, the filters - attached to this FileSystemSource, and any filters passed from a - CompositeDataSource (i.e. _composite_filters). - - Args: - query (list): list of filters to search on - _composite_filters (FilterSet): collection of filters passed from the - CompositeDataSource, not user supplied - version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If - None, use latest version. - - Returns: - (list): list of STIX objects that matches the supplied - query. The STIX objects are loaded from their json files, - parsed into a python STIX objects and then returned. - - """ - - all_data = [] - - query = FilterSet(query) - - # combine all query filters - if self.filters: - query.add(self.filters) - if _composite_filters: - query.add(_composite_filters) - - # extract any filters that are for "type" or "id" , as we can then do - # filtering before reading in the STIX objects. A STIX 'type' filter - # can reduce the query to a single sub-directory. A STIX 'id' filter - # allows for the fast checking of the file names versus loading it. - file_filters = self._parse_file_filters(query) - - # establish which subdirectories can be avoided in query - # by decluding as many as possible. A filter with "type" as the property - # means that certain STIX object types can be ruled out, and thus - # the corresponding subdirectories as well - include_paths = [] - declude_paths = [] - if "type" in [filter.property for filter in file_filters]: - for filter in file_filters: - if filter.property == "type": - if filter.op == "=": - include_paths.append(os.path.join(self._stix_dir, filter.value)) - elif filter.op == "!=": - declude_paths.append(os.path.join(self._stix_dir, filter.value)) - else: - # have to walk entire STIX directory - include_paths.append(self._stix_dir) - - # if a user specifies a "type" filter like "type = ", - # the filter is reducing the search space to single stix object types - # (and thus single directories). This makes such a filter more powerful - # than "type != " bc the latter is substracting - # only one type of stix object type (and thus only one directory), - # As such the former type of filters are given preference over the latter; - # i.e. if both exist in a query, that latter type will be ignored - - if not include_paths: - # user has specified types that are not wanted (i.e. "!=") - # so query will look in all STIX directories that are not - # the specified type. Compile correct dir paths - for dir in os.listdir(self._stix_dir): - if os.path.abspath(os.path.join(self._stix_dir, dir)) not in declude_paths: - include_paths.append(os.path.abspath(os.path.join(self._stix_dir, dir))) - - # grab stix object ID as well - if present in filters, as - # may forgo the loading of STIX content into memory - if "id" in [filter.property for filter in file_filters]: - for filter in file_filters: - if filter.property == "id" and filter.op == "=": - id_ = filter.value - break - else: - id_ = None - else: - id_ = None - - # now iterate through all STIX objs - for path in include_paths: - for root, dirs, files in os.walk(path): - for file_ in files: - if not file_.endswith(".json"): - # skip non '.json' files as more likely to be random non-STIX files - continue - - if not id_ or id_ == file_.split(".")[0]: - # have to load into memory regardless to evaluate other filters - try: - stix_obj = json.load(open(os.path.join(root, file_))) - - if stix_obj["type"] == "bundle": - stix_obj = stix_obj["objects"][0] - - # naive STIX type checking - stix_obj["type"] - stix_obj["id"] - - except (ValueError, KeyError): # likely not a JSON file - raise TypeError("STIX JSON object at '{0}' could either not be parsed to " - "JSON or was not valid STIX JSON".format(os.path.join(root, file_))) - - # check against other filters, add if match - all_data.extend(apply_common_filters([stix_obj], query)) - - all_data = deduplicate(all_data) - - # parse python STIX objects from the STIX object dicts - stix_objs = [parse(stix_obj_dict, allow_custom=self.allow_custom, version=version) for stix_obj_dict in all_data] - - return stix_objs - - def _parse_file_filters(self, query): - """Extract STIX common filters. - - Possibly speeds up querying STIX objects from the file system. - - Extracts filters that are for the "id" and "type" property of - a STIX object. As the file directory is organized by STIX - object type with filenames that are equivalent to the STIX - object ID, these filters can be used first to reduce the - search space of a FileSystemStore (or FileSystemSink). - - """ - file_filters = [] - for filter_ in query: - if filter_.property == "id" or filter_.property == "type": - file_filters.append(filter_) - return file_filters - def query(self, query=None, version=None, _composite_filters=None): """Search and retrieve STIX objects based on the complete query.