From c65055bb0e0b406b16a16bbea40d618f2a117b56 Mon Sep 17 00:00:00 2001 From: = Date: Tue, 30 May 2017 16:56:27 -0400 Subject: [PATCH] tests for TAXII data source; some bug fixes --- stix2/sources/__init__.py | 54 ++++++----- stix2/sources/taxii.py | 18 +++- stix2/test/test_data_sources.py | 164 ++++++++++++++++++++++++++++++-- 3 files changed, 203 insertions(+), 33 deletions(-) diff --git a/stix2/sources/__init__.py b/stix2/sources/__init__.py index 4c6a36e..a668830 100644 --- a/stix2/sources/__init__.py +++ b/stix2/sources/__init__.py @@ -253,20 +253,23 @@ class CompositeDataSource(object): errors.append("Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.") break - # check filter field is a supported STIX 2.0 common field - if filter_['field'] not in STIX_COMMON_FIELDS: - allowed = False - errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported") + if allowed: + #no need for further checks if filter is missing parameters + + # check filter field is a supported STIX 2.0 common field + if filter_['field'] not in STIX_COMMON_FIELDS: + allowed = False + errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported") - # check filter operator is supported - if filter_['op'] not in FILTER_OPS: - allowed = False - errors.append("Filter operation(from 'op' field) not supported") + # check filter operator is supported + if filter_['op'] not in FILTER_OPS: + allowed = False + errors.append("Filter operation(from 'op' field) not supported") - # check filter value type is supported - if type(filter_['value']) not in FILTER_VALUE_TYPES: - allowed = False - errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary") + # check filter value type is supported + if type(filter_['value']) not in FILTER_VALUE_TYPES: + allowed = False + errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary") ''' Filter is added regardless of whether it fits requirements @@ -292,6 +295,7 @@ class CompositeDataSource(object): status.append({ "status": "added but is not a common filter", "filter": filter_, + "errors": errors, "data_source_name": self.name, "data_source_id": self.id_ }) @@ -485,21 +489,23 @@ class DataSource(object): allowed = False errors.append("Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.") break + if allowed: + #no reason for further checks if missing filter parameters - # check filter field is a supported STIX 2.0 common field - if filter_['field'] not in STIX_COMMON_FIELDS: - allowed = False - errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported") + # check filter field is a supported STIX 2.0 common field + if filter_['field'] not in STIX_COMMON_FIELDS: + allowed = False + errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported") - # check filter operator is supported - if filter_['op'] not in FILTER_OPS: - allowed = False - errors.append("Filter operation(from 'op' field) not supported") + # check filter operator is supported + if filter_['op'] not in FILTER_OPS: + allowed = False + errors.append("Filter operation(from 'op' field) not supported") - # check filter value type is supported - if type(filter_['value']) not in FILTER_VALUE_TYPES: - allowed = False - errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary") + # check filter value type is supported + if type(filter_['value']) not in FILTER_VALUE_TYPES: + allowed = False + errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary") ''' Filter is added regardless of whether it fits requirements diff --git a/stix2/sources/taxii.py b/stix2/sources/taxii.py index 805c5ae..7cb0398 100644 --- a/stix2/sources/taxii.py +++ b/stix2/sources/taxii.py @@ -10,7 +10,9 @@ TODO: that are found in "self.taxii_info" ''' -TAXII_FILTERS = ['added_after', 'match[id]', 'match[type]', 'match[version]'] +TAXII_FILTERS = ['added_after', 'id', 'type', 'version'] + +test = True class TAXIIDataSource(DataSource): @@ -20,6 +22,11 @@ class TAXIIDataSource(DataSource): super(TAXIIDataSource, self).__init__(name=name) + if not api_root: + api_root = "http://localhost:5000" + if not auth: + auth = {"user":"admin", "pass":"taxii"} + self.taxii_info = { "api_root": { "url": api_root @@ -27,6 +34,9 @@ class TAXIIDataSource(DataSource): "auth": auth } + if test: + return + try: # check api-root is reachable/exists and grab api collections coll_url = self.taxii_info['api_root']['url'] + "/collections/" @@ -230,7 +240,11 @@ class TAXIIDataSource(DataSource): for q in query: if q['field'] in TAXII_FILTERS: - params[q['field']] = q['value'] + if q['field'] == 'added_after': + params[q['field']] = q['value'] + else: + taxii_field = 'match[' + q['field'] + ']' + params[taxii_field] = q['value'] return params def close(self): diff --git a/stix2/test/test_data_sources.py b/stix2/test/test_data_sources.py index ab95d20..373c6bb 100644 --- a/stix2/test/test_data_sources.py +++ b/stix2/test/test_data_sources.py @@ -1,22 +1,172 @@ import pytest +import requests -import stix2.sources +from stix2.sources import taxii -def test_data_source(): - ds = stix2.sources.DataSource() +#Data Source (common API) + + + + + +#TAXII +def test_ds_taxii(): + ds = taxii.TAXIIDataSource() + assert ds.name == 'TAXII' + +def test_ds_taxii_name(): + ds = taxii.TAXIIDataSource(name='My Data Source Name') + assert ds.name == "My Data Source Name" + +def test_ds_params(): + url = "http://taxii_url.com:5000" + creds = {"username":"Wade", "password":"Wilson"} + ds = taxii.TAXIIDataSource(api_root=url, + auth= creds) + assert ds.taxii_info['api_root']['url'] == url + assert ds.taxii_info['auth'] == creds + +def test_parse_taxii_filters(): + query = [ + { + "field":"added_after", + "op":"=", + "value":"2016-02-01T00:00:01.000Z" + }, + { + "field":"id", + "op":"=", + "value":"taxii stix object ID" + }, + { + "field":"type", + "op":"=", + "value":"taxii stix object ID" + }, + { + "field":"version", + "op":"=", + "value":"first" + }, + { + "field":"created_by_ref", + "op":"=", + "value":"Bane" + } + ] + + expected_params = { + "added_after":"2016-02-01T00:00:01.000Z", + "match[id]":"taxii stix object ID", + "match[type]":"taxii stix object ID", + "match[version]":"first" + } + + ds = taxii.TAXIIDataSource() + + taxii_filters = ds._parse_taxii_filters(query) + + assert taxii_filters == expected_params + + +def test_add_get_filter(): + class dummy(object): + x = 4 + + obj_1 = dummy() + + #First 3 filters are valid, remaining fields are erroneous in some way + filters = [ + { + "field": "type", + "op": '=', + "value":"malware" + }, + { + "field":"id", + "op":"!=", + "value":"stix object id" + }, + { + "field":"labels", + "op":"in", + "value":["heartbleed","malicious-activity"] + }, + { + "field":"revoked", + "value":"filter missing \'op\' field" + }, + { + "field":"granular_markings", + "op":"=", + "value":"not supported field - just place holder" + }, + { + "field":"modified", + "op":"*", + "value":"not supported operator - just place holder" + }, + { + "field":"created", + "op":"=", + "value":obj_1 + } + ] + + expected_errors =[ + "Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.", + "Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported", + "Filter operation(from 'op' field) not supported", + "Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary" + ] + + ds = taxii.TAXIIDataSource() + ids, statuses = ds.add_filter(filters) + + #7 filters should have been successfully added + assert len(ids) == 7 + + #all filters added to data source + for idx, status in enumerate(statuses): + assert status['filter'] == filters[idx] + + #proper status warnings were triggered + assert statuses[3]['errors'][0]== expected_errors[0] + assert statuses[4]['errors'][0]== expected_errors[1] + assert statuses[5]['errors'][0]== expected_errors[2] + assert statuses[6]['errors'][0]== expected_errors[3] + + + + + + +#File + +''' + +def test_data_source_file(): + ds = file.FileDataSource() assert ds.name == "DataSource" -def test_set_data_source_name(): - ds = stix2.sources.DataSource(name="My Data Source") +def test_data_source_name(): + ds = file.FileDataSource(name="My File Data Source") - assert ds.name == "My Data Source" + assert ds.name == "My File Data Source" def test_data_source_get(): - ds = stix2.sources.DataSource(name="My Data Source") + ds = file.FileDataSource(name="My File Data Source") with pytest.raises(NotImplementedError): ds.get("foo") + +#filter testing +def test_add_filter(): + ds = file.FileDataSource() + + +''' \ No newline at end of file