import pytest
from taxii2client import Collection

from stix2.sources import (CompositeDataSource, DataSink, DataSource,
                           DataStore, make_id, taxii)
from stix2.sources.filters import Filter
from stix2.sources.memory import MemorySource, MemoryStore

COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'


class MockTAXIIClient(object):
    """Mock for taxii2_client.TAXIIClient"""
    pass


@pytest.fixture
def collection():
    """Provide a Collection built from COLLECTION_URL and a mock client."""
    return Collection(COLLECTION_URL, MockTAXIIClient())


@pytest.fixture
def ds():
    """Provide a fresh, bare DataSource for each test."""
    return DataSource()


# Indicator fixtures. They differ only in 'id' and 'modified':
#   IND1, IND2             -> id ...d81f86b9..., modified 2017-01-27T13:49:53.935Z
#   IND3                   -> id ...d81f86b9..., modified 2017-01-27T13:49:53.936Z
#   IND4, IND5, IND7, IND8 -> id ...d81f86b8..., modified 2017-01-27T13:49:53.935Z
#   IND6                   -> id ...d81f86b9..., modified 2017-01-31T13:49:53.935Z
IND1 = {
    "created": "2017-01-27T13:49:53.935Z",
    "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
    "labels": [
        "url-watchlist"
    ],
    "modified": "2017-01-27T13:49:53.935Z",
    "name": "Malicious site hosting downloader",
    "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
    "type": "indicator",
    "valid_from": "2017-01-27T13:49:53.935382Z"
}
IND2 = {
    "created": "2017-01-27T13:49:53.935Z",
    "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
    "labels": [
        "url-watchlist"
    ],
    "modified": "2017-01-27T13:49:53.935Z",
    "name": "Malicious site hosting downloader",
    "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
    "type": "indicator",
    "valid_from": "2017-01-27T13:49:53.935382Z"
}
IND3 = {
    "created": "2017-01-27T13:49:53.935Z",
    "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
    "labels": [
        "url-watchlist"
    ],
    "modified": "2017-01-27T13:49:53.936Z",
    "name": "Malicious site hosting downloader",
    "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
    "type": "indicator",
    "valid_from": "2017-01-27T13:49:53.935382Z"
}
IND4 = {
    "created": "2017-01-27T13:49:53.935Z",
    "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
    "labels": [
        "url-watchlist"
    ],
    "modified": "2017-01-27T13:49:53.935Z",
    "name": "Malicious site hosting downloader",
    "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
    "type": "indicator",
    "valid_from": "2017-01-27T13:49:53.935382Z"
}
IND5 = {
    "created": "2017-01-27T13:49:53.935Z",
    "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
    "labels": [
        "url-watchlist"
    ],
    "modified": "2017-01-27T13:49:53.935Z",
    "name": "Malicious site hosting downloader",
    "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
    "type": "indicator",
    "valid_from": "2017-01-27T13:49:53.935382Z"
}
IND6 = {
    "created": "2017-01-27T13:49:53.935Z",
    "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
    "labels": [
        "url-watchlist"
    ],
    "modified": "2017-01-31T13:49:53.935Z",
    "name": "Malicious site hosting downloader",
    "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
    "type": "indicator",
    "valid_from": "2017-01-27T13:49:53.935382Z"
}
IND7 = {
    "created": "2017-01-27T13:49:53.935Z",
    "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
    "labels": [
        "url-watchlist"
    ],
    "modified": "2017-01-27T13:49:53.935Z",
    "name": "Malicious site hosting downloader",
    "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
    "type": "indicator",
    "valid_from": "2017-01-27T13:49:53.935382Z"
}
IND8 = {
    "created": "2017-01-27T13:49:53.935Z",
    "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
    "labels": [
        "url-watchlist"
    ],
    "modified": "2017-01-27T13:49:53.935Z",
    "name": "Malicious site hosting downloader",
    "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
    "type": "indicator",
    "valid_from": "2017-01-27T13:49:53.935382Z"
}

STIX_OBJS2 = [IND6, IND7, IND8]
STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]


def test_ds_abstract_class_smoke():
    """Every DataStore operation on bare base classes raises NotImplementedError."""
    ds1 = DataSource()
    ds2 = DataSink()
    ds3 = DataStore(source=ds1, sink=ds2)

    with pytest.raises(NotImplementedError):
        ds3.add(None)

    with pytest.raises(NotImplementedError):
        ds3.all_versions("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")

    with pytest.raises(NotImplementedError):
        ds3.get("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")

    with pytest.raises(NotImplementedError):
        ds3.query([Filter("id", "=", "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")])


def test_memory_store_smoke():
    """Exercise add/all_versions/get/query on a MemoryStore."""
    # Initialize MemoryStore with dict
    ms = MemoryStore(STIX_OBJS1)

    # Add item to sink
    ms.add(dict(id="bundle--%s" % make_id(),
                objects=STIX_OBJS2,
                spec_version="2.0",
                type="bundle"))

    resp = ms.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
    assert len(resp) == 1

    resp = ms.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
    assert resp["id"] == "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"

    query = [Filter('type', '=', 'malware')]

    resp = ms.query(query)
    assert len(resp) == 0


def test_ds_taxii(collection):
    """A TAXIICollectionSource keeps a reference to the collection it is given."""
    ds = taxii.TAXIICollectionSource(collection)
    assert ds.collection is not None


def test_ds_taxii_name(collection):
    """Same check as test_ds_taxii."""
    # NOTE(review): this body is identical to test_ds_taxii; the intended
    # name-specific assertion is not visible here -- confirm or remove.
    ds = taxii.TAXIICollectionSource(collection)
    assert ds.collection is not None


def test_parse_taxii_filters(collection):
    """Filters are translated to TAXII query parameters.

    The ``created_by_ref`` filter has no counterpart in ``expected_params``,
    so it must be absent from the parsed result.
    """
    query = [
        Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
        Filter("id", "=", "taxii stix object ID"),
        Filter("type", "=", "taxii stix object ID"),
        Filter("version", "=", "first"),
        Filter("created_by_ref", "=", "Bane"),
    ]

    expected_params = {
        "added_after": "2016-02-01T00:00:01.000Z",
        "match[id]": "taxii stix object ID",
        "match[type]": "taxii stix object ID",
        "match[version]": "first"
    }

    # `collection` is requested as a fixture so that a Collection instance
    # (not the module-level fixture function) is handed to the source.
    ds = taxii.TAXIICollectionSource(collection)

    taxii_filters = ds._parse_taxii_filters(query)

    assert taxii_filters == expected_params


def test_add_get_remove_filter(ds):
    """Valid filters are stored once each; invalid ones raise ValueError."""
    # First 3 filters are valid, remaining filters are erroneous in some way
    valid_filters = [
        Filter('type', '=', 'malware'),
        Filter('id', '!=', 'stix object id'),
        Filter('labels', 'in', ["heartbleed", "malicious-activity"]),
    ]
    invalid_filters = [
        Filter('description', '=', 'not supported field - just place holder'),
        Filter('modified', '*', 'not supported operator - just place holder'),
        Filter('created', '=', object()),
    ]

    assert len(ds.filters) == 0

    ds.add_filter(valid_filters[0])
    assert len(ds.filters) == 1

    # Adding the same filter again will have no effect since `filters` uses a set
    ds.add_filter(valid_filters[0])
    assert len(ds.filters) == 1

    ds.add_filter(valid_filters[1])
    assert len(ds.filters) == 2
    ds.add_filter(valid_filters[2])
    assert len(ds.filters) == 3

    # TODO: make better error messages
    with pytest.raises(ValueError) as excinfo:
        ds.add_filter(invalid_filters[0])
    assert str(excinfo.value) == "Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported"

    with pytest.raises(ValueError) as excinfo:
        ds.add_filter(invalid_filters[1])
    assert str(excinfo.value) == "Filter operation (from 'op' field) not supported"

    with pytest.raises(ValueError) as excinfo:
        ds.add_filter(invalid_filters[2])
    assert str(excinfo.value) == "Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary"

    assert set(valid_filters) == ds.filters

    # remove
    ds.filters.remove(valid_filters[0])

    assert len(ds.filters) == 2

    ds.add_filters(valid_filters)


def test_apply_common_filters(ds):
    """Apply each common-property filter to a small mixed set of STIX objects."""
    stix_objs = [
        {
            "created": "2017-01-27T13:49:53.997Z",
            "description": "\n\nTITLE:\n\tPoison Ivy",
            "id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
            "labels": [
                "remote-access-trojan"
            ],
            "modified": "2017-01-27T13:49:53.997Z",
            "name": "Poison Ivy",
            "type": "malware"
        },
        {
            "created": "2014-05-08T09:00:00.000Z",
            "id": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
            "labels": [
                "file-hash-watchlist"
            ],
            "modified": "2014-05-08T09:00:00.000Z",
            "name": "File hash for Poison Ivy variant",
            "pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']",
            "type": "indicator",
            "valid_from": "2014-05-08T09:00:00.000000Z"
        },
        {
            "created": "2014-05-08T09:00:00.000Z",
            "granular_markings": [
                {
                    "marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
                    "selectors": [
                        "relationship_type"
                    ]
                }
            ],
            "id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463",
            "modified": "2014-05-08T09:00:00.000Z",
            "object_marking_refs": [
                "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
            ],
            "relationship_type": "indicates",
            "revoked": True,
            "source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
            "target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
            "type": "relationship"
        },
        {
            "id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef",
            "created": "2016-02-14T00:00:00.000Z",
            "created_by_ref": "identity--00000000-0000-0000-0000-b8e91df99dc9",
            "modified": "2016-02-14T00:00:00.000Z",
            "type": "vulnerability",
            "name": "CVE-2014-0160",
            "description": "The (1) TLS...",
            "external_references": [
                {
                    "source_name": "cve",
                    "external_id": "CVE-2014-0160"
                }
            ],
            "labels": ["heartbleed", "has-logo"]
        }
    ]

    filters = [
        Filter("type", "!=", "relationship"),
        Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
        Filter("labels", "in", "remote-access-trojan"),
        Filter("created", ">", "2015-01-01T01:00:00.000Z"),
        Filter("revoked", "=", True),
        Filter("revoked", "!=", True),
        Filter("revoked", "?", False),
        Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"),
        Filter("granular_markings.selectors", "in", "relationship_type"),
        Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
        Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"),
        Filter("created_by_ref", "=", "identity--00000000-0000-0000-0000-b8e91df99dc9"),
        Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"),
        Filter("granular_markings.selectors", "in", "description"),
        Filter("external_references.source_name", "=", "CVE"),
    ]

    # "Return any object whose type is not relationship"
    resp = ds.apply_common_filters(stix_objs, [filters[0]])
    ids = [r['id'] for r in resp]
    assert stix_objs[0]['id'] in ids
    assert stix_objs[1]['id'] in ids
    assert stix_objs[3]['id'] in ids
    assert len(ids) == 3

    # "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"
    resp = ds.apply_common_filters(stix_objs, [filters[1]])
    assert resp[0]['id'] == stix_objs[2]['id']
    assert len(resp) == 1

    # "Return any object that contains remote-access-trojan in labels"
    resp = ds.apply_common_filters(stix_objs, [filters[2]])
    assert resp[0]['id'] == stix_objs[0]['id']
    assert len(resp) == 1

    # "Return any object created after 2015-01-01T01:00:00.000Z"
    resp = ds.apply_common_filters(stix_objs, [filters[3]])
    assert resp[0]['id'] == stix_objs[0]['id']
    assert len(resp) == 2

    # "Return any revoked object"
    resp = ds.apply_common_filters(stix_objs, [filters[4]])
    assert resp[0]['id'] == stix_objs[2]['id']
    assert len(resp) == 1

    # "Return any object that is not revoked"
    # Note that the 'revoked' property may not be present in an object.
    # Currently we can't use such an expression to filter for... :(
    resp = ds.apply_common_filters(stix_objs, [filters[5]])
    assert len(resp) == 0

    # Assert unknown operator for _boolean() raises exception.
    with pytest.raises(ValueError) as excinfo:
        ds.apply_common_filters(stix_objs, [filters[6]])

    assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
                                  "for specified field: {1}"
                                  .format(filters[6].op, filters[6].field))

    # "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs"
    resp = ds.apply_common_filters(stix_objs, [filters[7]])
    assert resp[0]['id'] == stix_objs[2]['id']
    assert len(resp) == 1

    # "Return any object that contains relationship_type in their selectors AND
    # also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref"
    resp = ds.apply_common_filters(stix_objs, [filters[8], filters[9]])
    assert resp[0]['id'] == stix_objs[2]['id']
    assert len(resp) == 1

    # "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id"
    resp = ds.apply_common_filters(stix_objs, [filters[10]])
    assert resp[0]['id'] == stix_objs[3]['id']
    assert len(resp) == 1

    # "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9"
    resp = ds.apply_common_filters(stix_objs, [filters[11]])
    assert len(resp) == 1

    # "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None)
    resp = ds.apply_common_filters(stix_objs, [filters[12]])
    assert len(resp) == 0

    # "Return any object that contains description in its selectors" (None)
    resp = ds.apply_common_filters(stix_objs, [filters[13]])
    assert len(resp) == 0

    # "Return any object that matches CVE in source_name" (None, case sensitive)
    resp = ds.apply_common_filters(stix_objs, [filters[14]])
    assert len(resp) == 0


def test_filters0(ds):
    """'<' on 'modified' keeps only the objects older than the bound."""
    # "Return any object modified before 2017-01-28T13:49:53.935Z"
    resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")])
    assert resp[0]['id'] == STIX_OBJS2[1]['id']
    assert len(resp) == 2


def test_filters1(ds):
    """'>' on 'modified' keeps only the object newer than the bound."""
    # "Return any object modified after 2017-01-28T13:49:53.935Z"
    resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")])
    assert resp[0]['id'] == STIX_OBJS2[0]['id']
    assert len(resp) == 1


def test_filters2(ds):
    """'>=' on 'modified' includes objects equal to the bound."""
    # "Return any object modified after or on 2017-01-27T13:49:53.935Z"
    resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")])
    assert resp[0]['id'] == STIX_OBJS2[0]['id']
    assert len(resp) == 3


def test_filters3(ds):
    """'<=' on 'modified' includes objects equal to the bound."""
    # "Return any object modified before or on 2017-01-27T13:49:53.935Z"
    resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")])
    assert resp[0]['id'] == STIX_OBJS2[1]['id']
    assert len(resp) == 2


def test_filters4(ds):
    """An unknown operator on 'modified' raises ValueError."""
    fltr4 = Filter("modified", "?", "2017-01-27T13:49:53.935Z")
    # Assert unknown operator for _all() raises exception.
    with pytest.raises(ValueError) as excinfo:
        ds.apply_common_filters(STIX_OBJS2, [fltr4])
    assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
                                  "for specified field: {1}").format(fltr4.op, fltr4.field)


def test_filters5(ds):
    """'!=' on 'id' drops every object carrying the given id."""
    # "Return any object whose id is not indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
    resp = ds.apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])
    assert resp[0]['id'] == STIX_OBJS2[0]['id']
    assert len(resp) == 1


def test_filters6(ds):
    """An unknown operator on 'id' raises ValueError."""
    fltr6 = Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
    # Assert unknown operator for _id() raises exception.
    with pytest.raises(ValueError) as excinfo:
        ds.apply_common_filters(STIX_OBJS2, [fltr6])

    assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
                                  "for specified field: {1}").format(fltr6.op, fltr6.field)


def test_filters7(ds):
    """Filtering on a field that is not a common property raises ValueError."""
    fltr7 = Filter("notacommonproperty", "=", "bar")
    # Assert unknown field raises exception.
    with pytest.raises(ValueError) as excinfo:
        ds.apply_common_filters(STIX_OBJS2, [fltr7])

    assert str(excinfo.value) == ("Error, field: {0} is not supported for "
                                  "filtering on.").format(fltr7.field)


def test_deduplicate(ds):
    """deduplicate() collapses entries sharing both 'id' and 'modified'."""
    unique = ds.deduplicate(STIX_OBJS1)

    # Only 3 objects are unique
    # 2 id's vary
    # 2 modified times vary for a particular id

    assert len(unique) == 3

    ids = [obj['id'] for obj in unique]
    mods = [obj['modified'] for obj in unique]

    assert "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f" in ids
    assert "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" in ids
    assert "2017-01-27T13:49:53.935Z" in mods
    assert "2017-01-27T13:49:53.936Z" in mods


def test_add_remove_composite_datasource():
    """Only distinct DataSource objects are attached; removal is by id."""
    cds = CompositeDataSource()
    ds1 = DataSource()
    ds2 = DataSource()
    ds3 = DataSink()

    # ds1 is listed twice and ds3 is a DataSink; the test expects only the
    # two distinct DataSource objects to end up attached.
    cds.add_data_source([ds1, ds2, ds1, ds3])

    assert len(cds.get_all_data_sources()) == 2

    cds.remove_data_source([ds1.id, ds2.id])

    assert len(cds.get_all_data_sources()) == 0

    with pytest.raises(ValueError):
        cds.remove_data_source([ds3.id])


def test_composite_datasource_operations():
    """all_versions/get/query on a CompositeDataSource over two MemorySources."""
    BUNDLE1 = dict(id="bundle--%s" % make_id(),
                   objects=STIX_OBJS1,
                   spec_version="2.0",
                   type="bundle")
    cds = CompositeDataSource()
    ds1 = MemorySource(stix_data=BUNDLE1)
    ds2 = MemorySource(stix_data=STIX_OBJS2)

    cds.add_data_source([ds1, ds2])

    indicators = cds.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")

    # In STIX_OBJS2 changed the 'modified' property to a later time...
    assert len(indicators) == 2

    indicator = cds.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")

    assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f"
    assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
    assert indicator["type"] == "indicator"

    query = [
        Filter("type", "=", "indicator")
    ]

    results = cds.query(query)

    # STIX_OBJS2 has indicator with later time, one with different id, one with
    # original time in STIX_OBJS1
    assert len(results) == 3