diff --git a/stix2/sources/__init__.py b/stix2/sources/__init__.py index 5a61339..a813a13 100644 --- a/stix2/sources/__init__.py +++ b/stix2/sources/__init__.py @@ -14,15 +14,28 @@ NOTE: add_filter(), remove_filter(), deduplicate() - if these functions remain make those functions an interface to inherit? """ +import collections import copy import uuid from six import iteritems +class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])): + __slots__ = () + + def __new__(cls, field, op, value): + # If value is a list, convert it to a tuple so it is hashable. + if isinstance(value, list): + value = tuple(value) + self = super(Filter, cls).__new__(cls, field, op, value) + return self + + def make_id(): return str(uuid.uuid4()) + # Currently, only STIX 2.0 common SDO fields (that are not compex objects) # are supported for filtering on STIX_COMMON_FIELDS = [ @@ -44,10 +57,6 @@ STIX_COMMON_FIELDS = [ "granular_markings" ] - -# Required fields in filter(dict) -FILTER_FIELDS = ['field', 'op', 'value'] - # Supported filter operations FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<='] @@ -162,8 +171,7 @@ class DataSource(object): def __init__(self, name="DataSource"): self.name = name self.id = make_id() - self.filters = {} - self.filter_allowed = {} + self.filters = set() def get(self, stix_id, _composite_filters=None): """ @@ -230,114 +238,32 @@ class DataSource(object): raise NotImplementedError() - def add_filter(self, filters): - """add/attach a filter to the Data Source instance + def add_filters(self, filters): + """Add multiple filters to the DataSource. Args: - filters (list): list of filters (dict) to add to the Data Source - - Returns: - status (list): list of status/error messages - + filter (list): list of filters (dict) to add to the Data Source. """ + for filter in filters: + self.add_filter(filter) - status = [] - errors = [] - ids = [] - allowed = True + def add_filter(self, filter): + """Add a filter.""" + # check filter field is a supported STIX 2.0 common field + if filter.field not in STIX_COMMON_FIELDS: + raise ValueError("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported") - for filter_ in filters: - # check required filter components ("field", "op", "value") exist - for field in FILTER_FIELDS: - if field not in filter_.keys(): - allowed = False - errors.append("Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.") - break + # check filter operator is supported + if filter.op not in FILTER_OPS: + raise ValueError("Filter operation(from 'op' field) not supported") - if allowed: - # no need for further checks if filter is missing parameters + # check filter value type is supported + if type(filter.value) not in FILTER_VALUE_TYPES: + raise ValueError("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary") - # check filter field is a supported STIX 2.0 common field - if filter_['field'] not in STIX_COMMON_FIELDS: - allowed = False - errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported") + self.filters.add(filter) - # check filter operator is supported - if filter_['op'] not in FILTER_OPS: - allowed = False - errors.append("Filter operation(from 'op' field) not supported") - - # check filter value type is supported - if type(filter_['value']) not in FILTER_VALUE_TYPES: - allowed = False - errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary") - - # Filter is added regardless of whether it fits requirements - # to be a common filter. This is done because some filters - # may be added and used by third party Data Sources, where the - # filtering may be conducted within those plugins, just not here - - id_ = make_id() - filter_['id'] = id_ - self.filters[id_] = filter_ - ids.append(id_) - - if allowed: - self.filter_allowed[id_] = True - status.append({ - "status": "added as a common filter", - "filter": filter_, - "data_source_name": self.name, - "data_source_id": self.id, - }) - else: - self.filter_allowed[id_] = False - status.append({ - "status": "added but is not a common filter", - "filter": filter_, - "errors": copy.deepcopy(errors), - "data_source_name": self.name, - "data_source_id": self.id, - }) - del errors[:] - - allowed = True - - return ids, status - - def remove_filter(self, filter_ids): - """remove/detach a filter from the Data Source instance - - Args: - filter_ids (list): list of filter ids to dettach/remove - from Data Source - - Returns: - - - """ - for filter_id in filter_ids: - try: - if filter_id in self.filters: - del self.filters[filter_id] - del self.filter_allowed[filter_id] - except KeyError: - # filter 'id' not found list of filters attached to Data Source - pass - - return - - def get_filters(self): - """return copy of all filters currently attached to Data Source - - TODO: make this a property? - - Returns: - (list): a copy of all the filters(dict) which are attached - to Data Source - - """ - return copy.deepcopy(list(self.filters.values())) + # TODO: Do we need a remove_filter function? def apply_common_filters(self, stix_objs, query): """evaluates filters against a set of STIX 2.0 objects @@ -599,114 +525,6 @@ class CompositeDataSource(object): """ return copy.deepcopy(self.data_sources.values()) - def add_filter(self, filters): - """add/attach a filter to the Composite Data Source instance - - Args: - filters (list): list of filters (dict) to add to the Data Source - - Returns: - status (list): list of status/error messages - - """ - - status = [] - errors = [] - ids = [] - allowed = True - - for filter_ in filters: - # check required filter components ("field", "op", "value") exist - for field in FILTER_FIELDS: - if field not in filter_.keys(): - allowed = False - errors.append("Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.") - break - - if allowed: - # no need for further checks if filter is missing parameters - - # check filter field is a supported STIX 2.0 common field - if filter_['field'] not in STIX_COMMON_FIELDS: - allowed = False - errors.append("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported") - - # check filter operator is supported - if filter_['op'] not in FILTER_OPS: - allowed = False - errors.append("Filter operation(from 'op' field) not supported") - - # check filter value type is supported - if type(filter_['value']) not in FILTER_VALUE_TYPES: - allowed = False - errors.append("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary") - - # Filter is added regardless of whether it fits requirements - # to be a common filter. This is done because some filters - # may be added and used by third party Data Sources, where the - # filtering may be conducted within those plugins, just not here - - id_ = make_id() - filter_['id'] = id_ - self.filters['id_'] = filter_ - ids.append(id_) - - if allowed: - self.filter_allowed[id_] = True - status.append({ - "status": "added as a common filter", - "filter": filter_, - "data_source_name": self.name, - "data_source_id": self.id - }) - else: - self.filter_allowed[id_] = False - status.append({ - "status": "added but is not a common filter", - "filter": filter_, - "errors": errors, - "data_source_name": self.name, - "data_source_id": self.id - }) - del errors[:] - - allowed = True - - return ids, status - - def remove_filter(self, filter_ids): - """Remove/detach a filter from the Data Source instance - - Args: - filter_ids (list): list of filter id's (which are strings) - dettach from the Composite Data Source - - Returns: - - """ - - for filter_id in filter_ids: - try: - if filter_id in self.filters: - del self.filters[filter_id] - del self.filter_allowed[filter_id] - except KeyError: - # filter id not found in list of filters - # attached to the Composite Data Source - pass - - return - - @property - def filters(self): - """return filters attached to Composite Data Source - - Returns: - (list): the list of filters currently attached to the Data Source - - """ - return copy.deepcopy(list(self.filters.values())) - def deduplicate(self, stix_obj_list): """deduplicate a list fo STIX objects to a unique set diff --git a/stix2/test/test_data_sources.py b/stix2/test/test_data_sources.py index b733a19..54fde11 100644 --- a/stix2/test/test_data_sources.py +++ b/stix2/test/test_data_sources.py @@ -1,7 +1,7 @@ import pytest from taxii2_client import Collection -from stix2.sources import DataSource, taxii +from stix2.sources import DataSource, Filter, taxii COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/' @@ -74,94 +74,55 @@ def test_parse_taxii_filters(): assert taxii_filters == expected_params -@pytest.skip def test_add_get_remove_filter(): # First 3 filters are valid, remaining fields are erroneous in some way - filters = [ - { - "field": "type", - "op": '=', - "value": "malware" - }, - { - "field": "id", - "op": "!=", - "value": "stix object id" - }, - { - "field": "labels", - "op": "in", - "value": ["heartbleed", "malicious-activity"] - }, - { - "field": "revoked", - "value": "filter missing \'op\' field" - }, - { - "field": "description", - "op": "=", - "value": "not supported field - just place holder" - }, - { - "field": "modified", - "op": "*", - "value": "not supported operator - just place holder" - }, - { - "field": "created", - "op": "=", - "value": set(), - } + valid_filters = [ + Filter('type', '=', 'malware'), + Filter('id', '!=', 'stix object id'), + Filter('labels', 'in', ["heartbleed", "malicious-activity"]), ] - - expected_errors = [ - "Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.", - "Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported", - "Filter operation(from 'op' field) not supported", - "Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary" + invalid_filters = [ + Filter('description', '=', 'not supported field - just place holder'), + Filter('modified', '*', 'not supported operator - just place holder'), + Filter('created', '=', object()), ] ds = DataSource() - # add - ids, statuses = ds.add_filter(filters) - # 7 filters should have been successfully added - assert len(ids) == 7 + assert len(ds.filters) == 0 - # all filters added to data source - for idx, status in enumerate(statuses): - assert status['filter'] == filters[idx] + ds.add_filter(valid_filters[0]) + assert len(ds.filters) == 1 - # proper status warnings were triggered - assert statuses[3]['errors'][0] == expected_errors[0] - assert statuses[4]['errors'][0] == expected_errors[1] - assert statuses[5]['errors'][0] == expected_errors[2] - assert statuses[6]['errors'][0] == expected_errors[3] + # Addin the same filter again will have no effect since `filters` uses a set + ds.add_filter(valid_filters[0]) + assert len(ds.filters) == 1 - # get - ds_filters = ds.get_filters() + ds.add_filter(valid_filters[1]) + assert len(ds.filters) == 2 + ds.add_filter(valid_filters[2]) + assert len(ds.filters) == 3 - # TODO: what are we trying to test here? - for idx, flt in enumerate(filters): - assert flt['value'] == ds_filters[idx]['value'] + # TODO: make better error messages + with pytest.raises(ValueError) as excinfo: + ds.add_filter(invalid_filters[0]) + assert str(excinfo.value) == "Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported" + + with pytest.raises(ValueError) as excinfo: + ds.add_filter(invalid_filters[1]) + assert str(excinfo.value) == "Filter operation(from 'op' field) not supported" + + with pytest.raises(ValueError) as excinfo: + ds.add_filter(invalid_filters[2]) + assert str(excinfo.value) == "Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary" + + assert set(valid_filters) == ds.filters # remove - ds.remove_filter([ids[3]]) - ds.remove_filter([ids[4]]) - ds.remove_filter([ids[5]]) - ds.remove_filter([ids[6]]) + ds.filters.remove(valid_filters[0]) - rem_filters = ds.get_filters() - - assert len(rem_filters) == 3 - - # check remaining filters - rem_ids = [f['id'] for f in rem_filters] - - # check remaining - for id_ in rem_ids: - assert id_ in ids[:3] + assert len(ds.filters) == 2 def test_apply_common_filters():