Style/lint fixes

stix2.1
Greg Back 2017-05-31 08:58:14 -05:00
parent 6c3a689f02
commit 53ddf32e8c
1 changed files with 69 additions and 82 deletions

View File

@ -1,63 +1,58 @@
import pytest
import requests
from stix2.sources import taxii from stix2.sources import taxii
#Data Source (common API)
#TAXII
def test_ds_taxii(): def test_ds_taxii():
ds = taxii.TAXIIDataSource() ds = taxii.TAXIIDataSource()
assert ds.name == 'TAXII' assert ds.name == 'TAXII'
def test_ds_taxii_name(): def test_ds_taxii_name():
ds = taxii.TAXIIDataSource(name='My Data Source Name') ds = taxii.TAXIIDataSource(name='My Data Source Name')
assert ds.name == "My Data Source Name" assert ds.name == "My Data Source Name"
def test_ds_params(): def test_ds_params():
url = "http://taxii_url.com:5000" url = "http://taxii_url.com:5000"
creds = {"username":"Wade", "password":"Wilson"} creds = {"username": "Wade", "password": "Wilson"}
ds = taxii.TAXIIDataSource(api_root=url, ds = taxii.TAXIIDataSource(api_root=url, auth=creds)
auth= creds)
assert ds.taxii_info['api_root']['url'] == url assert ds.taxii_info['api_root']['url'] == url
assert ds.taxii_info['auth'] == creds assert ds.taxii_info['auth'] == creds
def test_parse_taxii_filters(): def test_parse_taxii_filters():
query = [ query = [
{ {
"field":"added_after", "field": "added_after",
"op":"=", "op": "=",
"value":"2016-02-01T00:00:01.000Z" "value": "2016-02-01T00:00:01.000Z"
}, },
{ {
"field":"id", "field": "id",
"op":"=", "op": "=",
"value":"taxii stix object ID" "value": "taxii stix object ID"
}, },
{ {
"field":"type", "field": "type",
"op":"=", "op": "=",
"value":"taxii stix object ID" "value": "taxii stix object ID"
}, },
{ {
"field":"version", "field": "version",
"op":"=", "op": "=",
"value":"first" "value": "first"
}, },
{ {
"field":"created_by_ref", "field": "created_by_ref",
"op":"=", "op": "=",
"value":"Bane" "value": "Bane"
} }
] ]
expected_params = { expected_params = {
"added_after":"2016-02-01T00:00:01.000Z", "added_after": "2016-02-01T00:00:01.000Z",
"match[id]":"taxii stix object ID", "match[id]": "taxii stix object ID",
"match[type]":"taxii stix object ID", "match[type]": "taxii stix object ID",
"match[version]":"first" "match[version]": "first"
} }
ds = taxii.TAXIIDataSource() ds = taxii.TAXIIDataSource()
@ -73,45 +68,45 @@ def test_add_get_remove_filter():
obj_1 = dummy() obj_1 = dummy()
#First 3 filters are valid, remaining fields are erroneous in some way # First 3 filters are valid, remaining fields are erroneous in some way
filters = [ filters = [
{ {
"field": "type", "field": "type",
"op": '=', "op": '=',
"value":"malware" "value": "malware"
}, },
{ {
"field":"id", "field": "id",
"op":"!=", "op": "!=",
"value":"stix object id" "value": "stix object id"
}, },
{ {
"field":"labels", "field": "labels",
"op":"in", "op": "in",
"value":["heartbleed","malicious-activity"] "value": ["heartbleed", "malicious-activity"]
}, },
{ {
"field":"revoked", "field": "revoked",
"value":"filter missing \'op\' field" "value": "filter missing \'op\' field"
}, },
{ {
"field":"granular_markings", "field": "granular_markings",
"op":"=", "op": "=",
"value":"not supported field - just place holder" "value": "not supported field - just place holder"
}, },
{ {
"field":"modified", "field": "modified",
"op":"*", "op": "*",
"value":"not supported operator - just place holder" "value": "not supported operator - just place holder"
}, },
{ {
"field":"created", "field": "created",
"op":"=", "op": "=",
"value":obj_1 "value": obj_1
} }
] ]
expected_errors =[ expected_errors = [
"Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.", "Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.",
"Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported", "Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported",
"Filter operation(from 'op' field) not supported", "Filter operation(from 'op' field) not supported",
@ -119,49 +114,41 @@ def test_add_get_remove_filter():
] ]
ds = taxii.TAXIIDataSource() ds = taxii.TAXIIDataSource()
#add # add
ids, statuses = ds.add_filter(filters) ids, statuses = ds.add_filter(filters)
#7 filters should have been successfully added # 7 filters should have been successfully added
assert len(ids) == 7 assert len(ids) == 7
#all filters added to data source # all filters added to data source
for idx, status in enumerate(statuses): for idx, status in enumerate(statuses):
assert status['filter'] == filters[idx] assert status['filter'] == filters[idx]
#proper status warnings were triggered # proper status warnings were triggered
assert statuses[3]['errors'][0] == expected_errors[0] assert statuses[3]['errors'][0] == expected_errors[0]
assert statuses[4]['errors'][0] == expected_errors[1] assert statuses[4]['errors'][0] == expected_errors[1]
assert statuses[5]['errors'][0] == expected_errors[2] assert statuses[5]['errors'][0] == expected_errors[2]
assert statuses[6]['errors'][0] == expected_errors[3] assert statuses[6]['errors'][0] == expected_errors[3]
# def test_data_source_file():
#File # ds = file.FileDataSource()
#
''' # assert ds.name == "DataSource"
#
def test_data_source_file(): #
ds = file.FileDataSource() # def test_data_source_name():
# ds = file.FileDataSource(name="My File Data Source")
assert ds.name == "DataSource" #
# assert ds.name == "My File Data Source"
#
def test_data_source_name(): #
ds = file.FileDataSource(name="My File Data Source") # def test_data_source_get():
# ds = file.FileDataSource(name="My File Data Source")
assert ds.name == "My File Data Source" #
# with pytest.raises(NotImplementedError):
# ds.get("foo")
def test_data_source_get(): #
ds = file.FileDataSource(name="My File Data Source") # #filter testing
# def test_add_filter():
with pytest.raises(NotImplementedError): # ds = file.FileDataSource()
ds.get("foo")
#filter testing
def test_add_filter():
ds = file.FileDataSource()
'''