Update tests related to Datastores

stix2.0
Emmanuelle Vargas-Gonzalez 2017-11-02 21:31:56 -04:00
parent 1e591a827d
commit cd6c7982af
3 changed files with 31 additions and 39 deletions

View File

@ -1,7 +1,7 @@
import pytest
from taxii2client import Collection
from stix2 import Filter, MemorySource
from stix2 import Filter, MemorySink, MemorySource
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
DataStore, make_id, taxii)
from stix2.sources.filters import apply_common_filters
@ -20,11 +20,6 @@ def collection():
return Collection(COLLECTION_URL, MockTAXIIClient())
@pytest.fixture
def ds():
return DataSource()
IND1 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
@ -127,21 +122,17 @@ STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
def test_ds_abstract_class_smoke():
ds1 = DataSource()
ds2 = DataSink()
ds3 = DataStore(source=ds1, sink=ds2)
with pytest.raises(TypeError):
DataStore()
with pytest.raises(NotImplementedError):
ds3.add(None)
with pytest.raises(TypeError):
DataStore.get()
with pytest.raises(NotImplementedError):
ds3.all_versions("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")
with pytest.raises(TypeError):
DataSource()
with pytest.raises(NotImplementedError):
ds3.get("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")
with pytest.raises(NotImplementedError):
ds3.query([Filter("id", "=", "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")])
with pytest.raises(TypeError):
DataSink()
def test_ds_taxii(collection):
@ -177,7 +168,8 @@ def test_parse_taxii_filters():
assert taxii_filters == expected_params
def test_add_get_remove_filter(ds):
def test_add_get_remove_filter():
ds = taxii.TAXIICollectionSource(collection)
# First 3 filters are valid, remaining properties are erroneous in some way
valid_filters = [
@ -226,7 +218,7 @@ def test_add_get_remove_filter(ds):
ds.filters.update(valid_filters)
def test_apply_common_filters(ds):
def test_apply_common_filters():
stix_objs = [
{
"created": "2017-01-27T13:49:53.997Z",
@ -374,35 +366,35 @@ def test_apply_common_filters(ds):
assert len(resp) == 0
def test_filters0(ds):
def test_filters0():
# "Return any object modified before 2017-01-28T13:49:53.935Z"
resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")]))
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
def test_filters1(ds):
def test_filters1():
# "Return any object modified after 2017-01-28T13:49:53.935Z"
resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")]))
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
def test_filters2(ds):
def test_filters2():
# "Return any object modified after or on 2017-01-28T13:49:53.935Z"
resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")]))
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 3
def test_filters3(ds):
def test_filters3():
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")]))
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
def test_filters4(ds):
def test_filters4():
# Assert invalid Filter cannot be created
with pytest.raises(ValueError) as excinfo:
Filter("modified", "?", "2017-01-27T13:49:53.935Z")
@ -410,21 +402,21 @@ def test_filters4(ds):
"for specified property: 'modified'")
def test_filters5(ds):
def test_filters5():
# "Return any object whose id is not indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
resp = list(apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")]))
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
def test_filters6(ds):
def test_filters6():
# Test filtering on non-common property
resp = list(apply_common_filters(STIX_OBJS2, [Filter("name", "=", "Malicious site hosting downloader")]))
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 3
def test_filters7(ds):
def test_filters7():
# Test filtering on embedded property
stix_objects = list(STIX_OBJS2) + [{
"type": "observed-data",
@ -463,7 +455,7 @@ def test_filters7(ds):
assert len(resp) == 1
def test_deduplicate(ds):
def test_deduplicate():
unique = deduplicate(STIX_OBJS1)
# Only 3 objects are unique
@ -483,14 +475,14 @@ def test_deduplicate(ds):
def test_add_remove_composite_datasource():
cds = CompositeDataSource()
ds1 = DataSource()
ds2 = DataSource()
ds3 = DataSink()
ds1 = MemorySource()
ds2 = MemorySource()
ds3 = MemorySink()
with pytest.raises(TypeError) as excinfo:
cds.add_data_sources([ds1, ds2, ds1, ds3])
assert str(excinfo.value) == ("DataSource (to be added) is not of type "
"stix2.DataSource. DataSource type is '<class 'stix2.sources.DataSink'>'")
"stix2.DataSource. DataSource type is '<class 'stix2.sources.memory.MemorySink'>'")
cds.add_data_sources([ds1, ds2, ds1])

View File

@ -340,7 +340,7 @@ def test_filesystem_object_with_custom_property(fs_store):
fs_store.add(camp, True)
camp_r = fs_store.get(camp.id, True)
camp_r = fs_store.get(camp.id)
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
@ -354,7 +354,7 @@ def test_filesystem_object_with_custom_property_in_bundle(fs_store):
bundle = Bundle(camp, allow_custom=True)
fs_store.add(bundle, True)
camp_r = fs_store.get(camp.id, True)
camp_r = fs_store.get(camp.id)
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
@ -369,7 +369,7 @@ def test_filesystem_custom_object(fs_store):
newobj = NewObj(property1='something')
fs_store.add(newobj, True)
newobj_r = fs_store.get(newobj.id, True)
newobj_r = fs_store.get(newobj.id)
assert newobj_r.id == newobj.id
assert newobj_r.property1 == 'something'

View File

@ -235,7 +235,7 @@ def test_memory_store_object_with_custom_property(mem_store):
mem_store.add(camp, True)
camp_r = mem_store.get(camp.id, True)
camp_r = mem_store.get(camp.id)
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
@ -249,7 +249,7 @@ def test_memory_store_object_with_custom_property_in_bundle(mem_store):
bundle = Bundle(camp, allow_custom=True)
mem_store.add(bundle, True)
bundle_r = mem_store.get(bundle.id, True)
bundle_r = mem_store.get(bundle.id)
camp_r = bundle_r['objects'][0]
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
@ -265,6 +265,6 @@ def test_memory_store_custom_object(mem_store):
newobj = NewObj(property1='something')
mem_store.add(newobj, True)
newobj_r = mem_store.get(newobj.id, True)
newobj_r = mem_store.get(newobj.id)
assert newobj_r.id == newobj.id
assert newobj_r.property1 == 'something'