fixed indentation
parent
03f9af4b96
commit
6c3a689f02
|
@ -7,138 +7,133 @@ from stix2.sources import taxii
|
|||
#Data Source (common API)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#TAXII
|
||||
def test_ds_taxii():
|
||||
ds = taxii.TAXIIDataSource()
|
||||
assert ds.name == 'TAXII'
|
||||
ds = taxii.TAXIIDataSource()
|
||||
assert ds.name == 'TAXII'
|
||||
|
||||
def test_ds_taxii_name():
|
||||
ds = taxii.TAXIIDataSource(name='My Data Source Name')
|
||||
assert ds.name == "My Data Source Name"
|
||||
ds = taxii.TAXIIDataSource(name='My Data Source Name')
|
||||
assert ds.name == "My Data Source Name"
|
||||
|
||||
def test_ds_params():
|
||||
url = "http://taxii_url.com:5000"
|
||||
creds = {"username":"Wade", "password":"Wilson"}
|
||||
ds = taxii.TAXIIDataSource(api_root=url,
|
||||
auth= creds)
|
||||
assert ds.taxii_info['api_root']['url'] == url
|
||||
assert ds.taxii_info['auth'] == creds
|
||||
url = "http://taxii_url.com:5000"
|
||||
creds = {"username":"Wade", "password":"Wilson"}
|
||||
ds = taxii.TAXIIDataSource(api_root=url,
|
||||
auth= creds)
|
||||
assert ds.taxii_info['api_root']['url'] == url
|
||||
assert ds.taxii_info['auth'] == creds
|
||||
|
||||
def test_parse_taxii_filters():
|
||||
query = [
|
||||
{
|
||||
"field":"added_after",
|
||||
"op":"=",
|
||||
"value":"2016-02-01T00:00:01.000Z"
|
||||
},
|
||||
{
|
||||
"field":"id",
|
||||
"op":"=",
|
||||
"value":"taxii stix object ID"
|
||||
},
|
||||
{
|
||||
"field":"type",
|
||||
"op":"=",
|
||||
"value":"taxii stix object ID"
|
||||
},
|
||||
{
|
||||
"field":"version",
|
||||
"op":"=",
|
||||
"value":"first"
|
||||
},
|
||||
{
|
||||
"field":"created_by_ref",
|
||||
"op":"=",
|
||||
"value":"Bane"
|
||||
}
|
||||
]
|
||||
query = [
|
||||
{
|
||||
"field":"added_after",
|
||||
"op":"=",
|
||||
"value":"2016-02-01T00:00:01.000Z"
|
||||
},
|
||||
{
|
||||
"field":"id",
|
||||
"op":"=",
|
||||
"value":"taxii stix object ID"
|
||||
},
|
||||
{
|
||||
"field":"type",
|
||||
"op":"=",
|
||||
"value":"taxii stix object ID"
|
||||
},
|
||||
{
|
||||
"field":"version",
|
||||
"op":"=",
|
||||
"value":"first"
|
||||
},
|
||||
{
|
||||
"field":"created_by_ref",
|
||||
"op":"=",
|
||||
"value":"Bane"
|
||||
}
|
||||
]
|
||||
|
||||
expected_params = {
|
||||
"added_after":"2016-02-01T00:00:01.000Z",
|
||||
"match[id]":"taxii stix object ID",
|
||||
"match[type]":"taxii stix object ID",
|
||||
"match[version]":"first"
|
||||
}
|
||||
expected_params = {
|
||||
"added_after":"2016-02-01T00:00:01.000Z",
|
||||
"match[id]":"taxii stix object ID",
|
||||
"match[type]":"taxii stix object ID",
|
||||
"match[version]":"first"
|
||||
}
|
||||
|
||||
ds = taxii.TAXIIDataSource()
|
||||
ds = taxii.TAXIIDataSource()
|
||||
|
||||
taxii_filters = ds._parse_taxii_filters(query)
|
||||
taxii_filters = ds._parse_taxii_filters(query)
|
||||
|
||||
assert taxii_filters == expected_params
|
||||
assert taxii_filters == expected_params
|
||||
|
||||
|
||||
def test_add_get_filter():
|
||||
class dummy(object):
|
||||
x = 4
|
||||
def test_add_get_remove_filter():
|
||||
class dummy(object):
|
||||
x = 4
|
||||
|
||||
obj_1 = dummy()
|
||||
obj_1 = dummy()
|
||||
|
||||
#First 3 filters are valid, remaining fields are erroneous in some way
|
||||
filters = [
|
||||
{
|
||||
"field": "type",
|
||||
"op": '=',
|
||||
"value":"malware"
|
||||
},
|
||||
{
|
||||
"field":"id",
|
||||
"op":"!=",
|
||||
"value":"stix object id"
|
||||
},
|
||||
{
|
||||
"field":"labels",
|
||||
"op":"in",
|
||||
"value":["heartbleed","malicious-activity"]
|
||||
},
|
||||
{
|
||||
"field":"revoked",
|
||||
"value":"filter missing \'op\' field"
|
||||
},
|
||||
{
|
||||
"field":"granular_markings",
|
||||
"op":"=",
|
||||
"value":"not supported field - just place holder"
|
||||
},
|
||||
{
|
||||
"field":"modified",
|
||||
"op":"*",
|
||||
"value":"not supported operator - just place holder"
|
||||
},
|
||||
{
|
||||
"field":"created",
|
||||
"op":"=",
|
||||
"value":obj_1
|
||||
}
|
||||
]
|
||||
#First 3 filters are valid, remaining fields are erroneous in some way
|
||||
filters = [
|
||||
{
|
||||
"field": "type",
|
||||
"op": '=',
|
||||
"value":"malware"
|
||||
},
|
||||
{
|
||||
"field":"id",
|
||||
"op":"!=",
|
||||
"value":"stix object id"
|
||||
},
|
||||
{
|
||||
"field":"labels",
|
||||
"op":"in",
|
||||
"value":["heartbleed","malicious-activity"]
|
||||
},
|
||||
{
|
||||
"field":"revoked",
|
||||
"value":"filter missing \'op\' field"
|
||||
},
|
||||
{
|
||||
"field":"granular_markings",
|
||||
"op":"=",
|
||||
"value":"not supported field - just place holder"
|
||||
},
|
||||
{
|
||||
"field":"modified",
|
||||
"op":"*",
|
||||
"value":"not supported operator - just place holder"
|
||||
},
|
||||
{
|
||||
"field":"created",
|
||||
"op":"=",
|
||||
"value":obj_1
|
||||
}
|
||||
]
|
||||
|
||||
expected_errors =[
|
||||
"Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.",
|
||||
"Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported",
|
||||
"Filter operation(from 'op' field) not supported",
|
||||
"Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary"
|
||||
]
|
||||
expected_errors =[
|
||||
"Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.",
|
||||
"Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported",
|
||||
"Filter operation(from 'op' field) not supported",
|
||||
"Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary"
|
||||
]
|
||||
|
||||
ds = taxii.TAXIIDataSource()
|
||||
ids, statuses = ds.add_filter(filters)
|
||||
|
||||
#7 filters should have been successfully added
|
||||
assert len(ids) == 7
|
||||
|
||||
#all filters added to data source
|
||||
for idx, status in enumerate(statuses):
|
||||
assert status['filter'] == filters[idx]
|
||||
|
||||
#proper status warnings were triggered
|
||||
assert statuses[3]['errors'][0]== expected_errors[0]
|
||||
assert statuses[4]['errors'][0]== expected_errors[1]
|
||||
assert statuses[5]['errors'][0]== expected_errors[2]
|
||||
assert statuses[6]['errors'][0]== expected_errors[3]
|
||||
ds = taxii.TAXIIDataSource()
|
||||
#add
|
||||
ids, statuses = ds.add_filter(filters)
|
||||
|
||||
#7 filters should have been successfully added
|
||||
assert len(ids) == 7
|
||||
|
||||
#all filters added to data source
|
||||
for idx, status in enumerate(statuses):
|
||||
assert status['filter'] == filters[idx]
|
||||
|
||||
#proper status warnings were triggered
|
||||
assert statuses[3]['errors'][0] == expected_errors[0]
|
||||
assert statuses[4]['errors'][0] == expected_errors[1]
|
||||
assert statuses[5]['errors'][0] == expected_errors[2]
|
||||
assert statuses[6]['errors'][0] == expected_errors[3]
|
||||
|
||||
|
||||
|
||||
|
@ -166,7 +161,7 @@ def test_data_source_get():
|
|||
|
||||
#filter testing
|
||||
def test_add_filter():
|
||||
ds = file.FileDataSource()
|
||||
ds = file.FileDataSource()
|
||||
|
||||
|
||||
'''
|
Loading…
Reference in New Issue