Convert filters to be NamedTuples

stix2.1
Greg Back 2017-08-09 18:49:06 +00:00
parent a4ead4f6e7
commit 87f7503c0a
2 changed files with 67 additions and 288 deletions

View File

@ -14,15 +14,28 @@ NOTE: add_filter(), remove_filter(), deduplicate() - if these functions remain
make those functions an interface to inherit?
"""
import collections
import copy
import uuid
from six import iteritems
class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
__slots__ = ()
def __new__(cls, field, op, value):
# If value is a list, convert it to a tuple so it is hashable.
if isinstance(value, list):
value = tuple(value)
self = super(Filter, cls).__new__(cls, field, op, value)
return self
def make_id():
return str(uuid.uuid4())
# Currently, only STIX 2.0 common SDO fields (that are not compex objects)
# are supported for filtering on
STIX_COMMON_FIELDS = [
@ -44,10 +57,6 @@ STIX_COMMON_FIELDS = [
"granular_markings"
]
# Required fields in filter(dict)
FILTER_FIELDS = ['field', 'op', 'value']
# Supported filter operations
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
@ -162,8 +171,7 @@ class DataSource(object):
def __init__(self, name="DataSource"):
self.name = name
self.id = make_id()
self.filters = {}
self.filter_allowed = {}
self.filters = set()
def get(self, stix_id, _composite_filters=None):
"""
@ -230,114 +238,32 @@ class DataSource(object):
raise NotImplementedError()
def add_filter(self, filters):
"""add/attach a filter to the Data Source instance
def add_filters(self, filters):
"""Add multiple filters to the DataSource.
Args:
filters (list): list of filters (dict) to add to the Data Source
Returns:
status (list): list of status/error messages
filter (list): list of filters (dict) to add to the Data Source.
"""
for filter in filters:
self.add_filter(filter)
status = []
errors = []
ids = []
allowed = True
def add_filter(self, filter):
"""Add a filter."""
# check filter field is a supported STIX 2.0 common field
if filter.field not in STIX_COMMON_FIELDS:
raise ValueError("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported")
for filter_ in filters:
# check required filter components ("field", "op", "value") exist
for field in FILTER_FIELDS:
if field not in filter_.keys():
allowed = False
errors.append("Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.")
break
# check filter operator is supported
if filter.op not in FILTER_OPS:
raise ValueError("Filter operation(from 'op' field) not supported")
if allowed:
# no need for further checks if filter is missing parameters
# check filter value type is supported
if type(filter.value) not in FILTER_VALUE_TYPES:
raise ValueError("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary")
# check filter field is a supported STIX 2.0 common field
if filter_['field'] not in STIX_COMMON_FIELDS:
allowed = False
errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported")
self.filters.add(filter)
# check filter operator is supported
if filter_['op'] not in FILTER_OPS:
allowed = False
errors.append("Filter operation(from 'op' field) not supported")
# check filter value type is supported
if type(filter_['value']) not in FILTER_VALUE_TYPES:
allowed = False
errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary")
# Filter is added regardless of whether it fits requirements
# to be a common filter. This is done because some filters
# may be added and used by third party Data Sources, where the
# filtering may be conducted within those plugins, just not here
id_ = make_id()
filter_['id'] = id_
self.filters[id_] = filter_
ids.append(id_)
if allowed:
self.filter_allowed[id_] = True
status.append({
"status": "added as a common filter",
"filter": filter_,
"data_source_name": self.name,
"data_source_id": self.id,
})
else:
self.filter_allowed[id_] = False
status.append({
"status": "added but is not a common filter",
"filter": filter_,
"errors": copy.deepcopy(errors),
"data_source_name": self.name,
"data_source_id": self.id,
})
del errors[:]
allowed = True
return ids, status
def remove_filter(self, filter_ids):
"""remove/detach a filter from the Data Source instance
Args:
filter_ids (list): list of filter ids to dettach/remove
from Data Source
Returns:
"""
for filter_id in filter_ids:
try:
if filter_id in self.filters:
del self.filters[filter_id]
del self.filter_allowed[filter_id]
except KeyError:
# filter 'id' not found list of filters attached to Data Source
pass
return
def get_filters(self):
"""return copy of all filters currently attached to Data Source
TODO: make this a property?
Returns:
(list): a copy of all the filters(dict) which are attached
to Data Source
"""
return copy.deepcopy(list(self.filters.values()))
# TODO: Do we need a remove_filter function?
def apply_common_filters(self, stix_objs, query):
"""evaluates filters against a set of STIX 2.0 objects
@ -599,114 +525,6 @@ class CompositeDataSource(object):
"""
return copy.deepcopy(self.data_sources.values())
def add_filter(self, filters):
"""add/attach a filter to the Composite Data Source instance
Args:
filters (list): list of filters (dict) to add to the Data Source
Returns:
status (list): list of status/error messages
"""
status = []
errors = []
ids = []
allowed = True
for filter_ in filters:
# check required filter components ("field", "op", "value") exist
for field in FILTER_FIELDS:
if field not in filter_.keys():
allowed = False
errors.append("Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.")
break
if allowed:
# no need for further checks if filter is missing parameters
# check filter field is a supported STIX 2.0 common field
if filter_['field'] not in STIX_COMMON_FIELDS:
allowed = False
errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported")
# check filter operator is supported
if filter_['op'] not in FILTER_OPS:
allowed = False
errors.append("Filter operation(from 'op' field) not supported")
# check filter value type is supported
if type(filter_['value']) not in FILTER_VALUE_TYPES:
allowed = False
errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary")
# Filter is added regardless of whether it fits requirements
# to be a common filter. This is done because some filters
# may be added and used by third party Data Sources, where the
# filtering may be conducted within those plugins, just not here
id_ = make_id()
filter_['id'] = id_
self.filters['id_'] = filter_
ids.append(id_)
if allowed:
self.filter_allowed[id_] = True
status.append({
"status": "added as a common filter",
"filter": filter_,
"data_source_name": self.name,
"data_source_id": self.id
})
else:
self.filter_allowed[id_] = False
status.append({
"status": "added but is not a common filter",
"filter": filter_,
"errors": errors,
"data_source_name": self.name,
"data_source_id": self.id
})
del errors[:]
allowed = True
return ids, status
def remove_filter(self, filter_ids):
"""Remove/detach a filter from the Data Source instance
Args:
filter_ids (list): list of filter id's (which are strings)
dettach from the Composite Data Source
Returns:
"""
for filter_id in filter_ids:
try:
if filter_id in self.filters:
del self.filters[filter_id]
del self.filter_allowed[filter_id]
except KeyError:
# filter id not found in list of filters
# attached to the Composite Data Source
pass
return
@property
def filters(self):
"""return filters attached to Composite Data Source
Returns:
(list): the list of filters currently attached to the Data Source
"""
return copy.deepcopy(list(self.filters.values()))
def deduplicate(self, stix_obj_list):
"""deduplicate a list fo STIX objects to a unique set

View File

@ -1,7 +1,7 @@
import pytest
from taxii2_client import Collection
from stix2.sources import DataSource, taxii
from stix2.sources import DataSource, Filter, taxii
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
@ -74,94 +74,55 @@ def test_parse_taxii_filters():
assert taxii_filters == expected_params
@pytest.skip
def test_add_get_remove_filter():
# First 3 filters are valid, remaining fields are erroneous in some way
filters = [
{
"field": "type",
"op": '=',
"value": "malware"
},
{
"field": "id",
"op": "!=",
"value": "stix object id"
},
{
"field": "labels",
"op": "in",
"value": ["heartbleed", "malicious-activity"]
},
{
"field": "revoked",
"value": "filter missing \'op\' field"
},
{
"field": "description",
"op": "=",
"value": "not supported field - just place holder"
},
{
"field": "modified",
"op": "*",
"value": "not supported operator - just place holder"
},
{
"field": "created",
"op": "=",
"value": set(),
}
valid_filters = [
Filter('type', '=', 'malware'),
Filter('id', '!=', 'stix object id'),
Filter('labels', 'in', ["heartbleed", "malicious-activity"]),
]
expected_errors = [
"Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.",
"Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported",
"Filter operation(from 'op' field) not supported",
"Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary"
invalid_filters = [
Filter('description', '=', 'not supported field - just place holder'),
Filter('modified', '*', 'not supported operator - just place holder'),
Filter('created', '=', object()),
]
ds = DataSource()
# add
ids, statuses = ds.add_filter(filters)
# 7 filters should have been successfully added
assert len(ids) == 7
assert len(ds.filters) == 0
# all filters added to data source
for idx, status in enumerate(statuses):
assert status['filter'] == filters[idx]
ds.add_filter(valid_filters[0])
assert len(ds.filters) == 1
# proper status warnings were triggered
assert statuses[3]['errors'][0] == expected_errors[0]
assert statuses[4]['errors'][0] == expected_errors[1]
assert statuses[5]['errors'][0] == expected_errors[2]
assert statuses[6]['errors'][0] == expected_errors[3]
# Addin the same filter again will have no effect since `filters` uses a set
ds.add_filter(valid_filters[0])
assert len(ds.filters) == 1
# get
ds_filters = ds.get_filters()
ds.add_filter(valid_filters[1])
assert len(ds.filters) == 2
ds.add_filter(valid_filters[2])
assert len(ds.filters) == 3
# TODO: what are we trying to test here?
for idx, flt in enumerate(filters):
assert flt['value'] == ds_filters[idx]['value']
# TODO: make better error messages
with pytest.raises(ValueError) as excinfo:
ds.add_filter(invalid_filters[0])
assert str(excinfo.value) == "Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported"
with pytest.raises(ValueError) as excinfo:
ds.add_filter(invalid_filters[1])
assert str(excinfo.value) == "Filter operation(from 'op' field) not supported"
with pytest.raises(ValueError) as excinfo:
ds.add_filter(invalid_filters[2])
assert str(excinfo.value) == "Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary"
assert set(valid_filters) == ds.filters
# remove
ds.remove_filter([ids[3]])
ds.remove_filter([ids[4]])
ds.remove_filter([ids[5]])
ds.remove_filter([ids[6]])
ds.filters.remove(valid_filters[0])
rem_filters = ds.get_filters()
assert len(rem_filters) == 3
# check remaining filters
rem_ids = [f['id'] for f in rem_filters]
# check remaining
for id_ in rem_ids:
assert id_ in ids[:3]
assert len(ds.filters) == 2
def test_apply_common_filters():