diff --git a/.gitignore b/.gitignore index 395676e..9e12d7d 100644 --- a/.gitignore +++ b/.gitignore @@ -43,6 +43,7 @@ htmlcov/ nosetests.xml coverage.xml *,cover +.pytest_cache/ # Translations *.mo diff --git a/.isort.cfg b/.isort.cfg index cca9d19..56b0448 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -3,6 +3,7 @@ not_skip = __init__.py skip = workbench.py known_third_party = dateutil, + medallion, ordereddict, pytest, pytz, diff --git a/docs/guide/datastore.ipynb b/docs/guide/datastore.ipynb index c3980a0..2fabf88 100644 --- a/docs/guide/datastore.ipynb +++ b/docs/guide/datastore.ipynb @@ -356,16 +356,16 @@ "TAXII2 filter fields:\n", "\n", "* added_after\n", - "* match[id]\n", - "* match[type]\n", - "* match[version]\n", + "* id\n", + "* type\n", + "* version\n", "\n", "Supported operators:\n", "\n", "* =\n", "* !=\n", "* in\n", - "* >\n", + "* ```>```\n", "* < \n", "* ```>=```\n", "* <=\n", diff --git a/examples/taxii_example.py b/examples/taxii_example.py index e102b17..978f0f2 100644 --- a/examples/taxii_example.py +++ b/examples/taxii_example.py @@ -1,54 +1,39 @@ -import json +from taxii2client import Collection -from stix2.datastore.taxii import TAXIIDataSource +import stix2 -# Flask TAXII server - developmental -ROOT = 'http://localhost:5000' -AUTH = {'user': 'mk', 'pass': 'Pass'} +# This example is based on the medallion server with default_data.json +# See https://github.com/oasis-open/cti-taxii-server for more information def main(): + collection = Collection("http://127.0.0.1:5000/trustgroup1/collections/52892447-4d7e-4f70-b94d-d7f22742ff63/", + user="admin", password="Password0") # instantiate TAXII data source - taxii = TAXIIDataSource(api_root=ROOT, auth=AUTH) + taxii = stix2.TAXIICollectionSource(collection) - # get (file watch indicator) - indicator_fw = taxii.get(id_="indicator--a932fcc6-e032-176c-126f-cb970a5a1ade") + # get (url watch indicator) + indicator_fw = taxii.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") print("\n\n-------Queried for Indicator - got:") - print(json.dumps(indicator_fw, indent=4)) + print(indicator_fw.serialize(indent=4)) - # all versions (file watch indicator - currently only 1. maybe Emmanuelle can add a version) - indicator_fw_versions = taxii.get(id_="indicator--a932fcc6-e032-176c-126f-cb970a5a1ade") + # all versions (url watch indicator - currently two) + indicator_fw_versions = taxii.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") print("\n\n------Queried for indicator (all_versions()) - got:") - print(json.dumps(indicator_fw_versions, indent=4)) + for indicator in indicator_fw_versions: + print(indicator.serialize(indent=4)) # add TAXII filter (ie filter should be passed to TAXII) - taxii_filter_ids, status = taxii.add_filter( - [ - { - "field": "type", - "op": "in", - "value": "malware" - } - ]) + query_filter = stix2.Filter("type", "in", "malware") - print("\n\n-------Added filter:") - print("Filter ID: {0}".format(taxii_filter_ids[0])) - print("Filter status: \n") - print(json.dumps(status, indent=4)) - print("filters: \n") - print(json.dumps(taxii.get_filters(), indent=4)) - - # get() - but with filter attached - malware = taxii.query() + # query() - but with filter attached. There are no malware objects in this collection + malwares = taxii.query(query=query_filter) print("\n\n\n--------Queried for Malware string (with above filter attached) - got:") - print(json.dumps(malware, indent=4)) - - # remove TAXII filter - taxii.remove_filter(taxii_filter_ids) - print("\n\n-------Removed filter(TAXII filter):") - print("filters: \n") - print(json.dumps(taxii.get_filters(), indent=4)) + for malware in malwares: + print(malware.serialize(indent=4)) + if not malwares: + print(malwares) if __name__ == "__main__": diff --git a/stix2/datastore/memory.py b/stix2/datastore/memory.py index c43da8b..c1d202d 100644 --- a/stix2/datastore/memory.py +++ b/stix2/datastore/memory.py @@ -268,10 +268,7 @@ class MemorySource(DataSource): is returned in the same form as it as added. """ - if query is None: - query = FilterSet() - else: - query = FilterSet(query) + query = FilterSet(query) # combine all query filters if self.filters: diff --git a/stix2/datastore/taxii.py b/stix2/datastore/taxii.py index faa2669..872a510 100644 --- a/stix2/datastore/taxii.py +++ b/stix2/datastore/taxii.py @@ -176,8 +176,8 @@ class TAXIICollectionSource(DataSource): """ # make query in TAXII query format since 'id' is TAXII field query = [ - Filter("match[id]", "=", stix_id), - Filter("match[version]", "=", "all") + Filter("id", "=", stix_id), + Filter("version", "=", "all") ] all_data = self.query(query=query, _composite_filters=_composite_filters) @@ -232,7 +232,8 @@ class TAXIICollectionSource(DataSource): all_data = deduplicate(all_data) # apply local (CompositeDataSource, TAXIICollectionSource and query) filters - all_data = list(apply_common_filters(all_data, (query - taxii_filters))) + query.remove(taxii_filters) + all_data = list(apply_common_filters(all_data, query)) except HTTPError: # if resources not found or access is denied from TAXII server, return empty list @@ -249,7 +250,7 @@ class TAXIICollectionSource(DataSource): Does not put in TAXII spec format as the TAXII2Client (that we use) does this for us. - NOTE: + Notes: Currently, the TAXII2Client can handle TAXII filters where the filter value is list, as both a comma-seperated string or python list @@ -265,7 +266,8 @@ class TAXIICollectionSource(DataSource): query (list): list of filters to extract which ones are TAXII specific. - Returns: a list of the TAXII filters + Returns: + A list of TAXII filters that meet the TAXII filtering parameters. """ taxii_filters = [] diff --git a/stix2/test/conftest.py b/stix2/test/conftest.py index 9f61bc2..c73eafb 100644 --- a/stix2/test/conftest.py +++ b/stix2/test/conftest.py @@ -46,3 +46,114 @@ def malware(uuid4, clock): @pytest.fixture def relationship(uuid4, clock): return stix2.Relationship(**RELATIONSHIP_KWARGS) + + +@pytest.fixture +def stix_objs1(): + ind1 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" + } + ind2 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" + } + ind3 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.936Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" + } + ind4 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" + } + ind5 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" + } + return [ind1, ind2, ind3, ind4, ind5] + + +@pytest.fixture +def stix_objs2(): + ind6 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-31T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" + } + ind7 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" + } + ind8 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" + } + return [ind6, ind7, ind8] + + +@pytest.fixture +def real_stix_objs2(stix_objs2): + return [stix2.parse(x) for x in stix_objs2] diff --git a/stix2/test/test_datastore.py b/stix2/test/test_datastore.py index 315aff5..323365a 100644 --- a/stix2/test/test_datastore.py +++ b/stix2/test/test_datastore.py @@ -1,765 +1,117 @@ import pytest -from taxii2client import Collection -from stix2 import Filter, MemorySink, MemorySource -from stix2.core import parse from stix2.datastore import (CompositeDataSource, DataSink, DataSource, - make_id, taxii) -from stix2.datastore.filters import apply_common_filters -from stix2.utils import STIXdatetime, deduplicate, parse_into_datetime - -COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/' + DataStoreMixin) +from stix2.datastore.filters import Filter +from stix2.test.constants import CAMPAIGN_MORE_KWARGS -class MockTAXIIClient(object): - """Mock for taxii2_client.TAXIIClient""" - pass - - -@pytest.fixture -def collection(): - return Collection(COLLECTION_URL, MockTAXIIClient()) - - -stix_objs = [ - { - "created": "2017-01-27T13:49:53.997Z", - "description": "\n\nTITLE:\n\tPoison Ivy", - "id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111", - "labels": [ - "remote-access-trojan" - ], - "modified": "2017-01-27T13:49:53.997Z", - "name": "Poison Ivy", - "type": "malware" - }, - { - "created": "2014-05-08T09:00:00.000Z", - "id": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade", - "labels": [ - "file-hash-watchlist" - ], - "modified": "2014-05-08T09:00:00.000Z", - "name": "File hash for Poison Ivy variant", - "pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']", - "type": "indicator", - "valid_from": "2014-05-08T09:00:00.000000Z" - }, - { - "created": "2014-05-08T09:00:00.000Z", - "granular_markings": [ - { - "marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed", - "selectors": [ - "relationship_type" - ] - } - ], - "id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463", - "modified": "2014-05-08T09:00:00.000Z", - "object_marking_refs": [ - "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9" - ], - "relationship_type": "indicates", - "revoked": True, - "source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade", - "target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111", - "type": "relationship" - }, - { - "id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef", - "created": "2016-02-14T00:00:00.000Z", - "created_by_ref": "identity--00000000-0000-0000-0000-b8e91df99dc9", - "modified": "2016-02-14T00:00:00.000Z", - "type": "vulnerability", - "name": "CVE-2014-0160", - "description": "The (1) TLS...", - "external_references": [ - { - "source_name": "cve", - "external_id": "CVE-2014-0160" - } - ], - "labels": ["heartbleed", "has-logo"] - }, - { - "type": "observed-data", - "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", - "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", - "created": "2016-04-06T19:58:16.000Z", - "modified": "2016-04-06T19:58:16.000Z", - "first_observed": "2015-12-21T19:00:00Z", - "last_observed": "2015-12-21T19:00:00Z", - "number_observed": 1, - "objects": { - "0": { - "type": "file", - "name": "HAL 9000.exe" - } - } - - } -] - -# same as above objects but converted to real Python STIX2 objects -# to test filters against true Python STIX2 objects -real_stix_objs = [parse(stix_obj) for stix_obj in stix_objs] - -filters = [ - Filter("type", "!=", "relationship"), - Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"), - Filter("labels", "in", "remote-access-trojan"), - Filter("created", ">", "2015-01-01T01:00:00.000Z"), - Filter("revoked", "=", True), - Filter("revoked", "!=", True), - Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"), - Filter("granular_markings.selectors", "in", "relationship_type"), - Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"), - Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"), - Filter("created_by_ref", "=", "identity--00000000-0000-0000-0000-b8e91df99dc9"), - Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"), - Filter("granular_markings.selectors", "in", "description"), - Filter("external_references.source_name", "=", "CVE"), - Filter("objects", "=", {"0": {"type": "file", "name": "HAL 9000.exe"}}) -] - -IND1 = { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" -} -IND2 = { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" -} -IND3 = { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.936Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" -} -IND4 = { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" -} -IND5 = { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" -} -IND6 = { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-31T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" -} -IND7 = { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" -} -IND8 = { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" -} - -STIX_OBJS2 = [IND6, IND7, IND8] -STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5] - -REAL_STIX_OBJS2 = [parse(IND6), parse(IND7), parse(IND8)] -REAL_STIX_OBJS1 = [parse(IND1), parse(IND2), parse(IND3), parse(IND4), parse(IND5)] - - -def test_ds_abstract_class_smoke(): +def test_datasource_abstract_class_raises_error(): with pytest.raises(TypeError): DataSource() + +def test_datasink_abstract_class_raises_error(): with pytest.raises(TypeError): DataSink() -def test_ds_taxii(collection): - ds = taxii.TAXIICollectionSource(collection) - assert ds.collection is not None +def test_datastore_smoke(): + assert DataStoreMixin() is not None -def test_ds_taxii_name(collection): - ds = taxii.TAXIICollectionSource(collection) - assert ds.collection is not None +def test_datastore_get_raises(): + with pytest.raises(AttributeError) as excinfo: + DataStoreMixin().get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") + assert "DataStoreMixin has no data source to query" == str(excinfo.value) -def test_parse_taxii_filters(): - query = [ - Filter("added_after", "=", "2016-02-01T00:00:01.000Z"), - Filter("id", "=", "taxii stix object ID"), - Filter("type", "=", "taxii stix object ID"), - Filter("version", "=", "first"), - Filter("created_by_ref", "=", "Bane"), - ] - - taxii_filters_expected = [ - Filter("added_after", "=", "2016-02-01T00:00:01.000Z"), - Filter("id", "=", "taxii stix object ID"), - Filter("type", "=", "taxii stix object ID"), - Filter("version", "=", "first") - ] - - ds = taxii.TAXIICollectionSource(collection) - - taxii_filters = ds._parse_taxii_filters(query) - - assert taxii_filters == taxii_filters_expected +def test_datastore_all_versions_raises(): + with pytest.raises(AttributeError) as excinfo: + DataStoreMixin().all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") + assert "DataStoreMixin has no data source to query" == str(excinfo.value) -def test_add_get_remove_filter(): - ds = taxii.TAXIICollectionSource(collection) - - # First 3 filters are valid, remaining properties are erroneous in some way - valid_filters = [ - Filter('type', '=', 'malware'), - Filter('id', '!=', 'stix object id'), - Filter('labels', 'in', ["heartbleed", "malicious-activity"]), - ] - - assert len(ds.filters) == 0 - - ds.filters.add(valid_filters[0]) - assert len(ds.filters) == 1 - - # Addin the same filter again will have no effect since `filters` acts like a set - ds.filters.add(valid_filters[0]) - assert len(ds.filters) == 1 - - ds.filters.add(valid_filters[1]) - assert len(ds.filters) == 2 - ds.filters.add(valid_filters[2]) - assert len(ds.filters) == 3 - - assert valid_filters == [f for f in ds.filters] - - # remove - ds.filters.remove(valid_filters[0]) - - assert len(ds.filters) == 2 - - ds.filters.add(valid_filters) +def test_datastore_query_raises(): + with pytest.raises(AttributeError) as excinfo: + DataStoreMixin().query([Filter("type", "=", "indicator")]) + assert "DataStoreMixin has no data source to query" == str(excinfo.value) -def test_filter_ops_check(): - # invalid filters - non supported operators - - with pytest.raises(ValueError) as excinfo: - # create Filter that has an operator that is not allowed - Filter('modified', '*', 'not supported operator') - assert str(excinfo.value) == "Filter operator '*' not supported for specified property: 'modified'" - - with pytest.raises(ValueError) as excinfo: - Filter("type", "%", "4") - assert "Filter operator '%' not supported for specified property" in str(excinfo.value) +def test_datastore_creator_of_raises(): + with pytest.raises(AttributeError) as excinfo: + DataStoreMixin().creator_of(CAMPAIGN_MORE_KWARGS) + assert "DataStoreMixin has no data source to query" == str(excinfo.value) -def test_filter_value_type_check(): - # invalid filters - non supported value types +def test_datastore_relationships_raises(): + with pytest.raises(AttributeError) as excinfo: + DataStoreMixin().relationships(obj="indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + target_only=True) + assert "DataStoreMixin has no data source to query" == str(excinfo.value) + +def test_datastore_related_to_raises(): + with pytest.raises(AttributeError) as excinfo: + DataStoreMixin().related_to(obj="indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + target_only=True) + assert "DataStoreMixin has no data source to query" == str(excinfo.value) + + +def test_datastore_add_raises(): + with pytest.raises(AttributeError) as excinfo: + DataStoreMixin().add(CAMPAIGN_MORE_KWARGS) + assert "DataStoreMixin has no data sink to put objects in" == str(excinfo.value) + + +def test_composite_datastore_get_raises_error(): + with pytest.raises(AttributeError) as excinfo: + CompositeDataSource().get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") + assert "CompositeDataSource has no data sources" == str(excinfo.value) + + +def test_composite_datastore_all_versions_raises_error(): + with pytest.raises(AttributeError) as excinfo: + CompositeDataSource().all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") + assert "CompositeDataSource has no data sources" == str(excinfo.value) + + +def test_composite_datastore_query_raises_error(): + with pytest.raises(AttributeError) as excinfo: + CompositeDataSource().query([Filter("type", "=", "indicator")]) + assert "CompositeDataSource has no data sources" == str(excinfo.value) + + +def test_composite_datastore_relationships_raises_error(): + with pytest.raises(AttributeError) as excinfo: + CompositeDataSource().relationships(obj="indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + target_only=True) + assert "CompositeDataSource has no data sources" == str(excinfo.value) + + +def test_composite_datastore_related_to_raises_error(): + with pytest.raises(AttributeError) as excinfo: + CompositeDataSource().related_to(obj="indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + target_only=True) + assert "CompositeDataSource has no data sources" == str(excinfo.value) + + +def test_composite_datastore_add_data_source_raises_error(): with pytest.raises(TypeError) as excinfo: - Filter('created', '=', object()) - # On Python 2, the type of object() is `` On Python 3, it's ``. - assert any([s in str(excinfo.value) for s in ["", "''"]]) - assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value) + ind = "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" + CompositeDataSource().add_data_source(ind) + assert "DataSource (to be added) is not of type stix2.DataSource. DataSource type is '{}'".format(type(ind)) == str(excinfo.value) + +def test_composite_datastore_add_data_sources_raises_error(): with pytest.raises(TypeError) as excinfo: - Filter("type", "=", complex(2, -1)) - assert any([s in str(excinfo.value) for s in ["", "''"]]) - assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value) - - with pytest.raises(TypeError) as excinfo: - Filter("type", "=", set([16, 23])) - assert any([s in str(excinfo.value) for s in ["", "''"]]) - assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value) - - -def test_filter_type_underscore_check(): - # check that Filters where property="type", value (name) doesnt have underscores - with pytest.raises(ValueError) as excinfo: - Filter("type", "=", "oh_underscore") - assert "Filter for property 'type' cannot have its value 'oh_underscore'" in str(excinfo.value) - - -def test_apply_common_filters0(): - # "Return any object whose type is not relationship" - resp = list(apply_common_filters(stix_objs, [filters[0]])) - ids = [r['id'] for r in resp] - assert stix_objs[0]['id'] in ids - assert stix_objs[1]['id'] in ids - assert stix_objs[3]['id'] in ids - assert len(ids) == 4 - - resp = list(apply_common_filters(real_stix_objs, [filters[0]])) - ids = [r.id for r in resp] - assert real_stix_objs[0].id in ids - assert real_stix_objs[1].id in ids - assert real_stix_objs[3].id in ids - assert len(ids) == 4 - - -def test_apply_common_filters1(): - # "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463" - resp = list(apply_common_filters(stix_objs, [filters[1]])) - assert resp[0]['id'] == stix_objs[2]['id'] - assert len(resp) == 1 - - resp = list(apply_common_filters(real_stix_objs, [filters[1]])) - assert resp[0].id == real_stix_objs[2].id - assert len(resp) == 1 - - -def test_apply_common_filters2(): - # "Return any object that contains remote-access-trojan in labels" - resp = list(apply_common_filters(stix_objs, [filters[2]])) - assert resp[0]['id'] == stix_objs[0]['id'] - assert len(resp) == 1 - - resp = list(apply_common_filters(real_stix_objs, [filters[2]])) - assert resp[0].id == real_stix_objs[0].id - assert len(resp) == 1 - - -def test_apply_common_filters3(): - # "Return any object created after 2015-01-01T01:00:00.000Z" - resp = list(apply_common_filters(stix_objs, [filters[3]])) - assert resp[0]['id'] == stix_objs[0]['id'] - assert len(resp) == 3 - - resp = list(apply_common_filters(real_stix_objs, [filters[3]])) - assert resp[0].id == real_stix_objs[0].id - assert len(resp) == 3 - - -def test_apply_common_filters4(): - # "Return any revoked object" - resp = list(apply_common_filters(stix_objs, [filters[4]])) - assert resp[0]['id'] == stix_objs[2]['id'] - assert len(resp) == 1 - - resp = list(apply_common_filters(real_stix_objs, [filters[4]])) - assert resp[0].id == real_stix_objs[2].id - assert len(resp) == 1 - - -def test_apply_common_filters5(): - # "Return any object whose not revoked" - resp = list(apply_common_filters(stix_objs, [filters[5]])) - assert len(resp) == 0 - - resp = list(apply_common_filters(real_stix_objs, [filters[5]])) - assert len(resp) == 4 - - -def test_apply_common_filters6(): - # "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs" - resp = list(apply_common_filters(stix_objs, [filters[6]])) - assert resp[0]['id'] == stix_objs[2]['id'] - assert len(resp) == 1 - - resp = list(apply_common_filters(real_stix_objs, [filters[6]])) - assert resp[0].id == real_stix_objs[2].id - assert len(resp) == 1 - - -def test_apply_common_filters7(): - # "Return any object that contains relationship_type in their selectors AND - # also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref" - resp = list(apply_common_filters(stix_objs, [filters[7], filters[8]])) - assert resp[0]['id'] == stix_objs[2]['id'] - assert len(resp) == 1 - - resp = list(apply_common_filters(real_stix_objs, [filters[7], filters[8]])) - assert resp[0].id == real_stix_objs[2].id - assert len(resp) == 1 - - -def test_apply_common_filters8(): - # "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id" - resp = list(apply_common_filters(stix_objs, [filters[9]])) - assert resp[0]['id'] == stix_objs[3]['id'] - assert len(resp) == 1 - - resp = list(apply_common_filters(real_stix_objs, [filters[9]])) - assert resp[0].id == real_stix_objs[3].id - assert len(resp) == 1 - - -def test_apply_common_filters9(): - # "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9" - resp = list(apply_common_filters(stix_objs, [filters[10]])) - assert len(resp) == 1 - - resp = list(apply_common_filters(real_stix_objs, [filters[10]])) - assert len(resp) == 1 - - -def test_apply_common_filters10(): - # "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None) - resp = list(apply_common_filters(stix_objs, [filters[11]])) - assert len(resp) == 0 - - resp = list(apply_common_filters(real_stix_objs, [filters[11]])) - assert len(resp) == 0 - - -def test_apply_common_filters11(): - # "Return any object that contains description in its selectors" (None) - resp = list(apply_common_filters(stix_objs, [filters[12]])) - assert len(resp) == 0 - - resp = list(apply_common_filters(real_stix_objs, [filters[12]])) - assert len(resp) == 0 - - -def test_apply_common_filters12(): - # "Return any object that matches CVE in source_name" (None, case sensitive) - resp = list(apply_common_filters(stix_objs, [filters[13]])) - assert len(resp) == 0 - - resp = list(apply_common_filters(real_stix_objs, [filters[13]])) - assert len(resp) == 0 - - -def test_apply_common_filters13(): - # Return any object that matches file object in "objects" - resp = list(apply_common_filters(stix_objs, [filters[14]])) - assert resp[0]["id"] == stix_objs[4]["id"] - assert len(resp) == 1 - # important additional check to make sure original File dict was - # not converted to File object. (this was a deep bug found) - assert isinstance(resp[0]["objects"]["0"], dict) - - resp = list(apply_common_filters(real_stix_objs, [filters[14]])) - assert resp[0].id == real_stix_objs[4].id - assert len(resp) == 1 - - -def test_datetime_filter_behavior(): - """if a filter is initialized with its value being a datetime object - OR the STIX object property being filtered on is a datetime object, all - resulting comparisons executed are done on the string representations - of the datetime objects, as the Filter functionality will convert - all datetime objects to there string forms using format_datetim() - - This test makes sure all datetime comparisons are carried out correctly - """ - filter_with_dt_obj = Filter("created", "=", parse_into_datetime("2016-02-14T00:00:00.000Z", "millisecond")) - filter_with_str = Filter("created", "=", "2016-02-14T00:00:00.000Z") - - # check that filter value is converted from datetime to str - assert isinstance(filter_with_dt_obj.value, str) - - # compare datetime string to filter w/ datetime obj - resp = list(apply_common_filters(stix_objs, [filter_with_dt_obj])) - assert len(resp) == 1 - assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" - - # compare datetime obj to filter w/ datetime obj - resp = list(apply_common_filters(real_stix_objs, [filter_with_dt_obj])) - assert len(resp) == 1 - assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" - assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered - - # compare datetime string to filter w/ str - resp = list(apply_common_filters(stix_objs, [filter_with_str])) - assert len(resp) == 1 - assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" - - # compare datetime obj to filter w/ str - resp = list(apply_common_filters(real_stix_objs, [filter_with_str])) - assert len(resp) == 1 - assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" - assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered - - -def test_filters0(): - # "Return any object modified before 2017-01-28T13:49:53.935Z" - resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")])) - assert resp[0]['id'] == STIX_OBJS2[1]['id'] - assert len(resp) == 2 - - resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", "<", parse_into_datetime("2017-01-28T13:49:53.935Z"))])) - assert resp[0].id == REAL_STIX_OBJS2[1].id - assert len(resp) == 2 - - -def test_filters1(): - # "Return any object modified after 2017-01-28T13:49:53.935Z" - resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")])) - assert resp[0]['id'] == STIX_OBJS2[0]['id'] - assert len(resp) == 1 - - resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", ">", parse_into_datetime("2017-01-28T13:49:53.935Z"))])) - assert resp[0].id == REAL_STIX_OBJS2[0].id - assert len(resp) == 1 - - -def test_filters2(): - # "Return any object modified after or on 2017-01-28T13:49:53.935Z" - resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")])) - assert resp[0]['id'] == STIX_OBJS2[0]['id'] - assert len(resp) == 3 - - resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", ">=", parse_into_datetime("2017-01-27T13:49:53.935Z"))])) - assert resp[0].id == REAL_STIX_OBJS2[0].id - assert len(resp) == 3 - - -def test_filters3(): - # "Return any object modified before or on 2017-01-28T13:49:53.935Z" - resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")])) - assert resp[0]['id'] == STIX_OBJS2[1]['id'] - assert len(resp) == 2 - - # "Return any object modified before or on 2017-01-28T13:49:53.935Z" - fv = Filter("modified", "<=", parse_into_datetime("2017-01-27T13:49:53.935Z")) - resp = list(apply_common_filters(REAL_STIX_OBJS2, [fv])) - assert resp[0].id == REAL_STIX_OBJS2[1].id - assert len(resp) == 2 - - -def test_filters4(): - # Assert invalid Filter cannot be created - with pytest.raises(ValueError) as excinfo: - Filter("modified", "?", "2017-01-27T13:49:53.935Z") - assert str(excinfo.value) == ("Filter operator '?' not supported " - "for specified property: 'modified'") - - -def test_filters5(): - # "Return any object whose id is not indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f" - resp = list(apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])) - assert resp[0]['id'] == STIX_OBJS2[0]['id'] - assert len(resp) == 1 - - resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])) - assert resp[0].id == REAL_STIX_OBJS2[0].id - assert len(resp) == 1 - - -def test_filters6(): - # Test filtering on non-common property - resp = list(apply_common_filters(STIX_OBJS2, [Filter("name", "=", "Malicious site hosting downloader")])) - assert resp[0]['id'] == STIX_OBJS2[0]['id'] - assert len(resp) == 3 - - resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("name", "=", "Malicious site hosting downloader")])) - assert resp[0].id == REAL_STIX_OBJS2[0].id - assert len(resp) == 3 - - -def test_filters7(): - # Test filtering on embedded property - obsvd_data_obj = { - "type": "observed-data", - "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", - "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", - "created": "2016-04-06T19:58:16.000Z", - "modified": "2016-04-06T19:58:16.000Z", - "first_observed": "2015-12-21T19:00:00Z", - "last_observed": "2015-12-21T19:00:00Z", - "number_observed": 50, - "objects": { - "0": { - "type": "file", - "hashes": { - "SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f" - }, - "extensions": { - "pdf-ext": { - "version": "1.7", - "document_info_dict": { - "Title": "Sample document", - "Author": "Adobe Systems Incorporated", - "Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh", - "Producer": "Acrobat Distiller 3.01 for Power Macintosh", - "CreationDate": "20070412090123-02" - }, - "pdfid0": "DFCE52BD827ECF765649852119D", - "pdfid1": "57A1E0F9ED2AE523E313C" - } - } - } - } - } - - stix_objects = list(STIX_OBJS2) + [obsvd_data_obj] - real_stix_objects = list(REAL_STIX_OBJS2) + [parse(obsvd_data_obj)] - - resp = list(apply_common_filters(stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")])) - assert resp[0]['id'] == stix_objects[3]['id'] - assert len(resp) == 1 - - resp = list(apply_common_filters(real_stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")])) - assert resp[0].id == real_stix_objects[3].id - assert len(resp) == 1 - - -def test_deduplicate(): - unique = deduplicate(STIX_OBJS1) - - # Only 3 objects are unique - # 2 id's vary - # 2 modified times vary for a particular id - - assert len(unique) == 3 - - ids = [obj['id'] for obj in unique] - mods = [obj['modified'] for obj in unique] - - assert "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f" in ids - assert "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" in ids - assert "2017-01-27T13:49:53.935Z" in mods - assert "2017-01-27T13:49:53.936Z" in mods - - -def test_add_remove_composite_datasource(): - cds = CompositeDataSource() - ds1 = MemorySource() - ds2 = MemorySource() - ds3 = MemorySink() - - with pytest.raises(TypeError) as excinfo: - cds.add_data_sources([ds1, ds2, ds1, ds3]) - assert str(excinfo.value) == ("DataSource (to be added) is not of type " - "stix2.DataSource. DataSource type is ''") - - cds.add_data_sources([ds1, ds2, ds1]) - - assert len(cds.get_all_data_sources()) == 2 - - cds.remove_data_sources([ds1.id, ds2.id]) - - assert len(cds.get_all_data_sources()) == 0 - - -def test_composite_datasource_operations(): - BUNDLE1 = dict(id="bundle--%s" % make_id(), - objects=STIX_OBJS1, - spec_version="2.0", - type="bundle") - cds1 = CompositeDataSource() - ds1_1 = MemorySource(stix_data=BUNDLE1) - ds1_2 = MemorySource(stix_data=STIX_OBJS2) - - cds2 = CompositeDataSource() - ds2_1 = MemorySource(stix_data=BUNDLE1) - ds2_2 = MemorySource(stix_data=STIX_OBJS2) - - cds1.add_data_sources([ds1_1, ds1_2]) - cds2.add_data_sources([ds2_1, ds2_2]) - - indicators = cds1.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") - - # In STIX_OBJS2 changed the 'modified' property to a later time... - assert len(indicators) == 2 - - cds1.add_data_sources([cds2]) - - indicator = cds1.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") - - assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" - assert indicator["modified"] == "2017-01-31T13:49:53.935Z" - assert indicator["type"] == "indicator" - - query1 = [ - Filter("type", "=", "indicator") - ] - - query2 = [ - Filter("valid_from", "=", "2017-01-27T13:49:53.935382Z") - ] - - cds1.filters.add(query2) - - results = cds1.query(query1) - - # STIX_OBJS2 has indicator with later time, one with different id, one with - # original time in STIX_OBJS1 - assert len(results) == 3 - - indicator = cds1.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") - - assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" - assert indicator["modified"] == "2017-01-31T13:49:53.935Z" - assert indicator["type"] == "indicator" - - # There is only one indicator with different ID. Since we use the same data - # when deduplicated, only two indicators (one with different modified). - results = cds1.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") - assert len(results) == 2 - - # Since we have filters already associated with our CompositeSource providing - # nothing returns the same as cds1.query(query1) (the associated query is query2) - results = cds1.query([]) - assert len(results) == 3 + ind = "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" + CompositeDataSource().add_data_sources(ind) + assert "DataSource (to be added) is not of type stix2.DataSource. DataSource type is '{}'".format(type(ind)) == str(excinfo.value) def test_composite_datastore_no_datasource(): cds = CompositeDataSource() - with pytest.raises(AttributeError) as excinfo: cds.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") assert 'CompositeDataSource has no data source' in str(excinfo.value) diff --git a/stix2/test/test_filesystem.py b/stix2/test/test_datastore_filesystem.py similarity index 98% rename from stix2/test/test_filesystem.py rename to stix2/test/test_datastore_filesystem.py index f59136e..4139ab1 100644 --- a/stix2/test/test_filesystem.py +++ b/stix2/test/test_datastore_filesystem.py @@ -7,10 +7,10 @@ import pytest from stix2 import (Bundle, Campaign, CustomObject, FileSystemSink, FileSystemSource, FileSystemStore, Filter, Identity, Indicator, Malware, Relationship, properties) - -from .constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID, - IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS, - MALWARE_ID, MALWARE_KWARGS, RELATIONSHIP_IDS) +from stix2.test.constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID, + IDENTITY_KWARGS, INDICATOR_ID, + INDICATOR_KWARGS, MALWARE_ID, MALWARE_KWARGS, + RELATIONSHIP_IDS) FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data") diff --git a/stix2/test/test_datastore_filters.py b/stix2/test/test_datastore_filters.py new file mode 100644 index 0000000..5ffd051 --- /dev/null +++ b/stix2/test/test_datastore_filters.py @@ -0,0 +1,463 @@ +import pytest + +from stix2 import parse +from stix2.datastore.filters import Filter, apply_common_filters +from stix2.utils import STIXdatetime, parse_into_datetime + +stix_objs = [ + { + "created": "2017-01-27T13:49:53.997Z", + "description": "\n\nTITLE:\n\tPoison Ivy", + "id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111", + "labels": [ + "remote-access-trojan" + ], + "modified": "2017-01-27T13:49:53.997Z", + "name": "Poison Ivy", + "type": "malware" + }, + { + "created": "2014-05-08T09:00:00.000Z", + "id": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade", + "labels": [ + "file-hash-watchlist" + ], + "modified": "2014-05-08T09:00:00.000Z", + "name": "File hash for Poison Ivy variant", + "pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']", + "type": "indicator", + "valid_from": "2014-05-08T09:00:00.000000Z" + }, + { + "created": "2014-05-08T09:00:00.000Z", + "granular_markings": [ + { + "marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed", + "selectors": [ + "relationship_type" + ] + } + ], + "id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463", + "modified": "2014-05-08T09:00:00.000Z", + "object_marking_refs": [ + "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9" + ], + "relationship_type": "indicates", + "revoked": True, + "source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade", + "target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111", + "type": "relationship" + }, + { + "id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef", + "created": "2016-02-14T00:00:00.000Z", + "created_by_ref": "identity--00000000-0000-0000-0000-b8e91df99dc9", + "modified": "2016-02-14T00:00:00.000Z", + "type": "vulnerability", + "name": "CVE-2014-0160", + "description": "The (1) TLS...", + "external_references": [ + { + "source_name": "cve", + "external_id": "CVE-2014-0160" + } + ], + "labels": ["heartbleed", "has-logo"] + }, + { + "type": "observed-data", + "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", + "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + "created": "2016-04-06T19:58:16.000Z", + "modified": "2016-04-06T19:58:16.000Z", + "first_observed": "2015-12-21T19:00:00Z", + "last_observed": "2015-12-21T19:00:00Z", + "number_observed": 1, + "objects": { + "0": { + "type": "file", + "name": "HAL 9000.exe" + } + } + + } +] + + +filters = [ + Filter("type", "!=", "relationship"), + Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"), + Filter("labels", "in", "remote-access-trojan"), + Filter("created", ">", "2015-01-01T01:00:00.000Z"), + Filter("revoked", "=", True), + Filter("revoked", "!=", True), + Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"), + Filter("granular_markings.selectors", "in", "relationship_type"), + Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"), + Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"), + Filter("created_by_ref", "=", "identity--00000000-0000-0000-0000-b8e91df99dc9"), + Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"), + Filter("granular_markings.selectors", "in", "description"), + Filter("external_references.source_name", "=", "CVE"), + Filter("objects", "=", {"0": {"type": "file", "name": "HAL 9000.exe"}}) +] + +# same as above objects but converted to real Python STIX2 objects +# to test filters against true Python STIX2 objects +real_stix_objs = [parse(stix_obj) for stix_obj in stix_objs] + + +def test_filter_ops_check(): + # invalid filters - non supported operators + + with pytest.raises(ValueError) as excinfo: + # create Filter that has an operator that is not allowed + Filter('modified', '*', 'not supported operator') + assert str(excinfo.value) == "Filter operator '*' not supported for specified property: 'modified'" + + with pytest.raises(ValueError) as excinfo: + Filter("type", "%", "4") + assert "Filter operator '%' not supported for specified property" in str(excinfo.value) + + +def test_filter_value_type_check(): + # invalid filters - non supported value types + + with pytest.raises(TypeError) as excinfo: + Filter('created', '=', object()) + # On Python 2, the type of object() is `` On Python 3, it's ``. + assert any([s in str(excinfo.value) for s in ["", "''"]]) + assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value) + + with pytest.raises(TypeError) as excinfo: + Filter("type", "=", complex(2, -1)) + assert any([s in str(excinfo.value) for s in ["", "''"]]) + assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value) + + with pytest.raises(TypeError) as excinfo: + Filter("type", "=", set([16, 23])) + assert any([s in str(excinfo.value) for s in ["", "''"]]) + assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value) + + +def test_filter_type_underscore_check(): + # check that Filters where property="type", value (name) doesnt have underscores + with pytest.raises(ValueError) as excinfo: + Filter("type", "=", "oh_underscore") + assert "Filter for property 'type' cannot have its value 'oh_underscore'" in str(excinfo.value) + + +def test_apply_common_filters0(): + # "Return any object whose type is not relationship" + resp = list(apply_common_filters(stix_objs, [filters[0]])) + ids = [r['id'] for r in resp] + assert stix_objs[0]['id'] in ids + assert stix_objs[1]['id'] in ids + assert stix_objs[3]['id'] in ids + assert len(ids) == 4 + + resp = list(apply_common_filters(real_stix_objs, [filters[0]])) + ids = [r.id for r in resp] + assert real_stix_objs[0].id in ids + assert real_stix_objs[1].id in ids + assert real_stix_objs[3].id in ids + assert len(ids) == 4 + + +def test_apply_common_filters1(): + # "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463" + resp = list(apply_common_filters(stix_objs, [filters[1]])) + assert resp[0]['id'] == stix_objs[2]['id'] + assert len(resp) == 1 + + resp = list(apply_common_filters(real_stix_objs, [filters[1]])) + assert resp[0].id == real_stix_objs[2].id + assert len(resp) == 1 + + +def test_apply_common_filters2(): + # "Return any object that contains remote-access-trojan in labels" + resp = list(apply_common_filters(stix_objs, [filters[2]])) + assert resp[0]['id'] == stix_objs[0]['id'] + assert len(resp) == 1 + + resp = list(apply_common_filters(real_stix_objs, [filters[2]])) + assert resp[0].id == real_stix_objs[0].id + assert len(resp) == 1 + + +def test_apply_common_filters3(): + # "Return any object created after 2015-01-01T01:00:00.000Z" + resp = list(apply_common_filters(stix_objs, [filters[3]])) + assert resp[0]['id'] == stix_objs[0]['id'] + assert len(resp) == 3 + + resp = list(apply_common_filters(real_stix_objs, [filters[3]])) + assert resp[0].id == real_stix_objs[0].id + assert len(resp) == 3 + + +def test_apply_common_filters4(): + # "Return any revoked object" + resp = list(apply_common_filters(stix_objs, [filters[4]])) + assert resp[0]['id'] == stix_objs[2]['id'] + assert len(resp) == 1 + + resp = list(apply_common_filters(real_stix_objs, [filters[4]])) + assert resp[0].id == real_stix_objs[2].id + assert len(resp) == 1 + + +def test_apply_common_filters5(): + # "Return any object whose not revoked" + resp = list(apply_common_filters(stix_objs, [filters[5]])) + assert len(resp) == 0 + + resp = list(apply_common_filters(real_stix_objs, [filters[5]])) + assert len(resp) == 4 + + +def test_apply_common_filters6(): + # "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs" + resp = list(apply_common_filters(stix_objs, [filters[6]])) + assert resp[0]['id'] == stix_objs[2]['id'] + assert len(resp) == 1 + + resp = list(apply_common_filters(real_stix_objs, [filters[6]])) + assert resp[0].id == real_stix_objs[2].id + assert len(resp) == 1 + + +def test_apply_common_filters7(): + # "Return any object that contains relationship_type in their selectors AND + # also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref" + resp = list(apply_common_filters(stix_objs, [filters[7], filters[8]])) + assert resp[0]['id'] == stix_objs[2]['id'] + assert len(resp) == 1 + + resp = list(apply_common_filters(real_stix_objs, [filters[7], filters[8]])) + assert resp[0].id == real_stix_objs[2].id + assert len(resp) == 1 + + +def test_apply_common_filters8(): + # "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id" + resp = list(apply_common_filters(stix_objs, [filters[9]])) + assert resp[0]['id'] == stix_objs[3]['id'] + assert len(resp) == 1 + + resp = list(apply_common_filters(real_stix_objs, [filters[9]])) + assert resp[0].id == real_stix_objs[3].id + assert len(resp) == 1 + + +def test_apply_common_filters9(): + # "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9" + resp = list(apply_common_filters(stix_objs, [filters[10]])) + assert len(resp) == 1 + + resp = list(apply_common_filters(real_stix_objs, [filters[10]])) + assert len(resp) == 1 + + +def test_apply_common_filters10(): + # "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None) + resp = list(apply_common_filters(stix_objs, [filters[11]])) + assert len(resp) == 0 + + resp = list(apply_common_filters(real_stix_objs, [filters[11]])) + assert len(resp) == 0 + + +def test_apply_common_filters11(): + # "Return any object that contains description in its selectors" (None) + resp = list(apply_common_filters(stix_objs, [filters[12]])) + assert len(resp) == 0 + + resp = list(apply_common_filters(real_stix_objs, [filters[12]])) + assert len(resp) == 0 + + +def test_apply_common_filters12(): + # "Return any object that matches CVE in source_name" (None, case sensitive) + resp = list(apply_common_filters(stix_objs, [filters[13]])) + assert len(resp) == 0 + + resp = list(apply_common_filters(real_stix_objs, [filters[13]])) + assert len(resp) == 0 + + +def test_apply_common_filters13(): + # Return any object that matches file object in "objects" + resp = list(apply_common_filters(stix_objs, [filters[14]])) + assert resp[0]["id"] == stix_objs[4]["id"] + assert len(resp) == 1 + # important additional check to make sure original File dict was + # not converted to File object. (this was a deep bug found) + assert isinstance(resp[0]["objects"]["0"], dict) + + resp = list(apply_common_filters(real_stix_objs, [filters[14]])) + assert resp[0].id == real_stix_objs[4].id + assert len(resp) == 1 + + +def test_datetime_filter_behavior(): + """if a filter is initialized with its value being a datetime object + OR the STIX object property being filtered on is a datetime object, all + resulting comparisons executed are done on the string representations + of the datetime objects, as the Filter functionality will convert + all datetime objects to there string forms using format_datetim() + + This test makes sure all datetime comparisons are carried out correctly + """ + filter_with_dt_obj = Filter("created", "=", parse_into_datetime("2016-02-14T00:00:00.000Z", "millisecond")) + filter_with_str = Filter("created", "=", "2016-02-14T00:00:00.000Z") + + # check that filter value is converted from datetime to str + assert isinstance(filter_with_dt_obj.value, str) + + # compare datetime string to filter w/ datetime obj + resp = list(apply_common_filters(stix_objs, [filter_with_dt_obj])) + assert len(resp) == 1 + assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" + + # compare datetime obj to filter w/ datetime obj + resp = list(apply_common_filters(real_stix_objs, [filter_with_dt_obj])) + assert len(resp) == 1 + assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" + assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered + + # compare datetime string to filter w/ str + resp = list(apply_common_filters(stix_objs, [filter_with_str])) + assert len(resp) == 1 + assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" + + # compare datetime obj to filter w/ str + resp = list(apply_common_filters(real_stix_objs, [filter_with_str])) + assert len(resp) == 1 + assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" + assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered + + +def test_filters0(stix_objs2, real_stix_objs2): + # "Return any object modified before 2017-01-28T13:49:53.935Z" + resp = list(apply_common_filters(stix_objs2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")])) + assert resp[0]['id'] == stix_objs2[1]['id'] + assert len(resp) == 2 + + resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", "<", parse_into_datetime("2017-01-28T13:49:53.935Z"))])) + assert resp[0].id == real_stix_objs2[1].id + assert len(resp) == 2 + + +def test_filters1(stix_objs2, real_stix_objs2): + # "Return any object modified after 2017-01-28T13:49:53.935Z" + resp = list(apply_common_filters(stix_objs2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")])) + assert resp[0]['id'] == stix_objs2[0]['id'] + assert len(resp) == 1 + + resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", ">", parse_into_datetime("2017-01-28T13:49:53.935Z"))])) + assert resp[0].id == real_stix_objs2[0].id + assert len(resp) == 1 + + +def test_filters2(stix_objs2, real_stix_objs2): + # "Return any object modified after or on 2017-01-28T13:49:53.935Z" + resp = list(apply_common_filters(stix_objs2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")])) + assert resp[0]['id'] == stix_objs2[0]['id'] + assert len(resp) == 3 + + resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", ">=", parse_into_datetime("2017-01-27T13:49:53.935Z"))])) + assert resp[0].id == real_stix_objs2[0].id + assert len(resp) == 3 + + +def test_filters3(stix_objs2, real_stix_objs2): + # "Return any object modified before or on 2017-01-28T13:49:53.935Z" + resp = list(apply_common_filters(stix_objs2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")])) + assert resp[0]['id'] == stix_objs2[1]['id'] + assert len(resp) == 2 + + # "Return any object modified before or on 2017-01-28T13:49:53.935Z" + fv = Filter("modified", "<=", parse_into_datetime("2017-01-27T13:49:53.935Z")) + resp = list(apply_common_filters(real_stix_objs2, [fv])) + assert resp[0].id == real_stix_objs2[1].id + assert len(resp) == 2 + + +def test_filters4(): + # Assert invalid Filter cannot be created + with pytest.raises(ValueError) as excinfo: + Filter("modified", "?", "2017-01-27T13:49:53.935Z") + assert str(excinfo.value) == ("Filter operator '?' not supported " + "for specified property: 'modified'") + + +def test_filters5(stix_objs2, real_stix_objs2): + # "Return any object whose id is not indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f" + resp = list(apply_common_filters(stix_objs2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])) + assert resp[0]['id'] == stix_objs2[0]['id'] + assert len(resp) == 1 + + resp = list(apply_common_filters(real_stix_objs2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])) + assert resp[0].id == real_stix_objs2[0].id + assert len(resp) == 1 + + +def test_filters6(stix_objs2, real_stix_objs2): + # Test filtering on non-common property + resp = list(apply_common_filters(stix_objs2, [Filter("name", "=", "Malicious site hosting downloader")])) + assert resp[0]['id'] == stix_objs2[0]['id'] + assert len(resp) == 3 + + resp = list(apply_common_filters(real_stix_objs2, [Filter("name", "=", "Malicious site hosting downloader")])) + assert resp[0].id == real_stix_objs2[0].id + assert len(resp) == 3 + + +def test_filters7(stix_objs2, real_stix_objs2): + # Test filtering on embedded property + obsvd_data_obj = { + "type": "observed-data", + "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", + "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + "created": "2016-04-06T19:58:16.000Z", + "modified": "2016-04-06T19:58:16.000Z", + "first_observed": "2015-12-21T19:00:00Z", + "last_observed": "2015-12-21T19:00:00Z", + "number_observed": 50, + "objects": { + "0": { + "type": "file", + "hashes": { + "SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f" + }, + "extensions": { + "pdf-ext": { + "version": "1.7", + "document_info_dict": { + "Title": "Sample document", + "Author": "Adobe Systems Incorporated", + "Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh", + "Producer": "Acrobat Distiller 3.01 for Power Macintosh", + "CreationDate": "20070412090123-02" + }, + "pdfid0": "DFCE52BD827ECF765649852119D", + "pdfid1": "57A1E0F9ED2AE523E313C" + } + } + } + } + } + + stix_objects = list(stix_objs2) + [obsvd_data_obj] + real_stix_objects = list(real_stix_objs2) + [parse(obsvd_data_obj)] + + resp = list(apply_common_filters(stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")])) + assert resp[0]['id'] == stix_objects[3]['id'] + assert len(resp) == 1 + + resp = list(apply_common_filters(real_stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")])) + assert resp[0].id == real_stix_objects[3].id + assert len(resp) == 1 diff --git a/stix2/test/test_datastore_memory.py b/stix2/test/test_datastore_memory.py new file mode 100644 index 0000000..7a5bf10 --- /dev/null +++ b/stix2/test/test_datastore_memory.py @@ -0,0 +1,87 @@ +import pytest + +from stix2.datastore import CompositeDataSource, make_id +from stix2.datastore.filters import Filter +from stix2.datastore.memory import MemorySink, MemorySource + + +def test_add_remove_composite_datasource(): + cds = CompositeDataSource() + ds1 = MemorySource() + ds2 = MemorySource() + ds3 = MemorySink() + + with pytest.raises(TypeError) as excinfo: + cds.add_data_sources([ds1, ds2, ds1, ds3]) + assert str(excinfo.value) == ("DataSource (to be added) is not of type " + "stix2.DataSource. DataSource type is ''") + + cds.add_data_sources([ds1, ds2, ds1]) + + assert len(cds.get_all_data_sources()) == 2 + + cds.remove_data_sources([ds1.id, ds2.id]) + + assert len(cds.get_all_data_sources()) == 0 + + +def test_composite_datasource_operations(stix_objs1, stix_objs2): + BUNDLE1 = dict(id="bundle--%s" % make_id(), + objects=stix_objs1, + spec_version="2.0", + type="bundle") + cds1 = CompositeDataSource() + ds1_1 = MemorySource(stix_data=BUNDLE1) + ds1_2 = MemorySource(stix_data=stix_objs2) + + cds2 = CompositeDataSource() + ds2_1 = MemorySource(stix_data=BUNDLE1) + ds2_2 = MemorySource(stix_data=stix_objs2) + + cds1.add_data_sources([ds1_1, ds1_2]) + cds2.add_data_sources([ds2_1, ds2_2]) + + indicators = cds1.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") + + # In STIX_OBJS2 changed the 'modified' property to a later time... + assert len(indicators) == 2 + + cds1.add_data_sources([cds2]) + + indicator = cds1.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") + + assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" + assert indicator["modified"] == "2017-01-31T13:49:53.935Z" + assert indicator["type"] == "indicator" + + query1 = [ + Filter("type", "=", "indicator") + ] + + query2 = [ + Filter("valid_from", "=", "2017-01-27T13:49:53.935382Z") + ] + + cds1.filters.add(query2) + + results = cds1.query(query1) + + # STIX_OBJS2 has indicator with later time, one with different id, one with + # original time in STIX_OBJS1 + assert len(results) == 3 + + indicator = cds1.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") + + assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" + assert indicator["modified"] == "2017-01-31T13:49:53.935Z" + assert indicator["type"] == "indicator" + + # There is only one indicator with different ID. Since we use the same data + # when deduplicated, only two indicators (one with different modified). + results = cds1.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") + assert len(results) == 2 + + # Since we have filters already associated with our CompositeSource providing + # nothing returns the same as cds1.query(query1) (the associated query is query2) + results = cds1.query([]) + assert len(results) == 3 diff --git a/stix2/test/test_datastore_taxii.py b/stix2/test/test_datastore_taxii.py new file mode 100644 index 0000000..098f944 --- /dev/null +++ b/stix2/test/test_datastore_taxii.py @@ -0,0 +1,294 @@ +import json + +from medallion.filters.basic_filter import BasicFilter +import pytest +from taxii2client import Collection, _filter_kwargs_to_query_params + +from stix2 import (Bundle, TAXIICollectionSink, TAXIICollectionSource, + TAXIICollectionStore, ThreatActor) +from stix2.datastore.filters import Filter + +COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/' + + +class MockTAXIICollectionEndpoint(Collection): + """Mock for taxii2_client.TAXIIClient""" + + def __init__(self, url, **kwargs): + super(MockTAXIICollectionEndpoint, self).__init__(url, **kwargs) + self.objects = [] + + def add_objects(self, bundle): + self._verify_can_write() + if isinstance(bundle, str): + bundle = json.loads(bundle) + for object in bundle.get("objects", []): + self.objects.append(object) + + def get_objects(self, **filter_kwargs): + self._verify_can_read() + query_params = _filter_kwargs_to_query_params(filter_kwargs) + if not isinstance(query_params, dict): + query_params = json.loads(query_params) + full_filter = BasicFilter(query_params or {}) + objs = full_filter.process_filter( + self.objects, + ("id", "type", "version"), + [] + ) + return Bundle(objects=objs) + + def get_object(self, id, version=None): + self._verify_can_read() + query_params = None + if version: + query_params = _filter_kwargs_to_query_params({"version": version}) + if query_params: + query_params = json.loads(query_params) + full_filter = BasicFilter(query_params or {}) + objs = full_filter.process_filter( + self.objects, + ("version",), + [] + ) + return Bundle(objects=objs) + + +@pytest.fixture +def collection(stix_objs1): + mock = MockTAXIICollectionEndpoint(COLLECTION_URL, **{ + "id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116", + "title": "Writable Collection", + "description": "This collection is a dropbox for submitting indicators", + "can_read": True, + "can_write": True, + "media_types": [ + "application/vnd.oasis.stix+json; version=2.0" + ] + }) + + mock.objects.extend(stix_objs1) + return mock + + +def test_ds_taxii(collection): + ds = TAXIICollectionSource(collection) + assert ds.collection is not None + + +def test_add_stix2_object(collection): + tc_sink = TAXIICollectionSink(collection) + + # create new STIX threat-actor + ta = ThreatActor(name="Teddy Bear", + labels=["nation-state"], + sophistication="innovator", + resource_level="government", + goals=[ + "compromising environment NGOs", + "water-hole attacks geared towards energy sector", + ]) + + tc_sink.add(ta) + + +def test_add_stix2_with_custom_object(collection): + tc_sink = TAXIICollectionStore(collection, allow_custom=True) + + # create new STIX threat-actor + ta = ThreatActor(name="Teddy Bear", + labels=["nation-state"], + sophistication="innovator", + resource_level="government", + goals=[ + "compromising environment NGOs", + "water-hole attacks geared towards energy sector", + ], + foo="bar", + allow_custom=True) + + tc_sink.add(ta) + + +def test_add_list_object(collection, indicator): + tc_sink = TAXIICollectionSink(collection) + + # create new STIX threat-actor + ta = ThreatActor(name="Teddy Bear", + labels=["nation-state"], + sophistication="innovator", + resource_level="government", + goals=[ + "compromising environment NGOs", + "water-hole attacks geared towards energy sector", + ]) + + tc_sink.add([ta, indicator]) + + +def test_add_stix2_bundle_object(collection): + tc_sink = TAXIICollectionSink(collection) + + # create new STIX threat-actor + ta = ThreatActor(name="Teddy Bear", + labels=["nation-state"], + sophistication="innovator", + resource_level="government", + goals=[ + "compromising environment NGOs", + "water-hole attacks geared towards energy sector", + ]) + + tc_sink.add(Bundle(objects=[ta])) + + +def test_add_str_object(collection): + tc_sink = TAXIICollectionSink(collection) + + # create new STIX threat-actor + ta = """{ + "type": "threat-actor", + "id": "threat-actor--eddff64f-feb1-4469-b07c-499a73c96415", + "created": "2018-04-23T16:40:50.847Z", + "modified": "2018-04-23T16:40:50.847Z", + "name": "Teddy Bear", + "goals": [ + "compromising environment NGOs", + "water-hole attacks geared towards energy sector" + ], + "sophistication": "innovator", + "resource_level": "government", + "labels": [ + "nation-state" + ] + }""" + + tc_sink.add(ta) + + +def test_add_dict_object(collection): + tc_sink = TAXIICollectionSink(collection) + + ta = { + "type": "threat-actor", + "id": "threat-actor--eddff64f-feb1-4469-b07c-499a73c96415", + "created": "2018-04-23T16:40:50.847Z", + "modified": "2018-04-23T16:40:50.847Z", + "name": "Teddy Bear", + "goals": [ + "compromising environment NGOs", + "water-hole attacks geared towards energy sector" + ], + "sophistication": "innovator", + "resource_level": "government", + "labels": [ + "nation-state" + ] + } + + tc_sink.add(ta) + + +def test_add_dict_bundle_object(collection): + tc_sink = TAXIICollectionSink(collection) + + ta = { + "type": "bundle", + "id": "bundle--860ccc8d-56c9-4fda-9384-84276fb52fb1", + "spec_version": "2.0", + "objects": [ + { + "type": "threat-actor", + "id": "threat-actor--dc5a2f41-f76e-425a-81fe-33afc7aabd75", + "created": "2018-04-23T18:45:11.390Z", + "modified": "2018-04-23T18:45:11.390Z", + "name": "Teddy Bear", + "goals": [ + "compromising environment NGOs", + "water-hole attacks geared towards energy sector" + ], + "sophistication": "innovator", + "resource_level": "government", + "labels": [ + "nation-state" + ] + } + ] + } + + tc_sink.add(ta) + + +def test_get_stix2_object(collection): + tc_sink = TAXIICollectionSource(collection) + + objects = tc_sink.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") + + assert objects + + +def test_parse_taxii_filters(collection): + query = [ + Filter("added_after", "=", "2016-02-01T00:00:01.000Z"), + Filter("id", "=", "taxii stix object ID"), + Filter("type", "=", "taxii stix object ID"), + Filter("version", "=", "first"), + Filter("created_by_ref", "=", "Bane"), + ] + + taxii_filters_expected = [ + Filter("added_after", "=", "2016-02-01T00:00:01.000Z"), + Filter("id", "=", "taxii stix object ID"), + Filter("type", "=", "taxii stix object ID"), + Filter("version", "=", "first") + ] + + ds = TAXIICollectionSource(collection) + + taxii_filters = ds._parse_taxii_filters(query) + + assert taxii_filters == taxii_filters_expected + + +def test_add_get_remove_filter(collection): + ds = TAXIICollectionSource(collection) + + # First 3 filters are valid, remaining properties are erroneous in some way + valid_filters = [ + Filter('type', '=', 'malware'), + Filter('id', '!=', 'stix object id'), + Filter('labels', 'in', ["heartbleed", "malicious-activity"]), + ] + + assert len(ds.filters) == 0 + + ds.filters.add(valid_filters[0]) + assert len(ds.filters) == 1 + + # Addin the same filter again will have no effect since `filters` acts + # like a set + ds.filters.add(valid_filters[0]) + assert len(ds.filters) == 1 + + ds.filters.add(valid_filters[1]) + assert len(ds.filters) == 2 + + ds.filters.add(valid_filters[2]) + assert len(ds.filters) == 3 + + assert valid_filters == [f for f in ds.filters] + + # remove + ds.filters.remove(valid_filters[0]) + + assert len(ds.filters) == 2 + + ds.filters.add(valid_filters) + + +def test_get_all_versions(collection): + ds = TAXIICollectionStore(collection) + + indicators = ds.all_versions('indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f') + # There are 3 indicators but 2 share the same 'modified' timestamp + assert len(indicators) == 2 diff --git a/stix2/test/test_utils.py b/stix2/test/test_utils.py index 84b2476..655cd61 100644 --- a/stix2/test/test_utils.py +++ b/stix2/test/test_utils.py @@ -82,3 +82,21 @@ def test_get_dict_invalid(data): ]) def test_get_type_from_id(stix_id, typ): assert stix2.utils.get_type_from_id(stix_id) == typ + + +def test_deduplicate(stix_objs1): + unique = stix2.utils.deduplicate(stix_objs1) + + # Only 3 objects are unique + # 2 id's vary + # 2 modified times vary for a particular id + + assert len(unique) == 3 + + ids = [obj['id'] for obj in unique] + mods = [obj['modified'] for obj in unique] + + assert "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f" in ids + assert "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" in ids + assert "2017-01-27T13:49:53.935Z" in mods + assert "2017-01-27T13:49:53.936Z" in mods diff --git a/tox.ini b/tox.ini index 86cd4ee..683e137 100644 --- a/tox.ini +++ b/tox.ini @@ -3,15 +3,16 @@ envlist = py27,py34,py35,py36,style,isort-check [testenv] deps = - -U - tox - pytest - pytest-cov - coverage - taxii2-client + -U + tox + pytest + pytest-cov + coverage + taxii2-client + medallion commands = - pytest --ignore=stix2/test/test_workbench.py --cov=stix2 stix2/test/ --cov-report term-missing - pytest stix2/test/test_workbench.py --cov=stix2 --cov-report term-missing --cov-append + pytest --ignore=stix2/test/test_workbench.py --cov=stix2 stix2/test/ --cov-report term-missing + pytest stix2/test/test_workbench.py --cov=stix2 --cov-report term-missing --cov-append passenv = CI TRAVIS TRAVIS_* @@ -22,13 +23,13 @@ commands = flake8 [flake8] -max-line-length=160 +max-line-length = 160 [testenv:isort-check] deps = isort commands = - isort -rc stix2 examples -df - isort -rc stix2 examples -c + isort -rc stix2 examples -df + isort -rc stix2 examples -c [travis] python =