Removed the old FileSystemSource.query method. I'd renamed it

"query2" and forgot about it and left it there...
revert-222-multi_version_filesystem_store
Michael Chisholm 2018-10-26 10:40:07 -04:00
parent 9486b46f77
commit 51668a9a04
1 changed files with 0 additions and 134 deletions

View File

@ -562,140 +562,6 @@ class FileSystemSource(DataSource):
query = [Filter("id", "=", stix_id)]
return self.query(query, version=version, _composite_filters=_composite_filters)
def query2(self, query=None, version=None, _composite_filters=None):
"""Search and retrieve STIX objects based on the complete query.
A "complete query" includes the filters from the query, the filters
attached to this FileSystemSource, and any filters passed from a
CompositeDataSource (i.e. _composite_filters).
Args:
query (list): list of filters to search on
_composite_filters (FilterSet): collection of filters passed from the
CompositeDataSource, not user supplied
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
(list): list of STIX objects that matches the supplied
query. The STIX objects are loaded from their json files,
parsed into a python STIX objects and then returned.
"""
all_data = []
query = FilterSet(query)
# combine all query filters
if self.filters:
query.add(self.filters)
if _composite_filters:
query.add(_composite_filters)
# extract any filters that are for "type" or "id" , as we can then do
# filtering before reading in the STIX objects. A STIX 'type' filter
# can reduce the query to a single sub-directory. A STIX 'id' filter
# allows for the fast checking of the file names versus loading it.
file_filters = self._parse_file_filters(query)
# establish which subdirectories can be avoided in query
# by decluding as many as possible. A filter with "type" as the property
# means that certain STIX object types can be ruled out, and thus
# the corresponding subdirectories as well
include_paths = []
declude_paths = []
if "type" in [filter.property for filter in file_filters]:
for filter in file_filters:
if filter.property == "type":
if filter.op == "=":
include_paths.append(os.path.join(self._stix_dir, filter.value))
elif filter.op == "!=":
declude_paths.append(os.path.join(self._stix_dir, filter.value))
else:
# have to walk entire STIX directory
include_paths.append(self._stix_dir)
# if a user specifies a "type" filter like "type = <stix-object_type>",
# the filter is reducing the search space to single stix object types
# (and thus single directories). This makes such a filter more powerful
# than "type != <stix-object_type>" bc the latter is substracting
# only one type of stix object type (and thus only one directory),
# As such the former type of filters are given preference over the latter;
# i.e. if both exist in a query, that latter type will be ignored
if not include_paths:
# user has specified types that are not wanted (i.e. "!=")
# so query will look in all STIX directories that are not
# the specified type. Compile correct dir paths
for dir in os.listdir(self._stix_dir):
if os.path.abspath(os.path.join(self._stix_dir, dir)) not in declude_paths:
include_paths.append(os.path.abspath(os.path.join(self._stix_dir, dir)))
# grab stix object ID as well - if present in filters, as
# may forgo the loading of STIX content into memory
if "id" in [filter.property for filter in file_filters]:
for filter in file_filters:
if filter.property == "id" and filter.op == "=":
id_ = filter.value
break
else:
id_ = None
else:
id_ = None
# now iterate through all STIX objs
for path in include_paths:
for root, dirs, files in os.walk(path):
for file_ in files:
if not file_.endswith(".json"):
# skip non '.json' files as more likely to be random non-STIX files
continue
if not id_ or id_ == file_.split(".")[0]:
# have to load into memory regardless to evaluate other filters
try:
stix_obj = json.load(open(os.path.join(root, file_)))
if stix_obj["type"] == "bundle":
stix_obj = stix_obj["objects"][0]
# naive STIX type checking
stix_obj["type"]
stix_obj["id"]
except (ValueError, KeyError): # likely not a JSON file
raise TypeError("STIX JSON object at '{0}' could either not be parsed to "
"JSON or was not valid STIX JSON".format(os.path.join(root, file_)))
# check against other filters, add if match
all_data.extend(apply_common_filters([stix_obj], query))
all_data = deduplicate(all_data)
# parse python STIX objects from the STIX object dicts
stix_objs = [parse(stix_obj_dict, allow_custom=self.allow_custom, version=version) for stix_obj_dict in all_data]
return stix_objs
def _parse_file_filters(self, query):
"""Extract STIX common filters.
Possibly speeds up querying STIX objects from the file system.
Extracts filters that are for the "id" and "type" property of
a STIX object. As the file directory is organized by STIX
object type with filenames that are equivalent to the STIX
object ID, these filters can be used first to reduce the
search space of a FileSystemStore (or FileSystemSink).
"""
file_filters = []
for filter_ in query:
if filter_.property == "id" or filter_.property == "type":
file_filters.append(filter_)
return file_filters
def query(self, query=None, version=None, _composite_filters=None):
"""Search and retrieve STIX objects based on the complete query.