Merge pull request #41 from oasis-open/namedtuple-filters Use Namedtuples for Filters.

stix2.1
Emmanuelle Vargas-Gonzalez 2017-08-14 15:01:07 -04:00
commit 226a41e0ff
4 changed files with 116 additions and 360 deletions

View File

@ -16,12 +16,24 @@ Notes:
"""
import collections
import copy
import uuid
from six import iteritems
class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
__slots__ = ()
def __new__(cls, field, op, value):
# If value is a list, convert it to a tuple so it is hashable.
if isinstance(value, list):
value = tuple(value)
self = super(Filter, cls).__new__(cls, field, op, value)
return self
def make_id():
return str(uuid.uuid4())
@ -47,10 +59,6 @@ STIX_COMMON_FIELDS = [
"granular_markings"
]
# Required fields in filter(dict)
FILTER_FIELDS = ['field', 'op', 'value']
# Supported filter operations
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
@ -174,8 +182,7 @@ class DataSource(object):
def __init__(self, name="DataSource"):
self.name = name
self.id = make_id()
self.filters = {}
self.filter_allowed = {}
self.filters = set()
def get(self, stix_id, _composite_filters=None):
"""
@ -239,110 +246,32 @@ class DataSource(object):
"""
raise NotImplementedError()
def add_filter(self, filters):
"""Add/attach a filter to the Data Source instance
def add_filters(self, filters):
"""Add multiple filters to the DataSource.
Args:
filters (list): list of filters (dict) to add to the Data Source
Returns:
status (list): list of status/error messages
filter (list): list of filters (dict) to add to the Data Source.
"""
status = []
errors = []
ids = []
allowed = True
for filter in filters:
self.add_filter(filter)
for filter_ in filters:
# check required filter components ('field', 'op', 'value') exist
for field in FILTER_FIELDS:
if field not in filter_.keys():
allowed = False
errors.append("Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.")
break
def add_filter(self, filter):
"""Add a filter."""
# check filter field is a supported STIX 2.0 common field
if filter.field not in STIX_COMMON_FIELDS:
raise ValueError("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported")
if allowed:
# no need for further checks if filter is missing parameters
# check filter operator is supported
if filter.op not in FILTER_OPS:
raise ValueError("Filter operation(from 'op' field) not supported")
# check filter field is a supported STIX 2.0 common field
if filter_['field'] not in STIX_COMMON_FIELDS:
allowed = False
errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported")
# check filter value type is supported
if type(filter.value) not in FILTER_VALUE_TYPES:
raise ValueError("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary")
# check filter operator is supported
if filter_['op'] not in FILTER_OPS:
allowed = False
errors.append("Filter operation(from 'op' field) not supported")
self.filters.add(filter)
# check filter value type is supported
if type(filter_['value']) not in FILTER_VALUE_TYPES:
allowed = False
errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary")
# Filter is added regardless of whether it fits requirements
# to be a common filter. This is done because some filters
# may be added and used by third party Data Sources, where the
# filtering may be conducted within those plugins, just not here
id_ = make_id()
filter_['id'] = id_
self.filters[id_] = filter_
ids.append(id_)
if allowed:
self.filter_allowed[id_] = True
status.append({
"status": "added as a common filter",
"filter": filter_,
"data_source_name": self.name,
"data_source_id": self.id,
})
else:
self.filter_allowed[id_] = False
status.append({
"status": "added but is not a common filter",
"filter": filter_,
"errors": copy.deepcopy(errors),
"data_source_name": self.name,
"data_source_id": self.id,
})
del errors[:]
allowed = True
return ids, status
def remove_filter(self, filter_ids):
"""Remove/detach a filter from the Data Source instance
Args:
filter_ids (list): list of filter ids to detach/remove
from Data Source.
"""
for filter_id in filter_ids:
try:
if filter_id in self.filters:
del self.filters[filter_id]
del self.filter_allowed[filter_id]
except KeyError:
# filter 'id' not found list of filters attached to Data Source
pass
return
def get_filters(self):
"""Return copy of all filters currently attached to Data Source
TODO: make this a property?
Returns:
(list): a copy of all the filters(dict) which are attached
to Data Source
"""
return copy.deepcopy(list(self.filters.values()))
# TODO: Do we need a remove_filter function?
def apply_common_filters(self, stix_objs, query):
"""Evaluates filters against a set of STIX 2.0 objects
@ -367,17 +296,17 @@ class DataSource(object):
# skip filter as filter was identified (when added) as
# not a common filter
if 'id' in filter_ and self.filter_allowed[filter_['id']] is False:
if filter_.field not in STIX_COMMON_FIELDS:
continue
# check filter "field" is in STIX object - if cant be applied
# due to STIX object, STIX object is discarded (i.e. did not
# make it through the filter)
if filter_['field'] not in stix_obj.keys():
if filter_.field not in stix_obj.keys():
clean = False
break
try:
match = getattr(STIXCommonPropertyFilters, filter_['field'])(filter_, stix_obj)
match = getattr(STIXCommonPropertyFilters, filter_.field)(filter_, stix_obj)
if not match:
clean = False
break
@ -600,109 +529,6 @@ class CompositeDataSource(object):
"""
return copy.deepcopy(self.data_sources.values())
def add_filter(self, filters):
"""Add/attach a filter to the Composite Data Source instance
Args:
filters (list): list of filters (dict) to add to the Data Source
Returns:
status (list): list of status/error messages
"""
status = []
errors = []
ids = []
allowed = True
for filter_ in filters:
# check required filter components ("field", "op", "value") exist
for field in FILTER_FIELDS:
if field not in filter_.keys():
allowed = False
errors.append("Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.")
break
if allowed:
# no need for further checks if filter is missing parameters
# check filter field is a supported STIX 2.0 common field
if filter_['field'] not in STIX_COMMON_FIELDS:
allowed = False
errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported")
# check filter operator is supported
if filter_['op'] not in FILTER_OPS:
allowed = False
errors.append("Filter operation(from 'op' field) not supported")
# check filter value type is supported
if type(filter_['value']) not in FILTER_VALUE_TYPES:
allowed = False
errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary")
# Filter is added regardless of whether it fits requirements
# to be a common filter. This is done because some filters
# may be added and used by third party Data Sources, where the
# filtering may be conducted within those plugins, just not here
id_ = make_id()
filter_['id'] = id_
self.filters['id_'] = filter_
ids.append(id_)
if allowed:
self.filter_allowed[id_] = True
status.append({
"status": "added as a common filter",
"filter": filter_,
"data_source_name": self.name,
"data_source_id": self.id
})
else:
self.filter_allowed[id_] = False
status.append({
"status": "added but is not a common filter",
"filter": filter_,
"errors": errors,
"data_source_name": self.name,
"data_source_id": self.id
})
del errors[:]
allowed = True
return ids, status
def remove_filter(self, filter_ids):
"""Remove/detach a filter from the Data Source instance
Args:
filter_ids (list): list of filter id's (which are strings)
detach from the Composite Data Source.
"""
for filter_id in filter_ids:
try:
if filter_id in self.filters:
del self.filters[filter_id]
del self.filter_allowed[filter_id]
except KeyError:
# filter id not found in list of filters
# attached to the Composite Data Source
pass
return
@property
def filters(self):
"""Return filters attached to Composite Data Source
Returns:
(list): the list of filters currently attached to the Data Source
"""
return copy.deepcopy(list(self.filters.values()))
def deduplicate(self, stix_obj_list):
"""Deduplicate a list of STIX objects to a unique set
@ -732,39 +558,39 @@ class STIXCommonPropertyFilters(object):
@classmethod
def _all(cls, filter_, stix_obj_field):
"""all filter operations (for filters whose value type can be applied to any operation type)"""
if filter_["op"] == "=":
return stix_obj_field == filter_["value"]
elif filter_["op"] == "!=":
return stix_obj_field != filter_["value"]
elif filter_["op"] == "in":
return stix_obj_field in filter_["value"]
elif filter_["op"] == ">":
return stix_obj_field > filter_["value"]
elif filter_["op"] == "<":
return stix_obj_field < filter_["value"]
elif filter_["op"] == ">=":
return stix_obj_field >= filter_["value"]
elif filter_["op"] == "<=":
return stix_obj_field <= filter_["value"]
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
elif filter_.op == "in":
return stix_obj_field in filter_.value
elif filter_.op == ">":
return stix_obj_field > filter_.value
elif filter_.op == "<":
return stix_obj_field < filter_.value
elif filter_.op == ">=":
return stix_obj_field >= filter_.value
elif filter_.op == "<=":
return stix_obj_field <= filter_.value
else:
return -1
@classmethod
def _id(cls, filter_, stix_obj_id):
"""base filter types"""
if filter_["op"] == "=":
return stix_obj_id == filter_["value"]
elif filter_["op"] == "!=":
return stix_obj_id != filter_["value"]
if filter_.op == "=":
return stix_obj_id == filter_.value
elif filter_.op == "!=":
return stix_obj_id != filter_.value
else:
return -1
@classmethod
def _boolean(cls, filter_, stix_obj_field):
if filter_["op"] == "=":
return stix_obj_field == filter_["value"]
elif filter_["op"] == "!=":
return stix_obj_field != filter_["value"]
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
else:
return -1
@ -800,7 +626,7 @@ class STIXCommonPropertyFilters(object):
"""
for er in stix_obj["external_references"]:
# grab er property name from filter field
filter_field = filter_["field"].split(".")[1]
filter_field = filter_.field.split(".")[1]
r = cls._string(filter_, er[filter_field])
if r:
return r
@ -818,7 +644,7 @@ class STIXCommonPropertyFilters(object):
"""
for gm in stix_obj["granular_markings"]:
# grab gm property name from filter field
filter_field = filter_["field"].split(".")[1]
filter_field = filter_.field.split(".")[1]
if filter_field == "marking_ref":
return cls._id(filter_, gm[filter_field])

View File

@ -138,13 +138,13 @@ class FileSystemSource(DataSource):
# the corresponding subdirectories as well
include_paths = []
declude_paths = []
if "type" in [filter_["field"] for filter_ in file_filters]:
if "type" in [filter_.field for filter_ in file_filters]:
for filter_ in file_filters:
if filter_["field"] == "type":
if filter_["op"] == "=":
include_paths.append(os.path.join(self.stix_dir, filter_["value"]))
elif filter_["op"] == "!=":
declude_paths.append(os.path.join(self.stix_dir, filter_["value"]))
if filter_.field == "type":
if filter_.op == "=":
include_paths.append(os.path.join(self.stix_dir, filter_.value))
elif filter_.op == "!=":
declude_paths.append(os.path.join(self.stix_dir, filter_.value))
else:
# have to walk entire STIX directory
include_paths.append(self.stix_dir)
@ -167,10 +167,10 @@ class FileSystemSource(DataSource):
# grab stix object ID as well - if present in filters, as
# may forgo the loading of STIX content into memory
if "id" in [filter_["field"] for filter_ in file_filters]:
if "id" in [filter_.field for filter_ in file_filters]:
for filter_ in file_filters:
if filter_["field"] == "id" and filter_["op"] == "=":
id_ = filter_["value"]
if filter_.field == "id" and filter_.op == "=":
id_ = filter_.value
break
else:
id_ = None
@ -200,6 +200,6 @@ class FileSystemSource(DataSource):
"""
file_filters = []
for filter_ in query:
if filter_["field"] == "id" or filter_["field"] == "type":
if filter_.field == "id" or filter_.field == "type":
file_filters.append(filter_)
return file_filters

View File

@ -156,10 +156,10 @@ class TAXIICollectionSource(DataSource):
params = {}
for filter_ in query:
if filter_["field"] in TAXII_FILTERS:
if filter_["field"] == "added_after":
params[filter_["field"]] = filter_["value"]
if filter_.field in TAXII_FILTERS:
if filter_.field == "added_after":
params[filter_.field] = filter_.value
else:
taxii_field = "match[%s]" % filter_["field"]
params[taxii_field] = filter_["value"]
taxii_field = "match[%s]" % filter_.field
params[taxii_field] = filter_.value
return params

View File

@ -1,7 +1,7 @@
import pytest
from taxii2_client import Collection
from stix2.sources import DataSource, taxii
from stix2.sources import DataSource, Filter, taxii
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
@ -33,31 +33,11 @@ def test_ds_taxii_name(collection):
def test_parse_taxii_filters():
query = [
{
"field": "added_after",
"op": "=",
"value": "2016-02-01T00:00:01.000Z"
},
{
"field": "id",
"op": "=",
"value": "taxii stix object ID"
},
{
"field": "type",
"op": "=",
"value": "taxii stix object ID"
},
{
"field": "version",
"op": "=",
"value": "first"
},
{
"field": "created_by_ref",
"op": "=",
"value": "Bane"
}
Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
Filter("id", "=", "taxii stix object ID"),
Filter("type", "=", "taxii stix object ID"),
Filter("version", "=", "first"),
Filter("created_by_ref", "=", "Bane"),
]
expected_params = {
@ -78,90 +58,52 @@ def test_parse_taxii_filters():
def test_add_get_remove_filter():
# First 3 filters are valid, remaining fields are erroneous in some way
filters = [
{
"field": "type",
"op": '=',
"value": "malware"
},
{
"field": "id",
"op": "!=",
"value": "stix object id"
},
{
"field": "labels",
"op": "in",
"value": ["heartbleed", "malicious-activity"]
},
{
"field": "revoked",
"value": "filter missing \'op\' field"
},
{
"field": "description",
"op": "=",
"value": "not supported field - just place holder"
},
{
"field": "modified",
"op": "*",
"value": "not supported operator - just place holder"
},
{
"field": "created",
"op": "=",
"value": set(),
}
valid_filters = [
Filter('type', '=', 'malware'),
Filter('id', '!=', 'stix object id'),
Filter('labels', 'in', ["heartbleed", "malicious-activity"]),
]
expected_errors = [
"Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.",
"Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported",
"Filter operation(from 'op' field) not supported",
"Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary"
invalid_filters = [
Filter('description', '=', 'not supported field - just place holder'),
Filter('modified', '*', 'not supported operator - just place holder'),
Filter('created', '=', object()),
]
ds = DataSource()
# add
ids, statuses = ds.add_filter(filters)
# 7 filters should have been successfully added
assert len(ids) == 7
assert len(ds.filters) == 0
# all filters added to data source
for idx, status in enumerate(statuses):
assert status['filter'] == filters[idx]
ds.add_filter(valid_filters[0])
assert len(ds.filters) == 1
# proper status warnings were triggered
assert statuses[3]['errors'][0] == expected_errors[0]
assert statuses[4]['errors'][0] == expected_errors[1]
assert statuses[5]['errors'][0] == expected_errors[2]
assert statuses[6]['errors'][0] == expected_errors[3]
# Addin the same filter again will have no effect since `filters` uses a set
ds.add_filter(valid_filters[0])
assert len(ds.filters) == 1
# get
ds_filters = ds.get_filters()
ds.add_filter(valid_filters[1])
assert len(ds.filters) == 2
ds.add_filter(valid_filters[2])
assert len(ds.filters) == 3
# TODO: what are we trying to test here?
for idx, flt in enumerate(filters):
assert flt['value'] == ds_filters[idx]['value']
# TODO: make better error messages
with pytest.raises(ValueError) as excinfo:
ds.add_filter(invalid_filters[0])
assert str(excinfo.value) == "Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported"
with pytest.raises(ValueError) as excinfo:
ds.add_filter(invalid_filters[1])
assert str(excinfo.value) == "Filter operation(from 'op' field) not supported"
with pytest.raises(ValueError) as excinfo:
ds.add_filter(invalid_filters[2])
assert str(excinfo.value) == "Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary"
assert set(valid_filters) == ds.filters
# remove
ds.remove_filter([ids[3]])
ds.remove_filter([ids[4]])
ds.remove_filter([ids[5]])
ds.remove_filter([ids[6]])
ds.filters.remove(valid_filters[0])
rem_filters = ds.get_filters()
assert len(rem_filters) == 3
# check remaining filters
rem_ids = [f['id'] for f in rem_filters]
# check remaining
for id_ in rem_ids:
assert id_ in ids[:3]
assert len(ds.filters) == 2
def test_apply_common_filters():
@ -201,21 +143,9 @@ def test_apply_common_filters():
]
filters = [
{
"field": "type",
"op": "!=",
"value": "relationship"
},
{
"field": "id",
"op": "=",
"value": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"
},
{
"field": "labels",
"op": "in",
"value": "remote-access-trojan"
}
Filter("type", "!=", "relationship"),
Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
Filter("labels", "in", "remote-access-trojan"),
]
ds = DataSource()