tests for TAXII data source; some bug fixes

stix2.1
= 2017-05-30 16:56:27 -04:00
parent fc1ce6d56d
commit c65055bb0e
3 changed files with 203 additions and 33 deletions

View File

@ -253,20 +253,23 @@ class CompositeDataSource(object):
errors.append("Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.")
break
# check filter field is a supported STIX 2.0 common field
if filter_['field'] not in STIX_COMMON_FIELDS:
allowed = False
errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported")
if allowed:
#no need for further checks if filter is missing parameters
# check filter field is a supported STIX 2.0 common field
if filter_['field'] not in STIX_COMMON_FIELDS:
allowed = False
errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported")
# check filter operator is supported
if filter_['op'] not in FILTER_OPS:
allowed = False
errors.append("Filter operation(from 'op' field) not supported")
# check filter operator is supported
if filter_['op'] not in FILTER_OPS:
allowed = False
errors.append("Filter operation(from 'op' field) not supported")
# check filter value type is supported
if type(filter_['value']) not in FILTER_VALUE_TYPES:
allowed = False
errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary")
# check filter value type is supported
if type(filter_['value']) not in FILTER_VALUE_TYPES:
allowed = False
errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary")
'''
Filter is added regardless of whether it fits requirements
@ -292,6 +295,7 @@ class CompositeDataSource(object):
status.append({
"status": "added but is not a common filter",
"filter": filter_,
"errors": errors,
"data_source_name": self.name,
"data_source_id": self.id_
})
@ -485,21 +489,23 @@ class DataSource(object):
allowed = False
errors.append("Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.")
break
if allowed:
#no reason for further checks if missing filter parameters
# check filter field is a supported STIX 2.0 common field
if filter_['field'] not in STIX_COMMON_FIELDS:
allowed = False
errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported")
# check filter field is a supported STIX 2.0 common field
if filter_['field'] not in STIX_COMMON_FIELDS:
allowed = False
errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported")
# check filter operator is supported
if filter_['op'] not in FILTER_OPS:
allowed = False
errors.append("Filter operation(from 'op' field) not supported")
# check filter operator is supported
if filter_['op'] not in FILTER_OPS:
allowed = False
errors.append("Filter operation(from 'op' field) not supported")
# check filter value type is supported
if type(filter_['value']) not in FILTER_VALUE_TYPES:
allowed = False
errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary")
# check filter value type is supported
if type(filter_['value']) not in FILTER_VALUE_TYPES:
allowed = False
errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary")
'''
Filter is added regardless of whether it fits requirements

View File

@ -10,7 +10,9 @@ TODO:
that are found in "self.taxii_info"
'''
TAXII_FILTERS = ['added_after', 'match[id]', 'match[type]', 'match[version]']
TAXII_FILTERS = ['added_after', 'id', 'type', 'version']
test = True
class TAXIIDataSource(DataSource):
@ -20,6 +22,11 @@ class TAXIIDataSource(DataSource):
super(TAXIIDataSource, self).__init__(name=name)
if not api_root:
api_root = "http://localhost:5000"
if not auth:
auth = {"user":"admin", "pass":"taxii"}
self.taxii_info = {
"api_root": {
"url": api_root
@ -27,6 +34,9 @@ class TAXIIDataSource(DataSource):
"auth": auth
}
if test:
return
try:
# check api-root is reachable/exists and grab api collections
coll_url = self.taxii_info['api_root']['url'] + "/collections/"
@ -230,7 +240,11 @@ class TAXIIDataSource(DataSource):
for q in query:
if q['field'] in TAXII_FILTERS:
params[q['field']] = q['value']
if q['field'] == 'added_after':
params[q['field']] = q['value']
else:
taxii_field = 'match[' + q['field'] + ']'
params[taxii_field] = q['value']
return params
def close(self):

View File

@ -1,22 +1,172 @@
import pytest
import requests
import stix2.sources
from stix2.sources import taxii
def test_data_source():
ds = stix2.sources.DataSource()
#Data Source (common API)
#TAXII
def test_ds_taxii():
ds = taxii.TAXIIDataSource()
assert ds.name == 'TAXII'
def test_ds_taxii_name():
ds = taxii.TAXIIDataSource(name='My Data Source Name')
assert ds.name == "My Data Source Name"
def test_ds_params():
url = "http://taxii_url.com:5000"
creds = {"username":"Wade", "password":"Wilson"}
ds = taxii.TAXIIDataSource(api_root=url,
auth= creds)
assert ds.taxii_info['api_root']['url'] == url
assert ds.taxii_info['auth'] == creds
def test_parse_taxii_filters():
query = [
{
"field":"added_after",
"op":"=",
"value":"2016-02-01T00:00:01.000Z"
},
{
"field":"id",
"op":"=",
"value":"taxii stix object ID"
},
{
"field":"type",
"op":"=",
"value":"taxii stix object ID"
},
{
"field":"version",
"op":"=",
"value":"first"
},
{
"field":"created_by_ref",
"op":"=",
"value":"Bane"
}
]
expected_params = {
"added_after":"2016-02-01T00:00:01.000Z",
"match[id]":"taxii stix object ID",
"match[type]":"taxii stix object ID",
"match[version]":"first"
}
ds = taxii.TAXIIDataSource()
taxii_filters = ds._parse_taxii_filters(query)
assert taxii_filters == expected_params
def test_add_get_filter():
class dummy(object):
x = 4
obj_1 = dummy()
#First 3 filters are valid, remaining fields are erroneous in some way
filters = [
{
"field": "type",
"op": '=',
"value":"malware"
},
{
"field":"id",
"op":"!=",
"value":"stix object id"
},
{
"field":"labels",
"op":"in",
"value":["heartbleed","malicious-activity"]
},
{
"field":"revoked",
"value":"filter missing \'op\' field"
},
{
"field":"granular_markings",
"op":"=",
"value":"not supported field - just place holder"
},
{
"field":"modified",
"op":"*",
"value":"not supported operator - just place holder"
},
{
"field":"created",
"op":"=",
"value":obj_1
}
]
expected_errors =[
"Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.",
"Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported",
"Filter operation(from 'op' field) not supported",
"Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary"
]
ds = taxii.TAXIIDataSource()
ids, statuses = ds.add_filter(filters)
#7 filters should have been successfully added
assert len(ids) == 7
#all filters added to data source
for idx, status in enumerate(statuses):
assert status['filter'] == filters[idx]
#proper status warnings were triggered
assert statuses[3]['errors'][0]== expected_errors[0]
assert statuses[4]['errors'][0]== expected_errors[1]
assert statuses[5]['errors'][0]== expected_errors[2]
assert statuses[6]['errors'][0]== expected_errors[3]
#File
'''
def test_data_source_file():
ds = file.FileDataSource()
assert ds.name == "DataSource"
def test_set_data_source_name():
ds = stix2.sources.DataSource(name="My Data Source")
def test_data_source_name():
ds = file.FileDataSource(name="My File Data Source")
assert ds.name == "My Data Source"
assert ds.name == "My File Data Source"
def test_data_source_get():
ds = stix2.sources.DataSource(name="My Data Source")
ds = file.FileDataSource(name="My File Data Source")
with pytest.raises(NotImplementedError):
ds.get("foo")
#filter testing
def test_add_filter():
ds = file.FileDataSource()
'''