"""Tests for the stix2 data source, data sink, and data store classes."""

import pytest
from taxii2client import Collection

from stix2.sources import (CompositeDataSource, DataSink, DataSource,
                           DataStore, Filter, make_id, taxii)
from stix2.sources.memory import MemorySource

COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'


class MockTAXIIClient(object):
    """Mock for taxii2_client.TAXIIClient"""
    pass


@pytest.fixture
def collection():
    """Return a ``Collection`` for COLLECTION_URL backed by the mock client."""
    return Collection(COLLECTION_URL, MockTAXIIClient())


# Five indicator dicts: two distinct ids, and two distinct 'modified' values
# for the first id. Used by the deduplication and composite tests.
STIX_OBJS1 = [
    {
        "created": "2017-01-27T13:49:53.935Z",
        "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
        "labels": [
            "url-watchlist"
        ],
        "modified": "2017-01-27T13:49:53.935Z",
        "name": "Malicious site hosting downloader",
        "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
        "type": "indicator",
        "valid_from": "2017-01-27T13:49:53.935382Z"
    },
    {
        "created": "2017-01-27T13:49:53.935Z",
        "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
        "labels": [
            "url-watchlist"
        ],
        "modified": "2017-01-27T13:49:53.935Z",
        "name": "Malicious site hosting downloader",
        "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
        "type": "indicator",
        "valid_from": "2017-01-27T13:49:53.935382Z"
    },
    {
        "created": "2017-01-27T13:49:53.935Z",
        "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
        "labels": [
            "url-watchlist"
        ],
        "modified": "2017-01-27T13:49:53.936Z",
        "name": "Malicious site hosting downloader",
        "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
        "type": "indicator",
        "valid_from": "2017-01-27T13:49:53.935382Z"
    },
    {
        "created": "2017-01-27T13:49:53.935Z",
        "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
        "labels": [
            "url-watchlist"
        ],
        "modified": "2017-01-27T13:49:53.935Z",
        "name": "Malicious site hosting downloader",
        "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
        "type": "indicator",
        "valid_from": "2017-01-27T13:49:53.935382Z"
    },
    {
        "created": "2017-01-27T13:49:53.935Z",
        "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
        "labels": [
            "url-watchlist"
        ],
        "modified": "2017-01-27T13:49:53.935Z",
        "name": "Malicious site hosting downloader",
        "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
        "type": "indicator",
        "valid_from": "2017-01-27T13:49:53.935382Z"
    }
]

# Three indicator dicts; the first carries a later 'modified' timestamp than
# any object with the same id in STIX_OBJS1.
STIX_OBJS2 = [
    {
        "created": "2017-01-27T13:49:53.935Z",
        "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
        "labels": [
            "url-watchlist"
        ],
        "modified": "2017-01-31T13:49:53.935Z",
        "name": "Malicious site hosting downloader",
        "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
        "type": "indicator",
        "valid_from": "2017-01-27T13:49:53.935382Z"
    },
    {
        "created": "2017-01-27T13:49:53.935Z",
        "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
        "labels": [
            "url-watchlist"
        ],
        "modified": "2017-01-27T13:49:53.935Z",
        "name": "Malicious site hosting downloader",
        "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
        "type": "indicator",
        "valid_from": "2017-01-27T13:49:53.935382Z"
    },
    {
        "created": "2017-01-27T13:49:53.935Z",
        "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
        "labels": [
            "url-watchlist"
        ],
        "modified": "2017-01-27T13:49:53.935Z",
        "name": "Malicious site hosting downloader",
        "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
        "type": "indicator",
        "valid_from": "2017-01-27T13:49:53.935382Z"
    }
]


def test_ds_smoke():
    """A DataStore built on the base source/sink raises NotImplementedError."""
    ds1 = DataSource()
    ds2 = DataSink()
    ds3 = DataStore(source=ds1, sink=ds2)

    with pytest.raises(NotImplementedError):
        ds3.add(None)

    with pytest.raises(NotImplementedError):
        ds3.all_versions("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")

    with pytest.raises(NotImplementedError):
        ds3.get("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")

    with pytest.raises(NotImplementedError):
        ds3.query([Filter("id", "=", "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")])


def test_ds_taxii(collection):
    """TAXIICollectionSource gets a default name when none is supplied."""
    ds = taxii.TAXIICollectionSource(collection)
    assert ds.name == 'TAXIICollectionSource'


def test_ds_taxii_name(collection):
    """TAXIICollectionSource keeps an explicitly supplied name."""
    ds = taxii.TAXIICollectionSource(collection, name='My Data Source Name')
    assert ds.name == "My Data Source Name"


def test_parse_taxii_filters(collection):
    """Only TAXII-recognised filters are turned into request parameters.

    The ``collection`` fixture must be requested as a parameter; otherwise the
    name ``collection`` refers to the module-level fixture function itself
    rather than to a ``Collection`` instance.
    """
    query = [
        Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
        Filter("id", "=", "taxii stix object ID"),
        Filter("type", "=", "taxii stix object ID"),
        Filter("version", "=", "first"),
        Filter("created_by_ref", "=", "Bane"),
    ]

    # 'created_by_ref' is expected to be dropped from the parsed parameters.
    expected_params = {
        "added_after": "2016-02-01T00:00:01.000Z",
        "match[id]": "taxii stix object ID",
        "match[type]": "taxii stix object ID",
        "match[version]": "first"
    }

    ds = taxii.TAXIICollectionSource(collection)

    taxii_filters = ds._parse_taxii_filters(query)

    assert taxii_filters == expected_params


def test_add_get_remove_filter():
    """Valid filters are stored once each; invalid ones raise ValueError."""
    # These filters are valid and should be accepted by the data source.
    valid_filters = [
        Filter('type', '=', 'malware'),
        Filter('id', '!=', 'stix object id'),
        Filter('labels', 'in', ["heartbleed", "malicious-activity"]),
    ]
    # Each of these is erroneous in some way: field, operator, value type.
    invalid_filters = [
        Filter('description', '=', 'not supported field - just place holder'),
        Filter('modified', '*', 'not supported operator - just place holder'),
        Filter('created', '=', object()),
    ]

    ds = DataSource()

    assert len(ds.filters) == 0

    ds.add_filter(valid_filters[0])
    assert len(ds.filters) == 1

    # Adding the same filter again will have no effect since `filters` uses a set
    ds.add_filter(valid_filters[0])
    assert len(ds.filters) == 1

    ds.add_filter(valid_filters[1])
    assert len(ds.filters) == 2
    ds.add_filter(valid_filters[2])
    assert len(ds.filters) == 3

    # TODO: make better error messages
    with pytest.raises(ValueError) as excinfo:
        ds.add_filter(invalid_filters[0])
    assert str(excinfo.value) == "Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported"

    with pytest.raises(ValueError) as excinfo:
        ds.add_filter(invalid_filters[1])
    assert str(excinfo.value) == "Filter operation(from 'op' field) not supported"

    with pytest.raises(ValueError) as excinfo:
        ds.add_filter(invalid_filters[2])
    assert str(excinfo.value) == "Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary"

    assert set(valid_filters) == ds.filters

    # remove
    ds.filters.remove(valid_filters[0])

    assert len(ds.filters) == 2

    ds.add_filters(valid_filters)


def test_apply_common_filters():
    """Exercise apply_common_filters over each supported field and operator."""
    stix_objs = [
        {
            "created": "2017-01-27T13:49:53.997Z",
            "description": "\n\nTITLE:\n\tPoison Ivy",
            "id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
            "labels": [
                "remote-access-trojan"
            ],
            "modified": "2017-01-27T13:49:53.997Z",
            "name": "Poison Ivy",
            "type": "malware"
        },
        {
            "created": "2014-05-08T09:00:00.000Z",
            "id": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
            "labels": [
                "file-hash-watchlist"
            ],
            "modified": "2014-05-08T09:00:00.000Z",
            "name": "File hash for Poison Ivy variant",
            "pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']",
            "type": "indicator",
            "valid_from": "2014-05-08T09:00:00.000000Z"
        },
        {
            "created": "2014-05-08T09:00:00.000Z",
            "granular_markings": [
                {
                    "marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
                    "selectors": [
                        "relationship_type"
                    ]
                }
            ],
            "id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463",
            "modified": "2014-05-08T09:00:00.000Z",
            "object_marking_refs": [
                "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
            ],
            "relationship_type": "indicates",
            "revoked": True,
            "source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
            "target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
            "type": "relationship"
        }
    ]

    filters = [
        Filter("type", "!=", "relationship"),
        Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
        Filter("labels", "in", "remote-access-trojan"),
        Filter("created", ">", "2015-01-01T01:00:00.000Z"),
        Filter("revoked", "=", True),
        Filter("revoked", "!=", True),
        Filter("revoked", "?", False),
        Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"),
        Filter("granular_markings.selectors", "in", "relationship_type"),
        Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
    ]

    ds = DataSource()

    resp = ds.apply_common_filters(stix_objs, [filters[0]])
    ids = [r['id'] for r in resp]
    assert stix_objs[0]['id'] in ids
    assert stix_objs[1]['id'] in ids

    resp = ds.apply_common_filters(stix_objs, [filters[1]])
    assert resp[0]['id'] == stix_objs[2]['id']

    resp = ds.apply_common_filters(stix_objs, [filters[2]])
    assert resp[0]['id'] == stix_objs[0]['id']

    resp = ds.apply_common_filters(stix_objs, [filters[3]])
    assert resp[0]['id'] == stix_objs[0]['id']
    assert len(resp) == 1

    resp = ds.apply_common_filters(stix_objs, [filters[4]])
    assert resp[0]['id'] == stix_objs[2]['id']
    assert len(resp) == 1

    # Note that if 'revoked' property is not present in object.
    # Currently we can't use such an expression to filter for...
    resp = ds.apply_common_filters(stix_objs, [filters[5]])
    assert len(resp) == 0

    # The assertion sits outside the `with` block so that it actually runs
    # after the expected exception has been captured.
    with pytest.raises(ValueError) as excinfo:
        ds.apply_common_filters(stix_objs, [filters[6]])
    assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
                                  "for specified field: {1}").format(filters[6].op, filters[6].field)

    resp = ds.apply_common_filters(stix_objs, [filters[7]])
    assert resp[0]['id'] == stix_objs[2]['id']
    assert len(resp) == 1

    resp = ds.apply_common_filters(stix_objs, [filters[8], filters[9]])
    assert resp[0]['id'] == stix_objs[2]['id']
    assert len(resp) == 1

    # These are used with STIX_OBJS2
    more_filters = [
        Filter("modified", "<", "2017-01-28T13:49:53.935Z"),
        Filter("modified", ">", "2017-01-28T13:49:53.935Z"),
        Filter("modified", ">=", "2017-01-27T13:49:53.935Z"),
        Filter("modified", "<=", "2017-01-27T13:49:53.935Z"),
        Filter("modified", "?", "2017-01-27T13:49:53.935Z"),
        Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"),
        Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"),
        Filter("notacommonproperty", "=", "bar"),
    ]

    resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[0]])
    assert resp[0]['id'] == STIX_OBJS2[1]['id']
    assert len(resp) == 2

    resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[1]])
    assert resp[0]['id'] == STIX_OBJS2[0]['id']
    assert len(resp) == 1

    resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[2]])
    assert resp[0]['id'] == STIX_OBJS2[0]['id']
    assert len(resp) == 3

    resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[3]])
    assert resp[0]['id'] == STIX_OBJS2[1]['id']
    assert len(resp) == 2

    with pytest.raises(ValueError) as excinfo:
        ds.apply_common_filters(STIX_OBJS2, [more_filters[4]])
    assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
                                  "for specified field: {1}").format(more_filters[4].op, more_filters[4].field)

    resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[5]])
    assert resp[0]['id'] == STIX_OBJS2[0]['id']
    assert len(resp) == 1

    with pytest.raises(ValueError) as excinfo:
        ds.apply_common_filters(STIX_OBJS2, [more_filters[6]])
    assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
                                  "for specified field: {1}").format(more_filters[6].op, more_filters[6].field)

    with pytest.raises(ValueError) as excinfo:
        ds.apply_common_filters(STIX_OBJS2, [more_filters[7]])
    assert str(excinfo.value) == ("Error, field: {0} is not supported for "
                                  "filtering on.".format(more_filters[7].field))


def test_deduplicate():
    """deduplicate() collapses STIX_OBJS1 to its unique (id, modified) pairs."""
    ds = DataSource()
    unique = ds.deduplicate(STIX_OBJS1)

    # Only 3 objects are unique
    # 2 id's vary
    # 2 modified times vary for a particular id

    assert len(unique) == 3

    ids = [obj['id'] for obj in unique]
    mods = [obj['modified'] for obj in unique]

    assert "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f" in ids
    assert "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" in ids
    assert "2017-01-27T13:49:53.935Z" in mods
    assert "2017-01-27T13:49:53.936Z" in mods


def test_add_remove_composite_datasource():
    """Composite sources keep distinct DataSources only and reject unknown ids."""
    cds = CompositeDataSource()
    ds1 = DataSource()
    ds2 = DataSource()
    ds3 = DataSink()

    # ds1 is passed twice and ds3 is a sink; only two sources should be kept.
    cds.add_data_source([ds1, ds2, ds1, ds3])

    assert len(cds.get_all_data_sources()) == 2

    cds.remove_data_source([ds1.id_, ds2.id_])

    assert len(cds.get_all_data_sources()) == 0

    with pytest.raises(ValueError):
        cds.remove_data_source([ds3.id_])


def test_composite_datasource_operations():
    """all_versions/get/query on a composite merge results across sources."""
    bundle1 = dict(id="bundle--%s" % make_id(),
                   objects=STIX_OBJS1,
                   spec_version="2.0",
                   type="bundle")
    cds = CompositeDataSource()
    ds1 = MemorySource(stix_data=bundle1)
    ds2 = MemorySource(stix_data=STIX_OBJS2)

    cds.add_data_source([ds1, ds2])

    indicators = cds.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")

    # In STIX_OBJS2 changed the 'modified' property to a later time...
    assert len(indicators) == 2

    indicator = cds.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")

    assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f"
    assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
    assert indicator["type"] == "indicator"

    query = [
        Filter("type", "=", "indicator")
    ]

    results = cds.query(query)

    # STIX_OBJS2 has indicator with later time, one with different id, one with
    # original time in STIX_OBJS1
    assert len(results) == 3

# def test_data_source_file():
#     ds = file.FileDataSource()
#
#     assert ds.name == "DataSource"
#
#
# def test_data_source_name():
#     ds = file.FileDataSource(name="My File Data Source")
#
#     assert ds.name == "My File Data Source"
#
#
# def test_data_source_get():
#     ds = file.FileDataSource(name="My File Data Source")
#
#     with pytest.raises(NotImplementedError):
#         ds.get("foo")
#
# #filter testing
# def test_add_filter():
#     ds = file.FileDataSource()