diff --git a/stix2/test/test_datastore_taxii.py b/stix2/test/test_datastore_taxii.py index c20fbb6..dd81cd4 100644 --- a/stix2/test/test_datastore_taxii.py +++ b/stix2/test/test_datastore_taxii.py @@ -1,4 +1,5 @@ -from stix2 import Bundle, ThreatActor, TAXIICollectionSource, TAXIICollectionSink +from stix2 import (Bundle, ThreatActor, TAXIICollectionSink, + TAXIICollectionSource, TAXIICollectionStore) from stix2.datastore.filters import Filter import json @@ -27,7 +28,8 @@ class MockTAXIICollectionEndpoint(Collection): def get_objects(self, **filter_kwargs): self._verify_can_read() query_params = _filter_kwargs_to_query_params(filter_kwargs) - query_params = json.loads(query_params) + if not isinstance(query_params, dict): + query_params = json.loads(query_params) full_filter = BasicFilter(query_params or {}) objs = full_filter.process_filter( self.objects, @@ -90,6 +92,80 @@ def test_add_stix2_object(collection): tc_sink.add(ta) +def test_add_stix2_with_custom_object(collection): + tc_sink = TAXIICollectionStore(collection, allow_custom=True) + + # create new STIX threat-actor + ta = ThreatActor(name="Teddy Bear", + labels=["nation-state"], + sophistication="innovator", + resource_level="government", + goals=[ + "compromising environment NGOs", + "water-hole attacks geared towards energy sector", + ], + foo="bar", + allow_custom=True) + + tc_sink.add(ta) + + +def test_add_list_object(collection, indicator): + tc_sink = TAXIICollectionSink(collection) + + # create new STIX threat-actor + ta = ThreatActor(name="Teddy Bear", + labels=["nation-state"], + sophistication="innovator", + resource_level="government", + goals=[ + "compromising environment NGOs", + "water-hole attacks geared towards energy sector", + ]) + + tc_sink.add([ta, indicator]) + + +def test_add_stix2_bundle_object(collection): + tc_sink = TAXIICollectionSink(collection) + + # create new STIX threat-actor + ta = ThreatActor(name="Teddy Bear", + labels=["nation-state"], + sophistication="innovator", + resource_level="government", + goals=[ + "compromising environment NGOs", + "water-hole attacks geared towards energy sector", + ]) + + tc_sink.add(Bundle(objects=[ta])) + + +def test_add_str_object(collection): + tc_sink = TAXIICollectionSink(collection) + + # create new STIX threat-actor + ta = """{ + "type": "threat-actor", + "id": "threat-actor--eddff64f-feb1-4469-b07c-499a73c96415", + "created": "2018-04-23T16:40:50.847Z", + "modified": "2018-04-23T16:40:50.847Z", + "name": "Teddy Bear", + "goals": [ + "compromising environment NGOs", + "water-hole attacks geared towards energy sector" + ], + "sophistication": "innovator", + "resource_level": "government", + "labels": [ + "nation-state" + ] + }""" + + tc_sink.add(ta) + + def test_get_stix2_object(collection): tc_sink = TAXIICollectionSource(collection) @@ -98,7 +174,7 @@ def test_get_stix2_object(collection): assert objects -def test_parse_taxii_filters(): +def test_parse_taxii_filters(collection): query = [ Filter("added_after", "=", "2016-02-01T00:00:01.000Z"), Filter("id", "=", "taxii stix object ID"), @@ -107,12 +183,12 @@ def test_parse_taxii_filters(): Filter("created_by_ref", "=", "Bane"), ] - taxii_filters_expected = set([ + taxii_filters_expected = [ Filter("added_after", "=", "2016-02-01T00:00:01.000Z"), Filter("id", "=", "taxii stix object ID"), Filter("type", "=", "taxii stix object ID"), Filter("version", "=", "first") - ]) + ] ds = TAXIICollectionSource(collection) @@ -121,7 +197,7 @@ def test_parse_taxii_filters(): assert taxii_filters == taxii_filters_expected -def test_add_get_remove_filter(): +def test_add_get_remove_filter(collection): ds = TAXIICollectionSource(collection) # First 3 filters are valid, remaining properties are erroneous in some way @@ -136,20 +212,30 @@ def test_add_get_remove_filter(): ds.filters.add(valid_filters[0]) assert len(ds.filters) == 1 - # Addin the same filter again will have no effect since `filters` uses a set + # Addin the same filter again will have no effect since `filters` acts + # like a set ds.filters.add(valid_filters[0]) assert len(ds.filters) == 1 ds.filters.add(valid_filters[1]) assert len(ds.filters) == 2 + ds.filters.add(valid_filters[2]) assert len(ds.filters) == 3 - assert set(valid_filters) == ds.filters + assert valid_filters == [f for f in ds.filters] # remove ds.filters.remove(valid_filters[0]) assert len(ds.filters) == 2 - ds.filters.update(valid_filters) + ds.filters.add(valid_filters) + + +def test_get_all_versions(collection): + ds = TAXIICollectionStore(collection) + + indicators = ds.all_versions('indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f') + # There are 3 indicators but 2 share the same 'modified' timestamp + assert len(indicators) == 2