From 924c72e98ad5af8c9884b5df6dcb4c87d10fa3d2 Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Mon, 30 Oct 2017 17:25:32 -0400 Subject: [PATCH] Change filters to allow filtering all properties (Not just common properties) --- stix2/sources/filters.py | 234 +++++++------------------------- stix2/test/test_data_sources.py | 48 ++++++- 2 files changed, 99 insertions(+), 183 deletions(-) diff --git a/stix2/sources/filters.py b/stix2/sources/filters.py index 060d2c3..684c792 100644 --- a/stix2/sources/filters.py +++ b/stix2/sources/filters.py @@ -4,7 +4,6 @@ Filters for Python STIX 2.0 DataSources, DataSinks, DataStores """ import collections -import types # Currently, only STIX 2.0 common SDO fields (that are not complex objects) # are supported for filtering on @@ -34,12 +33,9 @@ FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<='] """Supported filter value types""" FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple] -# filter lookup map - STIX 2 common fields -> filter method -STIX_COMMON_FILTERS_MAP = {} - def _check_filter_components(field, op, value): - """check filter meets minimum validity + """Check that filter meets minimum validity. Note: Currently can create Filters that are not valid STIX2 object common properties, as filter.field value @@ -57,7 +53,7 @@ def _check_filter_components(field, op, value): if type(value) not in FILTER_VALUE_TYPES: # check filter value type is supported - raise TypeError("Filter value type '%s' is not supported. The type must be a python immutable type or dictionary" % type(value)) + raise TypeError("Filter value type '%s' is not supported. The type must be a Python immutable type or dictionary" % type(value)) return True @@ -66,13 +62,11 @@ class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])): """STIX 2 filters that support the querying functionality of STIX 2 DataStores and DataSources. - Initialized like a python tuple + Initialized like a Python tuple. Args: field (str): filter field name, corresponds to STIX 2 object property - op (str): operator of the filter - value (str): filter field value Example: @@ -91,26 +85,11 @@ class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])): self = super(Filter, cls).__new__(cls, field, op, value) return self - @property - def common(self): - """return whether Filter is valid STIX2 Object common property - - Note: The Filter operator and Filter value type are checked when - the filter is created, thus only leaving the Filter field to be - checked to make sure a valid STIX2 Object common property. - - Note: Filters that are not valid STIX2 Object common property - Filters are still allowed to be created for extended usage of - Filter. (e.g. TAXII specific filters can be created, which are - then extracted and sent to TAXII endpoint.) - """ - return self.field in STIX_COMMON_FIELDS - def apply_common_filters(stix_objs, query): """Evaluate filters against a set of STIX 2.0 objects. - Supports only STIX 2.0 common property fields + Supports only STIX 2.0 common property fields. Args: stix_objs (list): list of STIX objects to apply the query to @@ -124,25 +103,7 @@ def apply_common_filters(stix_objs, query): for stix_obj in stix_objs: clean = True for filter_ in query: - if not filter_.common: - # skip filter as it is not a STIX2 Object common property filter - continue - - if "." in filter_.field: - # For properties like granular_markings and external_references - # need to extract the first property from the string. - field = filter_.field.split(".")[0] - else: - field = filter_.field - - if field not in stix_obj.keys(): - # check filter "field" is in STIX object - if cant be - # applied to STIX object, STIX object is discarded - # (i.e. did not make it through the filter) - clean = False - break - - match = STIX_COMMON_FILTERS_MAP[filter_.field.split('.')[0]](filter_, stix_obj) + match = _check_filter(filter_, stix_obj) if not match: clean = False @@ -155,7 +116,53 @@ def apply_common_filters(stix_objs, query): yield stix_obj -"""Base type filters""" +def _check_filter(filter_, stix_obj): + """Evaluate a single filter against a single STIX 2.0 object. + + Args: + filter_ (Filter): filter to match against + stix_obj: STIX object to apply the filter to + + Returns: + True if the stix_obj matches the filter, + False if not. + + """ + if "." in filter_.field: + # For properties like granular_markings and external_references + # need to extract the first property from the string. + field = filter_.field.split(".")[0] + else: + field = filter_.field + + if field not in stix_obj.keys(): + # check filter "field" is in STIX object - if cant be + # applied to STIX object, STIX object is discarded + # (i.e. did not make it through the filter) + return False + + if "." in filter_.field: + # Check embedded properties, from e.g. granular_markings or external_references + sub_field = filter_.field.split(".", 1)[1] + sub_filter = filter_._replace(field=sub_field) + if isinstance(stix_obj[field], list): + for elem in stix_obj[field]: + r = _check_filter(sub_filter, elem) + if r: + return r + return False + else: + return _check_filter(sub_filter, stix_obj[field]) + elif isinstance(stix_obj[field], list): + # Check each item in list property to see if it matches + for elem in stix_obj[field]: + r = _all_filter(filter_, elem) + if r: + return r + return False + else: + # Check if property matches + return _all_filter(filter_, stix_obj[field]) def _all_filter(filter_, stix_obj_field): @@ -176,140 +183,3 @@ def _all_filter(filter_, stix_obj_field): return stix_obj_field <= filter_.value else: return -1 - - -def _id_filter(filter_, stix_obj_id): - """base STIX id filter""" - if filter_.op == "=": - return stix_obj_id == filter_.value - elif filter_.op == "!=": - return stix_obj_id != filter_.value - else: - return -1 - - -def _boolean_filter(filter_, stix_obj_field): - """base boolean filter""" - if filter_.op == "=": - return stix_obj_field == filter_.value - elif filter_.op == "!=": - return stix_obj_field != filter_.value - else: - return -1 - - -def _string_filter(filter_, stix_obj_field): - """base string filter""" - return _all_filter(filter_, stix_obj_field) - - -def _timestamp_filter(filter_, stix_obj_timestamp): - """base STIX 2 timestamp filter""" - return _all_filter(filter_, stix_obj_timestamp) - - -"""STIX 2.0 Common Property Filters - -The naming of these functions is important as -they are used to index a mapping dictionary from -STIX common field names to these filter functions. - -REQUIRED naming scheme: - "check__filter" - -""" - - -def check_created_filter(filter_, stix_obj): - return _timestamp_filter(filter_, stix_obj["created"]) - - -def check_created_by_ref_filter(filter_, stix_obj): - return _id_filter(filter_, stix_obj["created_by_ref"]) - - -def check_external_references_filter(filter_, stix_obj): - """ - STIX object's can have a list of external references - - external_references properties supported: - external_references.source_name (string) - external_references.description (string) - external_references.url (string) - external_references.external_id (string) - - external_references properties not supported: - external_references.hashes - - """ - for er in stix_obj["external_references"]: - # grab er property name from filter field - filter_field = filter_.field.split(".")[1] - if filter_field in er: - r = _string_filter(filter_, er[filter_field]) - if r: - return r - return False - - -def check_granular_markings_filter(filter_, stix_obj): - """ - STIX object's can have a list of granular marking references - - granular_markings properties: - granular_markings.marking_ref (id) - granular_markings.selectors (string) - - """ - for gm in stix_obj["granular_markings"]: - # grab gm property name from filter field - filter_field = filter_.field.split(".")[1] - - if filter_field == "marking_ref": - return _id_filter(filter_, gm[filter_field]) - - elif filter_field == "selectors": - for selector in gm[filter_field]: - r = _string_filter(filter_, selector) - if r: - return r - return False - - -def check_id_filter(filter_, stix_obj): - return _id_filter(filter_, stix_obj["id"]) - - -def check_labels_filter(filter_, stix_obj): - for label in stix_obj["labels"]: - r = _string_filter(filter_, label) - if r: - return r - return False - - -def check_modified_filter(filter_, stix_obj): - return _timestamp_filter(filter_, stix_obj["modified"]) - - -def check_object_marking_refs_filter(filter_, stix_obj): - for marking_id in stix_obj["object_marking_refs"]: - r = _id_filter(filter_, marking_id) - if r: - return r - return False - - -def check_revoked_filter(filter_, stix_obj): - return _boolean_filter(filter_, stix_obj["revoked"]) - - -def check_type_filter(filter_, stix_obj): - return _string_filter(filter_, stix_obj["type"]) - - -# Create mapping of field names to filter functions -for name, obj in dict(globals()).items(): - if "check_" in name and isinstance(obj, types.FunctionType): - field_name = "_".join(name.split("_")[1:-1]) - STIX_COMMON_FILTERS_MAP[field_name] = obj diff --git a/stix2/test/test_data_sources.py b/stix2/test/test_data_sources.py index 689fe8c..583acea 100644 --- a/stix2/test/test_data_sources.py +++ b/stix2/test/test_data_sources.py @@ -226,7 +226,7 @@ def test_add_get_remove_filter(ds): Filter('created', '=', object()) # On Python 2, the type of object() is `` On Python 3, it's ``. assert str(excinfo.value).startswith("Filter value type") - assert str(excinfo.value).endswith("is not supported. The type must be a python immutable type or dictionary") + assert str(excinfo.value).endswith("is not supported. The type must be a Python immutable type or dictionary") assert len(ds.filters) == 0 @@ -443,6 +443,52 @@ def test_filters5(ds): assert len(resp) == 1 +def test_filters6(ds): + # Test filtering on non-common property + resp = list(apply_common_filters(STIX_OBJS2, [Filter("name", "=", "Malicious site hosting downloader")])) + assert resp[0]['id'] == STIX_OBJS2[0]['id'] + assert len(resp) == 3 + + +def test_filters7(ds): + # Test filtering on embedded property + stix_objects = list(STIX_OBJS2) + [{ + "type": "observed-data", + "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", + "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + "created": "2016-04-06T19:58:16.000Z", + "modified": "2016-04-06T19:58:16.000Z", + "first_observed": "2015-12-21T19:00:00Z", + "last_observed": "2015-12-21T19:00:00Z", + "number_observed": 50, + "objects": { + "0": { + "type": "file", + "hashes": { + "SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f" + }, + "extensions": { + "pdf-ext": { + "version": "1.7", + "document_info_dict": { + "Title": "Sample document", + "Author": "Adobe Systems Incorporated", + "Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh", + "Producer": "Acrobat Distiller 3.01 for Power Macintosh", + "CreationDate": "20070412090123-02" + }, + "pdfid0": "DFCE52BD827ECF765649852119D", + "pdfid1": "57A1E0F9ED2AE523E313C" + } + } + } + } + }] + resp = list(apply_common_filters(stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")])) + assert resp[0]['id'] == stix_objects[3]['id'] + assert len(resp) == 1 + + def test_deduplicate(ds): unique = deduplicate(STIX_OBJS1)