Implemented clenk's suggested changes in multi-version filesystem
store: - Use utils.get_type_from_id() instead of my own (I didn't know it was already there) - Use dict-style instead of attribute-style access to get stix object properties - Convert timezone-aware timestamps to UTC in _timestamp2filename() to ensure that different times always result in different filenames. Also added a couple new tests for _timestamp2filename(), which exercises the timezone conversion code.revert-222-multi_version_filesystem_store
parent
51668a9a04
commit
ee57596d6a
|
@ -6,6 +6,7 @@ Python STIX 2.0 FileSystem Source/Sink
|
|||
import errno
|
||||
import json
|
||||
import os
|
||||
import pytz
|
||||
import stat
|
||||
|
||||
import six
|
||||
|
@ -14,7 +15,7 @@ from stix2.base import _STIXBase
|
|||
from stix2.core import Bundle, parse
|
||||
from stix2.datastore import DataSink, DataSource, DataStoreMixin
|
||||
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
|
||||
from stix2.utils import deduplicate, is_marking
|
||||
from stix2.utils import is_marking, get_type_from_id
|
||||
|
||||
|
||||
def _timestamp2filename(timestamp):
|
||||
|
@ -25,20 +26,14 @@ def _timestamp2filename(timestamp):
|
|||
:param timestamp: A timestamp, as a datetime.datetime object.
|
||||
"""
|
||||
# Different times will only produce different file names if all timestamps
|
||||
# are in the same time zone! (Should I convert to UTC just to be safe?)
|
||||
# are in the same time zone! So if timestamp is timezone-aware convert
|
||||
# to UTC just to be safe. If naive, just use as-is.
|
||||
if timestamp.tzinfo is not None:
|
||||
timestamp = timestamp.astimezone(pytz.utc)
|
||||
|
||||
return timestamp.strftime("%Y%m%d%H%M%S%f")
|
||||
|
||||
|
||||
def _type_from_id(id_):
|
||||
"""Extract the type from a STIX identifier"""
|
||||
dd_idx = id_.find("--")
|
||||
if dd_idx == -1:
|
||||
raise Exception(
|
||||
"Invalid ID: {}. Must have format <type>--<uuid>.".format(id_)
|
||||
)
|
||||
return id_[:dd_idx]
|
||||
|
||||
|
||||
class AuthSet(object):
|
||||
"""
|
||||
Represents either a whitelist or blacklist of values, where/what we
|
||||
|
@ -164,13 +159,13 @@ def _find_search_optimizations(filters):
|
|||
# contain types within them.
|
||||
allowed_ids = _update_allow(allowed_ids, filter_.value)
|
||||
allowed_types = _update_allow(allowed_types,
|
||||
_type_from_id(filter_.value))
|
||||
get_type_from_id(filter_.value))
|
||||
elif filter_.op == "!=":
|
||||
prohibited_ids.add(filter_.value)
|
||||
elif filter_.op == "in":
|
||||
allowed_ids = _update_allow(allowed_ids, filter_.value)
|
||||
allowed_types = _update_allow(allowed_types, (
|
||||
_type_from_id(id_) for id_ in filter_.value
|
||||
get_type_from_id(id_) for id_ in filter_.value
|
||||
))
|
||||
|
||||
opt_types = AuthSet(allowed_types, prohibited_types)
|
||||
|
@ -185,12 +180,12 @@ def _find_search_optimizations(filters):
|
|||
opt_ids.auth_type == AuthSet.WHITE:
|
||||
|
||||
opt_types.values.intersection_update(
|
||||
_type_from_id(id_) for id_ in opt_ids.values
|
||||
get_type_from_id(id_) for id_ in opt_ids.values
|
||||
)
|
||||
|
||||
opt_ids.values.intersection_update(
|
||||
id_ for id_ in opt_ids.values
|
||||
if _type_from_id(id_) in opt_types.values
|
||||
if get_type_from_id(id_) in opt_types.values
|
||||
)
|
||||
|
||||
return opt_types, opt_ids
|
||||
|
@ -433,10 +428,10 @@ class FileSystemSink(DataSink):
|
|||
"""
|
||||
type_dir = os.path.join(self._stix_dir, stix_obj["type"])
|
||||
if is_marking(stix_obj):
|
||||
filename = stix_obj.id
|
||||
filename = stix_obj["id"]
|
||||
obj_dir = type_dir
|
||||
else:
|
||||
filename = _timestamp2filename(stix_obj.modified)
|
||||
filename = _timestamp2filename(stix_obj["modified"])
|
||||
obj_dir = os.path.join(type_dir, stix_obj["id"])
|
||||
|
||||
file_path = os.path.join(obj_dir, filename + ".json")
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
import datetime
|
||||
import errno
|
||||
import json
|
||||
import os
|
||||
import pytz
|
||||
import shutil
|
||||
import stat
|
||||
|
||||
|
@ -926,3 +928,26 @@ def test_search_auth_set_black_empty(rel_fs_store):
|
|||
"relationship",
|
||||
"tool"
|
||||
}
|
||||
|
||||
|
||||
def test_timestamp2filename_naive():
|
||||
dt = datetime.datetime(
|
||||
2010, 6, 15,
|
||||
8, 30, 10, 1234
|
||||
)
|
||||
|
||||
filename = _timestamp2filename(dt)
|
||||
assert filename == "20100615083010001234"
|
||||
|
||||
|
||||
def test_timestamp2filename_tz():
|
||||
# one hour west of UTC (i.e. an hour earlier)
|
||||
tz = pytz.FixedOffset(-60)
|
||||
dt = datetime.datetime(
|
||||
2010, 6, 15,
|
||||
7, 30, 10, 1234,
|
||||
tz
|
||||
)
|
||||
|
||||
filename = _timestamp2filename(dt)
|
||||
assert filename == "20100615083010001234"
|
||||
|
|
Loading…
Reference in New Issue