From f10308443988e8195b94132a2c8c985f2a25567a Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Fri, 27 Oct 2017 12:23:57 -0400 Subject: [PATCH] Add `bundlify` parameter to FileSystemStore/Sink This brings back the option (disabled by default) to wrap objects in a bundle when saving them to the filesystem. --- stix2/sources/filesystem.py | 41 +++++++++++++++++++++-------------- stix2/test/test_bundle.py | 1 - stix2/test/test_filesystem.py | 18 +++++++++++++++ 3 files changed, 43 insertions(+), 17 deletions(-) diff --git a/stix2/sources/filesystem.py b/stix2/sources/filesystem.py index 1a8366b..9fd658d 100644 --- a/stix2/sources/filesystem.py +++ b/stix2/sources/filesystem.py @@ -25,17 +25,18 @@ class FileSystemStore(DataStore): Args: stix_dir (str): path to directory of STIX objects + bundlify (bool): Whether to wrap objects in bundles when saving them. + Default: False. Attributes: source (FileSystemSource): FuleSystemSource - sink (FileSystemSink): FileSystemSink """ - def __init__(self, stix_dir): + def __init__(self, stix_dir, bundlify=False): super(FileSystemStore, self).__init__() self.source = FileSystemSource(stix_dir=stix_dir) - self.sink = FileSystemSink(stix_dir=stix_dir) + self.sink = FileSystemSink(stix_dir=stix_dir, bundlify=bundlify) class FileSystemSink(DataSink): @@ -46,12 +47,15 @@ class FileSystemSink(DataSink): components of a FileSystemStore. Args: - stix_dir (str): path to directory of STIX objects + stix_dir (str): path to directory of STIX objects. + bundlify (bool): Whether to wrap objects in bundles when saving them. + Default: False. """ - def __init__(self, stix_dir): + def __init__(self, stix_dir, bundlify=False): super(FileSystemSink, self).__init__() self._stix_dir = os.path.abspath(stix_dir) + self.bundlify = bundlify if not os.path.exists(self._stix_dir): raise ValueError("directory path for STIX data does not exist") @@ -60,6 +64,20 @@ class FileSystemSink(DataSink): def stix_dir(self): return self._stix_dir + def _check_path_and_write(self, stix_obj): + """Write the given STIX object to a file in the STIX file directory. + """ + path = os.path.join(self._stix_dir, stix_obj["type"], stix_obj["id"] + ".json") + + if not os.path.exists(os.path.dirname(path)): + os.makedirs(os.path.dirname(path)) + + if self.bundlify: + stix_obj = Bundle(stix_obj) + + with open(path, "w") as f: + f.write(str(stix_obj)) + def add(self, stix_data=None, allow_custom=False): """Add STIX objects to file directory. @@ -76,18 +94,9 @@ class FileSystemSink(DataSink): the Bundle contained, but not the Bundle itself. """ - def _check_path_and_write(stix_dir, stix_obj): - path = os.path.join(stix_dir, stix_obj["type"], stix_obj["id"] + ".json") - - if not os.path.exists(os.path.dirname(path)): - os.makedirs(os.path.dirname(path)) - - with open(path, "w") as f: - f.write(str(stix_obj)) - if isinstance(stix_data, (STIXDomainObject, STIXRelationshipObject, MarkingDefinition)): # adding python STIX object - _check_path_and_write(self._stix_dir, stix_data) + self._check_path_and_write(stix_data) elif isinstance(stix_data, (str, dict)): stix_data = parse(stix_data, allow_custom) @@ -97,7 +106,7 @@ class FileSystemSink(DataSink): self.add(stix_obj) else: # adding json-formatted STIX - _check_path_and_write(self._stix_dir, stix_data) + self._check_path_and_write(stix_data) elif isinstance(stix_data, Bundle): # recursively add individual STIX objects diff --git a/stix2/test/test_bundle.py b/stix2/test/test_bundle.py index 12b2149..24bbd43 100644 --- a/stix2/test/test_bundle.py +++ b/stix2/test/test_bundle.py @@ -164,5 +164,4 @@ def test_stix_object_property(): prop = stix2.core.STIXObjectProperty() identity = stix2.Identity(name="test", identity_class="individual") - assert prop.clean(identity) == identity assert prop.clean(identity) is identity diff --git a/stix2/test/test_filesystem.py b/stix2/test/test_filesystem.py index 8b9aa22..1e79d05 100644 --- a/stix2/test/test_filesystem.py +++ b/stix2/test/test_filesystem.py @@ -255,6 +255,24 @@ def test_filesystem_store_add(fs_store): os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json")) +def test_filesystem_store_add_as_bundle(): + fs_store = FileSystemStore(FS_PATH, bundlify=True) + + camp1 = Campaign(name="Great Heathen Army", + objective="Targeting the government of United Kingdom and insitutions affiliated with the Church Of England", + aliases=["Ragnar"]) + fs_store.add(camp1) + + with open(os.path.join(FS_PATH, "campaign", camp1.id + ".json")) as bundle_file: + assert '"type": "bundle"' in bundle_file.read() + + camp1_r = fs_store.get(camp1.id) + assert camp1_r.id == camp1.id + assert camp1_r.name == camp1.name + + shutil.rmtree(os.path.join(FS_PATH, "campaign"), True) + + def test_filesystem_add_bundle_object(fs_store): bundle = Bundle() fs_store.add(bundle)