parent
bf9677094f
commit
55e92bc237
|
@ -2,19 +2,19 @@ from stix2.sources import taxii
|
|||
|
||||
|
||||
def test_ds_taxii():
|
||||
ds = taxii.TAXIIDataSource()
|
||||
ds = taxii.TAXIICollectionSource()
|
||||
assert ds.name == 'TAXII'
|
||||
|
||||
|
||||
def test_ds_taxii_name():
|
||||
ds = taxii.TAXIIDataSource(name='My Data Source Name')
|
||||
ds = taxii.TAXIICollectionSource(name='My Data Source Name')
|
||||
assert ds.name == "My Data Source Name"
|
||||
|
||||
|
||||
def test_ds_params():
|
||||
url = "http://taxii_url.com:5000"
|
||||
creds = {"username": "Wade", "password": "Wilson"}
|
||||
ds = taxii.TAXIIDataSource(api_root=url, auth=creds)
|
||||
ds = taxii.TAXIICollectionSource(api_root=url, auth=creds)
|
||||
assert ds.taxii_info['api_root']['url'] == url
|
||||
assert ds.taxii_info['auth'] == creds
|
||||
|
||||
|
@ -55,12 +55,13 @@ def test_parse_taxii_filters():
|
|||
"match[version]": "first"
|
||||
}
|
||||
|
||||
ds = taxii.TAXIIDataSource()
|
||||
ds = taxii.TAXIICollectionSource()
|
||||
|
||||
taxii_filters = ds._parse_taxii_filters(query)
|
||||
|
||||
assert taxii_filters == expected_params
|
||||
|
||||
|
||||
def test_add_get_remove_filter():
|
||||
|
||||
class dummy(object):
|
||||
|
@ -113,7 +114,7 @@ def test_add_get_remove_filter():
|
|||
"Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary"
|
||||
]
|
||||
|
||||
ds = taxii.TAXIIDataSource()
|
||||
ds = taxii.TAXIICollectionSource()
|
||||
# add
|
||||
ids, statuses = ds.add_filter(filters)
|
||||
|
||||
|
@ -130,13 +131,14 @@ def test_add_get_remove_filter():
|
|||
assert statuses[5]['errors'][0] == expected_errors[2]
|
||||
assert statuses[6]['errors'][0] == expected_errors[3]
|
||||
|
||||
#get
|
||||
# get
|
||||
ds_filters = ds.get_filters()
|
||||
|
||||
for idx,flt in enumerate(filters):
|
||||
assert flt['value'] == filters[idx]['value']
|
||||
# TODO: what are we trying to test here?
|
||||
for idx, flt in enumerate(filters):
|
||||
assert flt['value'] == ds_filters[idx]['value']
|
||||
|
||||
#remove
|
||||
# remove
|
||||
ds.remove_filter([ids[3]])
|
||||
ds.remove_filter([ids[4]])
|
||||
ds.remove_filter([ids[5]])
|
||||
|
@ -146,13 +148,14 @@ def test_add_get_remove_filter():
|
|||
|
||||
assert len(rem_filters) == 3
|
||||
|
||||
#check remaining filters
|
||||
# check remaining filters
|
||||
rem_ids = [f['id'] for f in rem_filters]
|
||||
|
||||
#check remaining
|
||||
# check remaining
|
||||
for id_ in rem_ids:
|
||||
assert id_ in ids[:3]
|
||||
|
||||
|
||||
def test_apply_common_filters():
|
||||
stix_objs = [
|
||||
{
|
||||
|
@ -207,7 +210,7 @@ def test_apply_common_filters():
|
|||
}
|
||||
]
|
||||
|
||||
ds = taxii.TAXIIDataSource()
|
||||
ds = taxii.TAXIICollectionSource()
|
||||
|
||||
resp = ds.apply_common_filters(stix_objs, [filters[0]])
|
||||
ids = [r['id'] for r in resp]
|
||||
|
@ -220,6 +223,7 @@ def test_apply_common_filters():
|
|||
resp = ds.apply_common_filters(stix_objs, [filters[2]])
|
||||
assert resp[0]['id'] == stix_objs[0]['id']
|
||||
|
||||
|
||||
def test_deduplicate():
|
||||
stix_objs = [
|
||||
{
|
||||
|
@ -284,12 +288,12 @@ def test_deduplicate():
|
|||
}
|
||||
]
|
||||
|
||||
ds = taxii.TAXIIDataSource()
|
||||
ds = taxii.TAXIICollectionSource()
|
||||
unique = ds.deduplicate(stix_objs)
|
||||
|
||||
#Only 3 objects are unique
|
||||
#2 id's vary
|
||||
#2 modified times vary for a particular id
|
||||
# Only 3 objects are unique
|
||||
# 2 id's vary
|
||||
# 2 modified times vary for a particular id
|
||||
|
||||
assert len(unique) == 3
|
||||
|
||||
|
@ -302,7 +306,6 @@ def test_deduplicate():
|
|||
assert "2017-01-27T13:49:53.936Z" in mods
|
||||
|
||||
|
||||
|
||||
# def test_data_source_file():
|
||||
# ds = file.FileDataSource()
|
||||
#
|
||||
|
|
Loading…
Reference in New Issue