Add FileSystem test fixture, test adding bundle
parent
374539a6cc
commit
134267bfd6
|
@ -8,10 +8,12 @@ TODO:
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from stix2.base import _STIXBase
|
from stix2.common import MarkingDefinition
|
||||||
from stix2.core import Bundle, parse
|
from stix2.core import Bundle, parse
|
||||||
|
from stix2.sdo import STIXDomainObject
|
||||||
from stix2.sources import DataSink, DataSource, DataStore
|
from stix2.sources import DataSink, DataSource, DataStore
|
||||||
from stix2.sources.filters import Filter, apply_common_filters
|
from stix2.sources.filters import Filter, apply_common_filters
|
||||||
|
from stix2.sro import STIXRelationshipObject
|
||||||
from stix2.utils import deduplicate
|
from stix2.utils import deduplicate
|
||||||
|
|
||||||
|
|
||||||
|
@ -81,7 +83,7 @@ class FileSystemSink(DataSink):
|
||||||
# Bundle() can take dict or STIX obj as argument
|
# Bundle() can take dict or STIX obj as argument
|
||||||
f.write(str(Bundle(stix_obj)))
|
f.write(str(Bundle(stix_obj)))
|
||||||
|
|
||||||
if isinstance(stix_data, _STIXBase):
|
if isinstance(stix_data, (STIXDomainObject, STIXRelationshipObject, MarkingDefinition)):
|
||||||
# adding python STIX object
|
# adding python STIX object
|
||||||
_check_path_and_write(self._stix_dir, stix_data)
|
_check_path_and_write(self._stix_dir, stix_data)
|
||||||
|
|
||||||
|
@ -94,11 +96,11 @@ class FileSystemSink(DataSink):
|
||||||
# adding json-formatted STIX
|
# adding json-formatted STIX
|
||||||
_check_path_and_write(self._stix_dir, stix_data)
|
_check_path_and_write(self._stix_dir, stix_data)
|
||||||
|
|
||||||
elif isinstance(stix_data, str):
|
elif isinstance(stix_data, (str, Bundle)):
|
||||||
# adding json encoded string of STIX content
|
# adding json encoded string of STIX content
|
||||||
stix_data = parse(stix_data)
|
stix_data = parse(stix_data)
|
||||||
if stix_data["type"] == "bundle":
|
if stix_data["type"] == "bundle":
|
||||||
for stix_obj in stix_data["objects"]:
|
for stix_obj in stix_data.get("objects", []):
|
||||||
self.add(stix_obj)
|
self.add(stix_obj)
|
||||||
else:
|
else:
|
||||||
self.add(stix_data)
|
self.add(stix_data)
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
import os
|
import os
|
||||||
|
import shutil
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from taxii2client import Collection
|
from taxii2client import Collection
|
||||||
|
|
||||||
from stix2 import (Campaign, FileSystemSink, FileSystemSource, FileSystemStore,
|
from stix2 import (Bundle, Campaign, FileSystemSink, FileSystemSource,
|
||||||
Filter, MemorySource, MemoryStore)
|
FileSystemStore, Filter, MemorySource, MemoryStore)
|
||||||
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
|
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
|
||||||
DataStore, make_id, taxii)
|
DataStore, make_id, taxii)
|
||||||
from stix2.sources.filters import apply_common_filters
|
from stix2.sources.filters import apply_common_filters
|
||||||
|
@ -29,6 +30,15 @@ def ds():
|
||||||
return DataSource()
|
return DataSource()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def fs_store():
|
||||||
|
# create
|
||||||
|
yield FileSystemStore(FS_PATH)
|
||||||
|
|
||||||
|
# remove campaign dir
|
||||||
|
shutil.rmtree(os.path.join(FS_PATH, "campaign"))
|
||||||
|
|
||||||
|
|
||||||
IND1 = {
|
IND1 = {
|
||||||
"created": "2017-01-27T13:49:53.935Z",
|
"created": "2017-01-27T13:49:53.935Z",
|
||||||
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||||
|
@ -681,10 +691,7 @@ def test_filesystem_sink():
|
||||||
os.rmdir(os.path.join(FS_PATH, "campaign"))
|
os.rmdir(os.path.join(FS_PATH, "campaign"))
|
||||||
|
|
||||||
|
|
||||||
def test_filesystem_store():
|
def test_filesystem_store(fs_store):
|
||||||
# creation
|
|
||||||
fs_store = FileSystemStore(FS_PATH)
|
|
||||||
|
|
||||||
# get()
|
# get()
|
||||||
coa = fs_store.get("course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd")
|
coa = fs_store.get("course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd")
|
||||||
assert coa.id == "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd"
|
assert coa.id == "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd"
|
||||||
|
@ -716,3 +723,8 @@ def test_filesystem_store():
|
||||||
|
|
||||||
# remove campaign dir
|
# remove campaign dir
|
||||||
os.rmdir(os.path.join(FS_PATH, "campaign"))
|
os.rmdir(os.path.join(FS_PATH, "campaign"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_add_object_with_custom_property_in_bundle(fs_store):
|
||||||
|
bundle = Bundle()
|
||||||
|
fs_store.add(bundle)
|
||||||
|
|
Loading…
Reference in New Issue