Add some backward-compatibility to filesystem store: versioned

objects are searched for as ID-named json files in the type
directories, in addition to timestamp-named files in ID
directories.

Made a bugfix: fixed improper exception re-raises

Made an efficiency improvement: don't stat() files in
_get_matching_dir_entries() if no st_mode_test callable is given.
master
Michael Chisholm 2018-11-23 15:55:05 -05:00
parent f615161110
commit 63166ab256
1 changed files with 36 additions and 10 deletions

View File

@ -223,14 +223,19 @@ def _get_matching_dir_entries(parent_dir, auth_set, st_mode_test=None, ext=""):
results = []
if auth_set.auth_type == AuthSet.WHITE:
for value in auth_set.values:
filename = value + ext
try:
filename = value + ext
s = os.stat(os.path.join(parent_dir, filename))
if not st_mode_test or st_mode_test(s.st_mode):
if st_mode_test:
s = os.stat(os.path.join(parent_dir, filename))
type_pass = st_mode_test(s.st_mode)
else:
type_pass = True
if type_pass:
results.append(filename)
except OSError as e:
if e.errno != errno.ENOENT:
raise e
raise
# else, file-not-found is ok, just skip
else: # auth_set is a blacklist
@ -246,12 +251,17 @@ def _get_matching_dir_entries(parent_dir, auth_set, st_mode_test=None, ext=""):
continue
try:
s = os.stat(os.path.join(parent_dir, entry))
if not st_mode_test or st_mode_test(s.st_mode):
if st_mode_test:
s = os.stat(os.path.join(parent_dir, entry))
type_pass = st_mode_test(s.st_mode)
else:
type_pass = True
if type_pass:
results.append(entry)
except OSError as e:
if e.errno != errno.ENOENT:
raise e
raise
# else, file-not-found is ok, just skip
return results
@ -325,9 +335,25 @@ def _search_versioned(query, type_path, auth_ids):
results.append(stix_obj)
except IOError as e:
if e.errno != errno.ENOENT:
raise e
raise
# else, file-not-found is ok, just skip
# For backward-compatibility, also search for plain files named after
# object IDs, in the type directory.
id_files = _get_matching_dir_entries(type_path, auth_ids, stat.S_ISREG,
".json")
for id_file in id_files:
id_path = os.path.join(type_path, id_file)
try:
stix_obj = _check_object_from_file(query, id_path)
if stix_obj:
results.append(stix_obj)
except IOError as e:
if e.errno != errno.ENOENT:
raise
# else, file-not-found is ok, just skip
return results
@ -341,7 +367,7 @@ def _search_markings(query, markings_path, auth_ids):
:param auth_ids: Search optimization based on object ID
:return: A list of all matching objects
:raises TypeError: If any objects had invalid content
:raises IOError: If there were any problems opening/reading files
:raises IOError, OSError: If there were any problems opening/reading files
"""
results = []
id_files = _get_matching_dir_entries(markings_path, auth_ids, stat.S_ISREG,
@ -355,7 +381,7 @@ def _search_markings(query, markings_path, auth_ids):
results.append(stix_obj)
except IOError as e:
if e.errno != errno.ENOENT:
raise e
raise
# else, file-not-found is ok, just skip
return results