Merge pull request #240 from khdesai/fix_issue_232

Fix issue 232, raise DataSourceError when FileSystemStore attempts to overwrite an existing file
master
Emmanuelle Vargas-Gonzalez 2019-01-11 11:10:42 -05:00 committed by GitHub
commit e1356437fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 7 deletions

View File

@ -1,21 +1,19 @@
"""Python STIX2 FileSystem Source/Sink"""
# Temporary while we address TODO statement
from __future__ import print_function
import errno
import io
import json
import os
import re
import stat
import sys
import six
from stix2 import v20, v21
from stix2.base import _STIXBase
from stix2.core import parse
from stix2.datastore import DataSink, DataSource, DataStoreMixin
from stix2.datastore import (
DataSink, DataSource, DataSourceError, DataStoreMixin,
)
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
from stix2.utils import format_datetime, get_type_from_id, is_marking
@ -544,9 +542,8 @@ class FileSystemSink(DataSink):
else:
stix_obj = v20.Bundle(stix_obj, allow_custom=self.allow_custom)
# TODO: Better handling of the overwriting case.
if os.path.isfile(file_path):
print("Attempted to overwrite file!", file_path, file=sys.stderr)
raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path))
else:
with io.open(file_path, 'w', encoding=encoding) as f:
stix_obj = stix_obj.serialize(pretty=True, encoding=encoding, ensure_ascii=False)

View File

@ -9,6 +9,7 @@ import pytest
import pytz
import stix2
from stix2.datastore import DataSourceError
from stix2.datastore.filesystem import (
AuthSet, _find_search_optimizations, _get_matching_dir_entries,
_timestamp2filename,
@ -420,6 +421,33 @@ def test_filesystem_sink_add_objects_list(fs_sink, fs_source):
os.remove(camp7filepath)
def test_filesystem_attempt_stix_file_overwrite(fs_store):
# add python stix object
camp8 = stix2.v20.Campaign(
name="George Washington",
objective="Create an awesome country",
aliases=["Georgey"],
)
fs_store.add(camp8)
camp8_r = fs_store.get(camp8.id)
assert camp8_r.id == camp8_r.id
assert camp8_r.name == camp8.name
filepath = os.path.join(
FS_PATH, "campaign", camp8_r.id,
_timestamp2filename(camp8_r.modified) + ".json",
)
# Now attempt to overwrite the existing file
with pytest.raises(DataSourceError) as excinfo:
fs_store.add(camp8)
assert "Attempted to overwrite file" in str(excinfo)
os.remove(filepath)
def test_filesystem_sink_marking(fs_sink):
marking = stix2.v20.MarkingDefinition(
definition_type="tlp",