Finish adding new tests for TAXII datastore. closes #148

stix2.0
Emmanuelle Vargas-Gonzalez 2018-04-23 14:15:02 -04:00
parent e4a226cae6
commit 2fe9a0f297
1 changed files with 95 additions and 9 deletions

View File

@ -1,4 +1,5 @@
from stix2 import Bundle, ThreatActor, TAXIICollectionSource, TAXIICollectionSink from stix2 import (Bundle, ThreatActor, TAXIICollectionSink,
TAXIICollectionSource, TAXIICollectionStore)
from stix2.datastore.filters import Filter from stix2.datastore.filters import Filter
import json import json
@ -27,6 +28,7 @@ class MockTAXIICollectionEndpoint(Collection):
def get_objects(self, **filter_kwargs): def get_objects(self, **filter_kwargs):
self._verify_can_read() self._verify_can_read()
query_params = _filter_kwargs_to_query_params(filter_kwargs) query_params = _filter_kwargs_to_query_params(filter_kwargs)
if not isinstance(query_params, dict):
query_params = json.loads(query_params) query_params = json.loads(query_params)
full_filter = BasicFilter(query_params or {}) full_filter = BasicFilter(query_params or {})
objs = full_filter.process_filter( objs = full_filter.process_filter(
@ -90,6 +92,80 @@ def test_add_stix2_object(collection):
tc_sink.add(ta) tc_sink.add(ta)
def test_add_stix2_with_custom_object(collection):
tc_sink = TAXIICollectionStore(collection, allow_custom=True)
# create new STIX threat-actor
ta = ThreatActor(name="Teddy Bear",
labels=["nation-state"],
sophistication="innovator",
resource_level="government",
goals=[
"compromising environment NGOs",
"water-hole attacks geared towards energy sector",
],
foo="bar",
allow_custom=True)
tc_sink.add(ta)
def test_add_list_object(collection, indicator):
tc_sink = TAXIICollectionSink(collection)
# create new STIX threat-actor
ta = ThreatActor(name="Teddy Bear",
labels=["nation-state"],
sophistication="innovator",
resource_level="government",
goals=[
"compromising environment NGOs",
"water-hole attacks geared towards energy sector",
])
tc_sink.add([ta, indicator])
def test_add_stix2_bundle_object(collection):
tc_sink = TAXIICollectionSink(collection)
# create new STIX threat-actor
ta = ThreatActor(name="Teddy Bear",
labels=["nation-state"],
sophistication="innovator",
resource_level="government",
goals=[
"compromising environment NGOs",
"water-hole attacks geared towards energy sector",
])
tc_sink.add(Bundle(objects=[ta]))
def test_add_str_object(collection):
tc_sink = TAXIICollectionSink(collection)
# create new STIX threat-actor
ta = """{
"type": "threat-actor",
"id": "threat-actor--eddff64f-feb1-4469-b07c-499a73c96415",
"created": "2018-04-23T16:40:50.847Z",
"modified": "2018-04-23T16:40:50.847Z",
"name": "Teddy Bear",
"goals": [
"compromising environment NGOs",
"water-hole attacks geared towards energy sector"
],
"sophistication": "innovator",
"resource_level": "government",
"labels": [
"nation-state"
]
}"""
tc_sink.add(ta)
def test_get_stix2_object(collection): def test_get_stix2_object(collection):
tc_sink = TAXIICollectionSource(collection) tc_sink = TAXIICollectionSource(collection)
@ -98,7 +174,7 @@ def test_get_stix2_object(collection):
assert objects assert objects
def test_parse_taxii_filters(): def test_parse_taxii_filters(collection):
query = [ query = [
Filter("added_after", "=", "2016-02-01T00:00:01.000Z"), Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
Filter("id", "=", "taxii stix object ID"), Filter("id", "=", "taxii stix object ID"),
@ -107,12 +183,12 @@ def test_parse_taxii_filters():
Filter("created_by_ref", "=", "Bane"), Filter("created_by_ref", "=", "Bane"),
] ]
taxii_filters_expected = set([ taxii_filters_expected = [
Filter("added_after", "=", "2016-02-01T00:00:01.000Z"), Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
Filter("id", "=", "taxii stix object ID"), Filter("id", "=", "taxii stix object ID"),
Filter("type", "=", "taxii stix object ID"), Filter("type", "=", "taxii stix object ID"),
Filter("version", "=", "first") Filter("version", "=", "first")
]) ]
ds = TAXIICollectionSource(collection) ds = TAXIICollectionSource(collection)
@ -121,7 +197,7 @@ def test_parse_taxii_filters():
assert taxii_filters == taxii_filters_expected assert taxii_filters == taxii_filters_expected
def test_add_get_remove_filter(): def test_add_get_remove_filter(collection):
ds = TAXIICollectionSource(collection) ds = TAXIICollectionSource(collection)
# First 3 filters are valid, remaining properties are erroneous in some way # First 3 filters are valid, remaining properties are erroneous in some way
@ -136,20 +212,30 @@ def test_add_get_remove_filter():
ds.filters.add(valid_filters[0]) ds.filters.add(valid_filters[0])
assert len(ds.filters) == 1 assert len(ds.filters) == 1
# Addin the same filter again will have no effect since `filters` uses a set # Addin the same filter again will have no effect since `filters` acts
# like a set
ds.filters.add(valid_filters[0]) ds.filters.add(valid_filters[0])
assert len(ds.filters) == 1 assert len(ds.filters) == 1
ds.filters.add(valid_filters[1]) ds.filters.add(valid_filters[1])
assert len(ds.filters) == 2 assert len(ds.filters) == 2
ds.filters.add(valid_filters[2]) ds.filters.add(valid_filters[2])
assert len(ds.filters) == 3 assert len(ds.filters) == 3
assert set(valid_filters) == ds.filters assert valid_filters == [f for f in ds.filters]
# remove # remove
ds.filters.remove(valid_filters[0]) ds.filters.remove(valid_filters[0])
assert len(ds.filters) == 2 assert len(ds.filters) == 2
ds.filters.update(valid_filters) ds.filters.add(valid_filters)
def test_get_all_versions(collection):
ds = TAXIICollectionStore(collection)
indicators = ds.all_versions('indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f')
# There are 3 indicators but 2 share the same 'modified' timestamp
assert len(indicators) == 2