From e4a226cae6ae3a5e6c1a507942f7bd8e4c52fc16 Mon Sep 17 00:00:00 2001 From: Emmanuelle Vargas-Gonzalez Date: Mon, 23 Apr 2018 14:14:16 -0400 Subject: [PATCH] Finish merging all the changes from @mbastian1135 --- stix2/test/test_datastore.py | 7 + stix2/test/test_datastore_filters.py | 376 ++++++++++++++++++++------- stix2/test/test_datastore_memory.py | 2 +- 3 files changed, 288 insertions(+), 97 deletions(-) diff --git a/stix2/test/test_datastore.py b/stix2/test/test_datastore.py index 0121a85..323365a 100644 --- a/stix2/test/test_datastore.py +++ b/stix2/test/test_datastore.py @@ -108,3 +108,10 @@ def test_composite_datastore_add_data_sources_raises_error(): ind = "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" CompositeDataSource().add_data_sources(ind) assert "DataSource (to be added) is not of type stix2.DataSource. DataSource type is '{}'".format(type(ind)) == str(excinfo.value) + + +def test_composite_datastore_no_datasource(): + cds = CompositeDataSource() + with pytest.raises(AttributeError) as excinfo: + cds.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") + assert 'CompositeDataSource has no data source' in str(excinfo.value) diff --git a/stix2/test/test_datastore_filters.py b/stix2/test/test_datastore_filters.py index 32e91ed..8803e4b 100644 --- a/stix2/test/test_datastore_filters.py +++ b/stix2/test/test_datastore_filters.py @@ -1,6 +1,112 @@ import pytest +from stix2 import parse from stix2.datastore.filters import Filter, apply_common_filters +from stix2.utils import STIXdatetime, parse_into_datetime + + +stix_objs = [ + { + "created": "2017-01-27T13:49:53.997Z", + "description": "\n\nTITLE:\n\tPoison Ivy", + "id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111", + "labels": [ + "remote-access-trojan" + ], + "modified": "2017-01-27T13:49:53.997Z", + "name": "Poison Ivy", + "type": "malware" + }, + { + "created": "2014-05-08T09:00:00.000Z", + "id": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade", + "labels": [ + "file-hash-watchlist" + ], + "modified": "2014-05-08T09:00:00.000Z", + "name": "File hash for Poison Ivy variant", + "pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']", + "type": "indicator", + "valid_from": "2014-05-08T09:00:00.000000Z" + }, + { + "created": "2014-05-08T09:00:00.000Z", + "granular_markings": [ + { + "marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed", + "selectors": [ + "relationship_type" + ] + } + ], + "id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463", + "modified": "2014-05-08T09:00:00.000Z", + "object_marking_refs": [ + "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9" + ], + "relationship_type": "indicates", + "revoked": True, + "source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade", + "target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111", + "type": "relationship" + }, + { + "id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef", + "created": "2016-02-14T00:00:00.000Z", + "created_by_ref": "identity--00000000-0000-0000-0000-b8e91df99dc9", + "modified": "2016-02-14T00:00:00.000Z", + "type": "vulnerability", + "name": "CVE-2014-0160", + "description": "The (1) TLS...", + "external_references": [ + { + "source_name": "cve", + "external_id": "CVE-2014-0160" + } + ], + "labels": ["heartbleed", "has-logo"] + }, + { + "type": "observed-data", + "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", + "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + "created": "2016-04-06T19:58:16.000Z", + "modified": "2016-04-06T19:58:16.000Z", + "first_observed": "2015-12-21T19:00:00Z", + "last_observed": "2015-12-21T19:00:00Z", + "number_observed": 1, + "objects": { + "0": { + "type": "file", + "name": "HAL 9000.exe" + } + } + + } +] + + +filters = [ + Filter("type", "!=", "relationship"), + Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"), + Filter("labels", "in", "remote-access-trojan"), + Filter("created", ">", "2015-01-01T01:00:00.000Z"), + Filter("revoked", "=", True), + Filter("revoked", "!=", True), + Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"), + Filter("granular_markings.selectors", "in", "relationship_type"), + Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"), + Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"), + Filter("created_by_ref", "=", "identity--00000000-0000-0000-0000-b8e91df99dc9"), + Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"), + Filter("granular_markings.selectors", "in", "description"), + Filter("external_references.source_name", "=", "CVE"), + Filter("objects", "=", {"0": {"type": "file", "name": "HAL 9000.exe"}}) +] + +# same as above objects but converted to real Python STIX2 objects +# to test filters against true Python STIX2 objects +real_stix_objs = [parse(stix_obj) for stix_obj in stix_objs] def test_filter_ops_check(): @@ -43,181 +149,243 @@ def test_filter_type_underscore_check(): assert "Filter for property 'type' cannot have its value 'oh_underscore'" in str(excinfo.value) -def test_apply_common_filters(): - stix_objs = [ - { - "created": "2017-01-27T13:49:53.997Z", - "description": "\n\nTITLE:\n\tPoison Ivy", - "id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111", - "labels": [ - "remote-access-trojan" - ], - "modified": "2017-01-27T13:49:53.997Z", - "name": "Poison Ivy", - "type": "malware" - }, - { - "created": "2014-05-08T09:00:00.000Z", - "id": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade", - "labels": [ - "file-hash-watchlist" - ], - "modified": "2014-05-08T09:00:00.000Z", - "name": "File hash for Poison Ivy variant", - "pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']", - "type": "indicator", - "valid_from": "2014-05-08T09:00:00.000000Z" - }, - { - "created": "2014-05-08T09:00:00.000Z", - "granular_markings": [ - { - "marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed", - "selectors": [ - "relationship_type" - ] - } - ], - "id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463", - "modified": "2014-05-08T09:00:00.000Z", - "object_marking_refs": [ - "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9" - ], - "relationship_type": "indicates", - "revoked": True, - "source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade", - "target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111", - "type": "relationship" - }, - { - "id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef", - "created": "2016-02-14T00:00:00.000Z", - "created_by_ref": "identity--00000000-0000-0000-0000-b8e91df99dc9", - "modified": "2016-02-14T00:00:00.000Z", - "type": "vulnerability", - "name": "CVE-2014-0160", - "description": "The (1) TLS...", - "external_references": [ - { - "source_name": "cve", - "external_id": "CVE-2014-0160" - } - ], - "labels": ["heartbleed", "has-logo"] - } - ] - - filters = [ - Filter("type", "!=", "relationship"), - Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"), - Filter("labels", "in", "remote-access-trojan"), - Filter("created", ">", "2015-01-01T01:00:00.000Z"), - Filter("revoked", "=", True), - Filter("revoked", "!=", True), - Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"), - Filter("granular_markings.selectors", "in", "relationship_type"), - Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"), - Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"), - Filter("created_by_ref", "=", "identity--00000000-0000-0000-0000-b8e91df99dc9"), - Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"), - Filter("granular_markings.selectors", "in", "description"), - Filter("external_references.source_name", "=", "CVE"), - ] - +def test_apply_common_filters0(): # "Return any object whose type is not relationship" resp = list(apply_common_filters(stix_objs, [filters[0]])) ids = [r['id'] for r in resp] assert stix_objs[0]['id'] in ids assert stix_objs[1]['id'] in ids assert stix_objs[3]['id'] in ids - assert len(ids) == 3 + assert len(ids) == 4 + resp = list(apply_common_filters(real_stix_objs, [filters[0]])) + ids = [r.id for r in resp] + assert real_stix_objs[0].id in ids + assert real_stix_objs[1].id in ids + assert real_stix_objs[3].id in ids + assert len(ids) == 4 + + +def test_apply_common_filters1(): # "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463" resp = list(apply_common_filters(stix_objs, [filters[1]])) assert resp[0]['id'] == stix_objs[2]['id'] assert len(resp) == 1 + resp = list(apply_common_filters(real_stix_objs, [filters[1]])) + assert resp[0].id == real_stix_objs[2].id + assert len(resp) == 1 + + +def test_apply_common_filters2(): # "Return any object that contains remote-access-trojan in labels" resp = list(apply_common_filters(stix_objs, [filters[2]])) assert resp[0]['id'] == stix_objs[0]['id'] assert len(resp) == 1 + resp = list(apply_common_filters(real_stix_objs, [filters[2]])) + assert resp[0].id == real_stix_objs[0].id + assert len(resp) == 1 + + +def test_apply_common_filters3(): # "Return any object created after 2015-01-01T01:00:00.000Z" resp = list(apply_common_filters(stix_objs, [filters[3]])) assert resp[0]['id'] == stix_objs[0]['id'] - assert len(resp) == 2 + assert len(resp) == 3 + resp = list(apply_common_filters(real_stix_objs, [filters[3]])) + assert resp[0].id == real_stix_objs[0].id + assert len(resp) == 3 + + +def test_apply_common_filters4(): # "Return any revoked object" resp = list(apply_common_filters(stix_objs, [filters[4]])) assert resp[0]['id'] == stix_objs[2]['id'] assert len(resp) == 1 + resp = list(apply_common_filters(real_stix_objs, [filters[4]])) + assert resp[0].id == real_stix_objs[2].id + assert len(resp) == 1 + + +def test_apply_common_filters5(): # "Return any object whose not revoked" - # Note that if 'revoked' property is not present in object. - # Currently we can't use such an expression to filter for... :( resp = list(apply_common_filters(stix_objs, [filters[5]])) assert len(resp) == 0 + resp = list(apply_common_filters(real_stix_objs, [filters[5]])) + assert len(resp) == 4 + + +def test_apply_common_filters6(): # "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs" resp = list(apply_common_filters(stix_objs, [filters[6]])) assert resp[0]['id'] == stix_objs[2]['id'] assert len(resp) == 1 + resp = list(apply_common_filters(real_stix_objs, [filters[6]])) + assert resp[0].id == real_stix_objs[2].id + assert len(resp) == 1 + + +def test_apply_common_filters7(): # "Return any object that contains relationship_type in their selectors AND # also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref" resp = list(apply_common_filters(stix_objs, [filters[7], filters[8]])) assert resp[0]['id'] == stix_objs[2]['id'] assert len(resp) == 1 + resp = list(apply_common_filters(real_stix_objs, [filters[7], filters[8]])) + assert resp[0].id == real_stix_objs[2].id + assert len(resp) == 1 + + +def test_apply_common_filters8(): # "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id" resp = list(apply_common_filters(stix_objs, [filters[9]])) assert resp[0]['id'] == stix_objs[3]['id'] assert len(resp) == 1 + resp = list(apply_common_filters(real_stix_objs, [filters[9]])) + assert resp[0].id == real_stix_objs[3].id + assert len(resp) == 1 + + +def test_apply_common_filters9(): # "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9" resp = list(apply_common_filters(stix_objs, [filters[10]])) assert len(resp) == 1 + resp = list(apply_common_filters(real_stix_objs, [filters[10]])) + assert len(resp) == 1 + + +def test_apply_common_filters10(): # "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None) resp = list(apply_common_filters(stix_objs, [filters[11]])) assert len(resp) == 0 + resp = list(apply_common_filters(real_stix_objs, [filters[11]])) + assert len(resp) == 0 + + +def test_apply_common_filters11(): # "Return any object that contains description in its selectors" (None) resp = list(apply_common_filters(stix_objs, [filters[12]])) assert len(resp) == 0 - # "Return any object that object that matches CVE in source_name" (None, case sensitive) - resp = list(apply_common_filters(stix_objs, [filters[13]])) + resp = list(apply_common_filters(real_stix_objs, [filters[12]])) assert len(resp) == 0 -def test_filters0(stix_objs2): +def test_apply_common_filters12(): + # "Return any object that matches CVE in source_name" (None, case sensitive) + resp = list(apply_common_filters(stix_objs, [filters[13]])) + assert len(resp) == 0 + + resp = list(apply_common_filters(real_stix_objs, [filters[13]])) + assert len(resp) == 0 + + +def test_apply_common_filters13(): + # Return any object that matches file object in "objects" + resp = list(apply_common_filters(stix_objs, [filters[14]])) + assert resp[0]["id"] == stix_objs[4]["id"] + assert len(resp) == 1 + # important additional check to make sure original File dict was + # not converted to File object. (this was a deep bug found) + assert isinstance(resp[0]["objects"]["0"], dict) + + resp = list(apply_common_filters(real_stix_objs, [filters[14]])) + assert resp[0].id == real_stix_objs[4].id + assert len(resp) == 1 + + +def test_datetime_filter_behavior(): + """if a filter is initialized with its value being a datetime object + OR the STIX object property being filtered on is a datetime object, all + resulting comparisons executed are done on the string representations + of the datetime objects, as the Filter functionality will convert + all datetime objects to there string forms using format_datetim() + + This test makes sure all datetime comparisons are carried out correctly + """ + filter_with_dt_obj = Filter("created", "=", parse_into_datetime("2016-02-14T00:00:00.000Z", "millisecond")) + filter_with_str = Filter("created", "=", "2016-02-14T00:00:00.000Z") + + # check that filter value is converted from datetime to str + assert isinstance(filter_with_dt_obj.value, str) + + # compare datetime string to filter w/ datetime obj + resp = list(apply_common_filters(stix_objs, [filter_with_dt_obj])) + assert len(resp) == 1 + assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" + + # compare datetime obj to filter w/ datetime obj + resp = list(apply_common_filters(real_stix_objs, [filter_with_dt_obj])) + assert len(resp) == 1 + assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" + assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered + + # compare datetime string to filter w/ str + resp = list(apply_common_filters(stix_objs, [filter_with_str])) + assert len(resp) == 1 + assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" + + # compare datetime obj to filter w/ str + resp = list(apply_common_filters(real_stix_objs, [filter_with_str])) + assert len(resp) == 1 + assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" + assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered + + +def test_filters0(stix_objs2, real_stix_objs2): # "Return any object modified before 2017-01-28T13:49:53.935Z" resp = list(apply_common_filters(stix_objs2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")])) assert resp[0]['id'] == stix_objs2[1]['id'] assert len(resp) == 2 + resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", "<", parse_into_datetime("2017-01-28T13:49:53.935Z"))])) + assert resp[0].id == real_stix_objs2[1].id + assert len(resp) == 2 -def test_filters1(stix_objs2): + +def test_filters1(stix_objs2, real_stix_objs2): # "Return any object modified after 2017-01-28T13:49:53.935Z" resp = list(apply_common_filters(stix_objs2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")])) assert resp[0]['id'] == stix_objs2[0]['id'] assert len(resp) == 1 + resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", ">", parse_into_datetime("2017-01-28T13:49:53.935Z"))])) + assert resp[0].id == real_stix_objs2[0].id + assert len(resp) == 1 -def test_filters2(stix_objs2): + +def test_filters2(stix_objs2, real_stix_objs2): # "Return any object modified after or on 2017-01-28T13:49:53.935Z" resp = list(apply_common_filters(stix_objs2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")])) assert resp[0]['id'] == stix_objs2[0]['id'] assert len(resp) == 3 + resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", ">=", parse_into_datetime("2017-01-27T13:49:53.935Z"))])) + assert resp[0].id == real_stix_objs2[0].id + assert len(resp) == 3 -def test_filters3(stix_objs2): + +def test_filters3(stix_objs2, real_stix_objs2): # "Return any object modified before or on 2017-01-28T13:49:53.935Z" resp = list(apply_common_filters(stix_objs2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")])) assert resp[0]['id'] == stix_objs2[1]['id'] assert len(resp) == 2 + # "Return any object modified before or on 2017-01-28T13:49:53.935Z" + fv = Filter("modified", "<=", parse_into_datetime("2017-01-27T13:49:53.935Z")) + resp = list(apply_common_filters(real_stix_objs2, [fv])) + assert resp[0].id == real_stix_objs2[1].id + assert len(resp) == 2 + def test_filters4(): # Assert invalid Filter cannot be created @@ -227,23 +395,31 @@ def test_filters4(): "for specified property: 'modified'") -def test_filters5(stix_objs2): +def test_filters5(stix_objs2, real_stix_objs2): # "Return any object whose id is not indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f" resp = list(apply_common_filters(stix_objs2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])) assert resp[0]['id'] == stix_objs2[0]['id'] assert len(resp) == 1 + resp = list(apply_common_filters(real_stix_objs2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])) + assert resp[0].id == real_stix_objs2[0].id + assert len(resp) == 1 -def test_filters6(stix_objs2): + +def test_filters6(stix_objs2, real_stix_objs2): # Test filtering on non-common property resp = list(apply_common_filters(stix_objs2, [Filter("name", "=", "Malicious site hosting downloader")])) assert resp[0]['id'] == stix_objs2[0]['id'] assert len(resp) == 3 + resp = list(apply_common_filters(real_stix_objs2, [Filter("name", "=", "Malicious site hosting downloader")])) + assert resp[0].id == real_stix_objs2[0].id + assert len(resp) == 3 -def test_filters7(stix_objs2): + +def test_filters7(stix_objs2, real_stix_objs2): # Test filtering on embedded property - stix_objects = list(stix_objs2) + [{ + obsvd_data_obj = { "type": "observed-data", "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", @@ -274,7 +450,15 @@ def test_filters7(stix_objs2): } } } - }] + } + + stix_objects = list(stix_objs2) + [obsvd_data_obj] + real_stix_objects = list(real_stix_objs2) + [parse(obsvd_data_obj)] + resp = list(apply_common_filters(stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")])) assert resp[0]['id'] == stix_objects[3]['id'] assert len(resp) == 1 + + resp = list(apply_common_filters(real_stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")])) + assert resp[0].id == real_stix_objects[3].id + assert len(resp) == 1 diff --git a/stix2/test/test_datastore_memory.py b/stix2/test/test_datastore_memory.py index d341fd2..dcec813 100644 --- a/stix2/test/test_datastore_memory.py +++ b/stix2/test/test_datastore_memory.py @@ -62,7 +62,7 @@ def test_composite_datasource_operations(stix_objs1, stix_objs2): Filter("valid_from", "=", "2017-01-27T13:49:53.935382Z") ] - cds1.filters.update(query2) + cds1.filters.add(query2) results = cds1.query(query1)