Update tests to improve coverage.

stix2.1
Emmanuelle Vargas-Gonzalez 2017-08-28 14:33:12 -04:00
parent 160d69b03d
commit aea076103c
1 changed files with 170 additions and 71 deletions

View File

@ -1,19 +1,15 @@
import pytest
from taxii2client import Collection
from stix2.sources import DataSource, Filter, taxii
from stix2.sources import CompositeDataSource, DataSink, DataSource, DataStore, Filter, make_id, taxii
from stix2.sources.memory import MemorySource
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
class MockTAXIIClient(object):
"""Mock for taxii2_client.TAXIIClient"""
def get(self):
return {}
def post(self):
return {}
pass
@pytest.fixture
@ -21,6 +17,18 @@ def collection():
return Collection(COLLECTION_URL, MockTAXIIClient())
def test_ds_smoke():
ds1 = DataSource()
ds2 = DataSink()
ds3 = DataStore(source=ds1, sink=ds2)
with pytest.raises(NotImplementedError):
ds3.add(None)
ds3.all_versions("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")
ds3.get("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")
ds3.query([Filter("id", "=", "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")])
def test_ds_taxii(collection):
ds = taxii.TAXIICollectionSource(collection)
assert ds.name == 'TAXIICollectionSource'
@ -104,6 +112,8 @@ def test_add_get_remove_filter():
assert len(ds.filters) == 2
ds.add_filters(valid_filters)
def test_apply_common_filters():
stix_objs = [
@ -160,9 +170,7 @@ def test_apply_common_filters():
resp = ds.apply_common_filters(stix_objs, [filters[2]])
assert resp[0]['id'] == stix_objs[0]['id']
def test_deduplicate():
stix_objs = [
STIX_OBJS1 = [
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
@ -225,8 +233,49 @@ def test_deduplicate():
}
]
STIX_OBJS2 = [
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-31T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
]
def test_deduplicate():
ds = DataSource()
unique = ds.deduplicate(stix_objs)
unique = ds.deduplicate(STIX_OBJS1)
# Only 3 objects are unique
# 2 id's vary
@ -243,6 +292,56 @@ def test_deduplicate():
assert "2017-01-27T13:49:53.936Z" in mods
def test_add_remove_composite_datasource():
cds = CompositeDataSource()
ds1 = DataSource()
ds2 = DataSource()
ds3 = DataSink()
cds.add_data_source([ds1, ds2, ds1, ds3])
assert len(cds.get_all_data_sources()) == 2
cds.remove_data_source([ds1.id_, ds2.id_])
assert len(cds.get_all_data_sources()) == 0
with pytest.raises(ValueError):
cds.remove_data_source([ds3.id_])
def test_composite_datasource_operations():
BUNDLE1 = dict(id="bundle--%s" % make_id(),
objects=STIX_OBJS1,
spec_version="2.0",
type="bundle")
cds = CompositeDataSource()
ds1 = MemorySource(stix_data=BUNDLE1)
ds2 = MemorySource(stix_data=STIX_OBJS2)
cds.add_data_source([ds1, ds2])
indicators = cds.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
# In STIX_OBJS2 changed the 'modified' property to a later time...
assert len(indicators) == 2
indicator = cds.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f"
assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
assert indicator["type"] == "indicator"
query = [
Filter("type", "=", "indicator")
]
results = cds.query(query)
# STIX_OBJS2 has indicator with later time, one with different id, one with
# original time in STIX_OBJS1
assert len(results) == 3
# def test_data_source_file():
# ds = file.FileDataSource()
#