From 22c749d0dfb2a07bc003d540ce47c1e06d935f03 Mon Sep 17 00:00:00 2001 From: = Date: Wed, 30 Aug 2017 11:18:11 -0400 Subject: [PATCH 1/3] filter changes --- stix2/sources/__init__.py | 180 +--------------------------- stix2/sources/filesystem.py | 23 +--- stix2/sources/filters.py | 227 ++++++++++++++++++++++++++++++++++++ stix2/sources/memory.py | 23 +--- stix2/sources/taxii.py | 20 +--- 5 files changed, 240 insertions(+), 233 deletions(-) create mode 100644 stix2/sources/filters.py diff --git a/stix2/sources/__init__.py b/stix2/sources/__init__.py index d8676ca..ba5528b 100644 --- a/stix2/sources/__init__.py +++ b/stix2/sources/__init__.py @@ -16,56 +16,19 @@ Notes: """ -import collections import copy import uuid from six import iteritems - -class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])): - __slots__ = () - - def __new__(cls, field, op, value): - # If value is a list, convert it to a tuple so it is hashable. - if isinstance(value, list): - value = tuple(value) - self = super(Filter, cls).__new__(cls, field, op, value) - return self +from filters import (FILTER_OPS, FILTER_VALUE_TYPES, STIX_COMMON_FIELDS, + STIX_COMMON_FILTERS_MAP) def make_id(): return str(uuid.uuid4()) -# Currently, only STIX 2.0 common SDO fields (that are not complex objects) -# are supported for filtering on -STIX_COMMON_FIELDS = [ - "created", - "created_by_ref", - "external_references.source_name", - "external_references.description", - "external_references.url", - "external_references.hashes", - "external_references.external_id", - "granular_markings.marking_ref", - "granular_markings.selectors", - "id", - "labels", - "modified", - "object_marking_refs", - "revoked", - "type", - "granular_markings" -] - -# Supported filter operations -FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<='] - -# Supported filter value types -FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple] - - class DataStore(object): """ An implementer will create a concrete subclass from @@ -306,7 +269,7 @@ class DataSource(object): clean = False break try: - match = getattr(STIXCommonPropertyFilters, filter_.field)(filter_, stix_obj) + match = STIX_COMMON_FILTERS_MAP[filter_.field](filter_, stix_obj) if not match: clean = False break @@ -527,140 +490,3 @@ class CompositeDataSource(DataSource): """ return copy.deepcopy(self.data_sources.values()) - - -class STIXCommonPropertyFilters(object): - """ - """ - @classmethod - def _all(cls, filter_, stix_obj_field): - """all filter operations (for filters whose value type can be applied to any operation type)""" - if filter_.op == "=": - return stix_obj_field == filter_.value - elif filter_.op == "!=": - return stix_obj_field != filter_.value - elif filter_.op == "in": - return stix_obj_field in filter_.value - elif filter_.op == ">": - return stix_obj_field > filter_.value - elif filter_.op == "<": - return stix_obj_field < filter_.value - elif filter_.op == ">=": - return stix_obj_field >= filter_.value - elif filter_.op == "<=": - return stix_obj_field <= filter_.value - else: - return -1 - - @classmethod - def _id(cls, filter_, stix_obj_id): - """base filter types""" - if filter_.op == "=": - return stix_obj_id == filter_.value - elif filter_.op == "!=": - return stix_obj_id != filter_.value - else: - return -1 - - @classmethod - def _boolean(cls, filter_, stix_obj_field): - if filter_.op == "=": - return stix_obj_field == filter_.value - elif filter_.op == "!=": - return stix_obj_field != filter_.value - else: - return -1 - - @classmethod - def _string(cls, filter_, stix_obj_field): - return cls._all(filter_, stix_obj_field) - - @classmethod - def _timestamp(cls, filter_, stix_obj_timestamp): - return cls._all(filter_, stix_obj_timestamp) - - # STIX 2.0 Common Property filters - @classmethod - def created(cls, filter_, stix_obj): - return cls._timestamp(filter_, stix_obj["created"]) - - @classmethod - def created_by_ref(cls, filter_, stix_obj): - return cls._id(filter_, stix_obj["created_by_ref"]) - - @classmethod - def external_references(cls, filter_, stix_obj): - """ - STIX object's can have a list of external references - - external_references properties: - external_references.source_name (string) - external_references.description (string) - external_references.url (string) - external_references.hashes (hash, but for filtering purposes, a string) - external_references.external_id (string) - - """ - for er in stix_obj["external_references"]: - # grab er property name from filter field - filter_field = filter_.field.split(".")[1] - r = cls._string(filter_, er[filter_field]) - if r: - return r - return False - - @classmethod - def granular_markings(cls, filter_, stix_obj): - """ - STIX object's can have a list of granular marking references - - granular_markings properties: - granular_markings.marking_ref (id) - granular_markings.selectors (string) - - """ - for gm in stix_obj["granular_markings"]: - # grab gm property name from filter field - filter_field = filter_.field.split(".")[1] - - if filter_field == "marking_ref": - return cls._id(filter_, gm[filter_field]) - - elif filter_field == "selectors": - for selector in gm[filter_field]: - r = cls._string(filter_, selector) - if r: - return r - return False - - @classmethod - def id(cls, filter_, stix_obj): - return cls._id(filter_, stix_obj["id"]) - - @classmethod - def labels(cls, filter_, stix_obj): - for label in stix_obj["labels"]: - r = cls._string(filter_, label) - if r: - return r - return False - - @classmethod - def modified(cls, filter_, stix_obj): - return cls._timestamp(filter_, stix_obj["created"]) - - @classmethod - def object_markings_ref(cls, filter_, stix_obj): - for marking_id in stix_obj["object_market_refs"]: - r = cls._id(filter_, marking_id) - if r: - return r - return False - - @classmethod - def revoked(cls, filter_, stix_obj): - return cls._boolean(filter_, stix_obj["revoked"]) - - @classmethod - def type(cls, filter_, stix_obj): - return cls._string(filter_, stix_obj["type"]) diff --git a/stix2/sources/filesystem.py b/stix2/sources/filesystem.py index 0613ac0..c8ad0b0 100644 --- a/stix2/sources/filesystem.py +++ b/stix2/sources/filesystem.py @@ -13,7 +13,7 @@ import json import os from stix2 import Bundle -from stix2.sources import DataSink, DataSource, DataStore +from stix2.sources import DataSink, DataSource, DataStore, Filter class FileSystemStore(DataStore): @@ -77,13 +77,7 @@ class FileSystemSource(DataSource): def get(self, stix_id, _composite_filters=None): """ """ - query = [ - { - "field": "id", - "op": "=", - "value": stix_id - } - ] + query = [Filter("id", "=", stix_id)] all_data = self.query(query=query, _composite_filters=_composite_filters) @@ -95,21 +89,10 @@ class FileSystemSource(DataSource): """ Notes: Since FileSystem sources/sinks don't handle multiple versions - of a STIX object, this operation is futile. Pass call to get(). - (Approved by G.B.) + of a STIX object, this operation is unnecessary. Pass call to get(). """ - # query = [ - # { - # "field": "id", - # "op": "=", - # "value": stix_id - # } - # ] - - # all_data = self.query(query=query, _composite_filters=_composite_filters) - return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)] def query(self, query=None, _composite_filters=None): diff --git a/stix2/sources/filters.py b/stix2/sources/filters.py new file mode 100644 index 0000000..baa32c8 --- /dev/null +++ b/stix2/sources/filters.py @@ -0,0 +1,227 @@ +""" +Filters for Python STIX 2.0 DataSources, DataSinks, DataStores + +Classes: + Filter + +TODO: The script at the bottom of the module works (to capture +all the callable filter methods), however it causes this module +to be imported by itself twice. Not sure how big of deal that is, +or if cleaner solution possible. +""" + +import collections +import types + +import filters + +# Currently, only STIX 2.0 common SDO fields (that are not complex objects) +# are supported for filtering on +STIX_COMMON_FIELDS = [ + "created", + "created_by_ref", + "external_references.source_name", + "external_references.description", + "external_references.url", + "external_references.hashes", + "external_references.external_id", + "granular_markings.marking_ref", + "granular_markings.selectors", + "id", + "labels", + "modified", + "object_marking_refs", + "revoked", + "type", + "granular_markings" +] + +# Supported filter operations +FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<='] + +# Supported filter value types +FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple] + +# filter lookup map - STIX 2 common fields -> filter method +STIX_COMMON_FILTERS_MAP = {} + + +class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])): + __slots__ = () + + def __new__(cls, field, op, value): + # If value is a list, convert it to a tuple so it is hashable. + if isinstance(value, list): + value = tuple(value) + self = super(Filter, cls).__new__(cls, field, op, value) + return self + + +# primitive type filters + +def _all_filter(filter_, stix_obj_field): + """all filter operations (for filters whose value type can be applied to any operation type)""" + if filter_.op == "=": + return stix_obj_field == filter_.value + elif filter_.op == "!=": + return stix_obj_field != filter_.value + elif filter_.op == "in": + return stix_obj_field in filter_.value + elif filter_.op == ">": + return stix_obj_field > filter_.value + elif filter_.op == "<": + return stix_obj_field < filter_.value + elif filter_.op == ">=": + return stix_obj_field >= filter_.value + elif filter_.op == "<=": + return stix_obj_field <= filter_.value + else: + return -1 + + +def _id_filter(filter_, stix_obj_id): + """base filter types""" + if filter_.op == "=": + return stix_obj_id == filter_.value + elif filter_.op == "!=": + return stix_obj_id != filter_.value + else: + return -1 + + +def _boolean_filter(filter_, stix_obj_field): + if filter_.op == "=": + return stix_obj_field == filter_.value + elif filter_.op == "!=": + return stix_obj_field != filter_.value + else: + return -1 + + +def _string_filter(filter_, stix_obj_field): + return _all_filter(filter_, stix_obj_field) + + +def _timestamp_filter(filter_, stix_obj_timestamp): + return _all_filter(filter_, stix_obj_timestamp) + +# STIX 2.0 Common Property filters +# The naming of these functions is important as +# they are used to index a mapping dictionary from +# STIX common field names to these filter functions. +# +# REQUIRED naming scheme: +# "check__filter" + + +def check_created_filter(filter_, stix_obj): + return _timestamp_filter(filter_, stix_obj["created"]) + + +def check_created_by_ref_filter(filter_, stix_obj): + return _id_filter(filter_, stix_obj["created_by_ref"]) + + +def check_external_references_filter(filter_, stix_obj): + """ + STIX object's can have a list of external references + + external_references properties: + external_references.source_name (string) + external_references.description (string) + external_references.url (string) + external_references.hashes (hash, but for filtering purposes, a string) + external_references.external_id (string) + + """ + for er in stix_obj["external_references"]: + # grab er property name from filter field + filter_field = filter_.field.split(".")[1] + r = _string_filter(filter_, er[filter_field]) + if r: + return r + return False + + +def check_granular_markings_filter(filter_, stix_obj): + """ + STIX object's can have a list of granular marking references + + granular_markings properties: + granular_markings.marking_ref (id) + granular_markings.selectors (string) + + """ + for gm in stix_obj["granular_markings"]: + # grab gm property name from filter field + filter_field = filter_.field.split(".")[1] + + if filter_field == "marking_ref": + return _id_filter(filter_, gm[filter_field]) + + elif filter_field == "selectors": + for selector in gm[filter_field]: + r = _string_filter(filter_, selector) + if r: + return r + return False + + +def check_id_filter(filter_, stix_obj): + return _id_filter(filter_, stix_obj["id"]) + + +def check_labels_filter(filter_, stix_obj): + for label in stix_obj["labels"]: + r = _string_filter(filter_, label) + if r: + return r + return False + + +def check_modified_filter(filter_, stix_obj): + return _timestamp_filter(filter_, stix_obj["created"]) + + +def check_object_markings_ref_filter(filter_, stix_obj): + for marking_id in stix_obj["object_market_refs"]: + r = _id_filter(filter_, marking_id) + if r: + return r + return False + + +def check_revoked_filter(filter_, stix_obj): + return _boolean_filter(filter_, stix_obj["revoked"]) + + +def check_type_filter(filter_, stix_obj): + return _string_filter(filter_, stix_obj["type"]) + + +# script to collect STIX common field filter +# functions and create mapping to them + +""" +MK: I want to build the filter name -> filter function dictionary +dynamically whenever it is imported. By enumerating the functions +in this module, extracting the "check*" functions and making +pointers to them. But having issues getting an interable of the +modules entities. globals() works but returns an active dictionary +so iterating over it is a no go +""" + +for entity in dir(filters): + if "check_" in str(entity) and isinstance(filters.__dict__.get(entity), types.FunctionType): + field_name = entity.split("_")[1].split("_")[0] + STIX_COMMON_FILTERS_MAP[field_name] = filters.__dict__.get(entity) + +# Tried this to, didnt work ############## +""" +import sys +for entity in dir(sys.modules[__name__]): + print(entity) + if "check_" in str(entity) and type(entity) == "function": + print(sys.modules[__name__].__dict__.get(entity)) + STIX_COMMON_FILTERS_MAP[str(entity)] = sys.modules[__name__].__dict__.get(entity) +""" diff --git a/stix2/sources/memory.py b/stix2/sources/memory.py index 24f3c1f..bf87517 100644 --- a/stix2/sources/memory.py +++ b/stix2/sources/memory.py @@ -22,7 +22,7 @@ import json import os from stix2 import Bundle -from stix2.sources import DataSink, DataSource, DataStore +from stix2.sources import DataSink, DataSource, DataStore, Filter from stix2validator import validate_string @@ -204,13 +204,7 @@ class MemorySource(DataSource): return stix_obj # if there are filters from the composite level, process full query - query = [ - { - "field": "id", - "op": "=", - "value": stix_id - } - ] + query = [Filter("id", "=", stix_id)] all_data = self.query(query=query, _composite_filters=_composite_filters) @@ -223,21 +217,10 @@ class MemorySource(DataSource): """ Notes: Since Memory sources/sinks don't handle multiple versions of a - STIX object, this operation is futile. Translate call to get(). - (Approved by G.B.) + STIX object, this operation is unnecessary. Translate call to get(). """ - # query = [ - # { - # "field": "id", - # "op": "=", - # "value": stix_id - # } - # ] - - # all_data = self.query(query=query, _composite_filters=_composite_filters) - return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)] def query(self, query=None, _composite_filters=None): diff --git a/stix2/sources/taxii.py b/stix2/sources/taxii.py index 47ad8ed..b9dc8c4 100644 --- a/stix2/sources/taxii.py +++ b/stix2/sources/taxii.py @@ -12,7 +12,7 @@ TODO: Test everything import json -from stix2.sources import DataSink, DataSource, DataStore, make_id +from stix2.sources import DataSink, DataSource, DataStore, Filter, make_id TAXII_FILTERS = ['added_after', 'id', 'type', 'version'] @@ -89,16 +89,8 @@ class TAXIICollectionSource(DataSource): """ # make query in TAXII query format since 'id' is TAXII field query = [ - { - "field": "match[id]", - "op": "=", - "value": stix_id - }, - { - "field": "match[version]", - "op": "=", - "value": "all" - } + Filter("match[id]", "=", stix_id), + Filter("match[version]", "=", "all") ] all_data = self.query(query=query, _composite_filters=_composite_filters) @@ -138,11 +130,7 @@ class TAXIICollectionSource(DataSource): For instance - "?match[type]=indicator,sighting" should be in a query dict as follows: - { - "field": "type" - "op": "=", - "value": "indicator,sighting" - } + Filter("type", "=", "indicator,sighting") Args: query (list): list of filters to extract which ones are TAXII From 7b46283a5cc9b19037861726c69960145175338e Mon Sep 17 00:00:00 2001 From: Greg Back Date: Thu, 31 Aug 2017 18:03:12 +0000 Subject: [PATCH 2/3] Build filter function map --- stix2/sources/__init__.py | 6 ++--- stix2/sources/filters.py | 39 +++++++-------------------------- stix2/sources/memory.py | 3 ++- stix2/sources/taxii.py | 3 ++- stix2/test/test_data_sources.py | 3 ++- 5 files changed, 17 insertions(+), 37 deletions(-) diff --git a/stix2/sources/__init__.py b/stix2/sources/__init__.py index 53b005e..7241a0b 100644 --- a/stix2/sources/__init__.py +++ b/stix2/sources/__init__.py @@ -20,8 +20,8 @@ import uuid from six import iteritems -from filters import (FILTER_OPS, FILTER_VALUE_TYPES, STIX_COMMON_FIELDS, - STIX_COMMON_FILTERS_MAP) +from stix2.sources.filters import (FILTER_OPS, FILTER_VALUE_TYPES, + STIX_COMMON_FIELDS, STIX_COMMON_FILTERS_MAP) def make_id(): @@ -273,7 +273,7 @@ class DataSource(object): clean = False break - match = STIX_COMMON_FILTERS_MAP[filter_.field](filter_, stix_obj) + match = STIX_COMMON_FILTERS_MAP[filter_.field.split('.')[0]](filter_, stix_obj) if not match: clean = False break diff --git a/stix2/sources/filters.py b/stix2/sources/filters.py index baa32c8..7758369 100644 --- a/stix2/sources/filters.py +++ b/stix2/sources/filters.py @@ -13,8 +13,6 @@ or if cleaner solution possible. import collections import types -import filters - # Currently, only STIX 2.0 common SDO fields (that are not complex objects) # are supported for filtering on STIX_COMMON_FIELDS = [ @@ -180,11 +178,11 @@ def check_labels_filter(filter_, stix_obj): def check_modified_filter(filter_, stix_obj): - return _timestamp_filter(filter_, stix_obj["created"]) + return _timestamp_filter(filter_, stix_obj["modified"]) -def check_object_markings_ref_filter(filter_, stix_obj): - for marking_id in stix_obj["object_market_refs"]: +def check_object_marking_refs_filter(filter_, stix_obj): + for marking_id in stix_obj["object_marking_refs"]: r = _id_filter(filter_, marking_id) if r: return r @@ -199,29 +197,8 @@ def check_type_filter(filter_, stix_obj): return _string_filter(filter_, stix_obj["type"]) -# script to collect STIX common field filter -# functions and create mapping to them - -""" -MK: I want to build the filter name -> filter function dictionary -dynamically whenever it is imported. By enumerating the functions -in this module, extracting the "check*" functions and making -pointers to them. But having issues getting an interable of the -modules entities. globals() works but returns an active dictionary -so iterating over it is a no go -""" - -for entity in dir(filters): - if "check_" in str(entity) and isinstance(filters.__dict__.get(entity), types.FunctionType): - field_name = entity.split("_")[1].split("_")[0] - STIX_COMMON_FILTERS_MAP[field_name] = filters.__dict__.get(entity) - -# Tried this to, didnt work ############## -""" -import sys -for entity in dir(sys.modules[__name__]): - print(entity) - if "check_" in str(entity) and type(entity) == "function": - print(sys.modules[__name__].__dict__.get(entity)) - STIX_COMMON_FILTERS_MAP[str(entity)] = sys.modules[__name__].__dict__.get(entity) -""" +# Create mapping of field names to filter functions +for name, obj in dict(globals()).items(): + if "check_" in name and isinstance(obj, types.FunctionType): + field_name = "_".join(name.split("_")[1:-1]) + STIX_COMMON_FILTERS_MAP[field_name] = obj diff --git a/stix2/sources/memory.py b/stix2/sources/memory.py index 9f2fa49..8cf8e20 100644 --- a/stix2/sources/memory.py +++ b/stix2/sources/memory.py @@ -24,7 +24,8 @@ import os from stix2validator import validate_string from stix2 import Bundle -from stix2.sources import DataSink, DataSource, DataStore, Filter +from stix2.sources import DataSink, DataSource, DataStore +from stix2.sources.filters import Filter class MemoryStore(DataStore): diff --git a/stix2/sources/taxii.py b/stix2/sources/taxii.py index b9dc8c4..41632ae 100644 --- a/stix2/sources/taxii.py +++ b/stix2/sources/taxii.py @@ -12,7 +12,8 @@ TODO: Test everything import json -from stix2.sources import DataSink, DataSource, DataStore, Filter, make_id +from stix2.sources import DataSink, DataSource, DataStore, make_id +from stix2.sources.filters import Filter TAXII_FILTERS = ['added_after', 'id', 'type', 'version'] diff --git a/stix2/test/test_data_sources.py b/stix2/test/test_data_sources.py index ee37825..79e0c8b 100644 --- a/stix2/test/test_data_sources.py +++ b/stix2/test/test_data_sources.py @@ -2,7 +2,8 @@ import pytest from taxii2client import Collection from stix2.sources import (CompositeDataSource, DataSink, DataSource, - DataStore, Filter, make_id, taxii) + DataStore, make_id, taxii) +from stix2.sources.filters import Filter from stix2.sources.memory import MemorySource COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/' From ba846f95016ae3160c904334a3c7ab4b9ccaa3ce Mon Sep 17 00:00:00 2001 From: Greg Back Date: Thu, 31 Aug 2017 18:23:08 +0000 Subject: [PATCH 3/3] Clean up some tests. --- stix2/sources/__init__.py | 47 +++--- stix2/test/test_data_sources.py | 267 ++++++++++++++++---------------- 2 files changed, 157 insertions(+), 157 deletions(-) diff --git a/stix2/sources/__init__.py b/stix2/sources/__init__.py index 7241a0b..f702748 100644 --- a/stix2/sources/__init__.py +++ b/stix2/sources/__init__.py @@ -253,34 +253,31 @@ class DataSource(object): for stix_obj in stix_objs: clean = True for filter_ in query: - try: - # skip filter as filter was identified (when added) as - # not a common filter - if filter_.field not in STIX_COMMON_FIELDS: - raise Exception("Error, field: {0} is not supported for filtering on.".format(filter_.field)) + # skip filter as filter was identified (when added) as + # not a common filter + if filter_.field not in STIX_COMMON_FIELDS: + raise ValueError("Error, field: {0} is not supported for filtering on.".format(filter_.field)) - # For properties like granular_markings and external_references - # need to break the first property from the string. - if "." in filter_.field: - field = filter_.field.split(".")[0] - else: - field = filter_.field + # For properties like granular_markings and external_references + # need to break the first property from the string. + if "." in filter_.field: + field = filter_.field.split(".")[0] + else: + field = filter_.field - # check filter "field" is in STIX object - if cant be - # applied due to STIX object, STIX object is discarded - # (i.e. did not make it through the filter) - if field not in stix_obj.keys(): - clean = False - break + # check filter "field" is in STIX object - if cant be + # applied due to STIX object, STIX object is discarded + # (i.e. did not make it through the filter) + if field not in stix_obj.keys(): + clean = False + break - match = STIX_COMMON_FILTERS_MAP[filter_.field.split('.')[0]](filter_, stix_obj) - if not match: - clean = False - break - elif match == -1: - raise Exception("Error, filter operator: {0} not supported for specified field: {1}".format(filter_.op, filter_.field)) - except Exception as e: - raise ValueError(e) + match = STIX_COMMON_FILTERS_MAP[filter_.field.split('.')[0]](filter_, stix_obj) + if not match: + clean = False + break + elif match == -1: + raise ValueError("Error, filter operator: {0} not supported for specified field: {1}".format(filter_.op, filter_.field)) # if object unmarked after all filters, add it if clean: diff --git a/stix2/test/test_data_sources.py b/stix2/test/test_data_sources.py index 79e0c8b..76934fb 100644 --- a/stix2/test/test_data_sources.py +++ b/stix2/test/test_data_sources.py @@ -19,107 +19,110 @@ def collection(): return Collection(COLLECTION_URL, MockTAXIIClient()) -STIX_OBJS1 = [ - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - }, - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - }, - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.936Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - }, - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - }, - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - } -] +@pytest.fixture +def ds(): + return DataSource() -STIX_OBJS2 = [ - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-31T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - }, - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - }, - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - } -] + +IND1 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND2 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND3 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.936Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND4 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND5 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND6 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-31T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND7 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND8 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} + +STIX_OBJS2 = [IND6, IND7, IND8] +STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5] def test_ds_smoke(): @@ -173,7 +176,7 @@ def test_parse_taxii_filters(): assert taxii_filters == expected_params -def test_add_get_remove_filter(): +def test_add_get_remove_filter(ds): # First 3 filters are valid, remaining fields are erroneous in some way valid_filters = [ @@ -187,8 +190,6 @@ def test_add_get_remove_filter(): Filter('created', '=', object()), ] - ds = DataSource() - assert len(ds.filters) == 0 ds.add_filter(valid_filters[0]) @@ -226,7 +227,7 @@ def test_add_get_remove_filter(): ds.add_filters(valid_filters) -def test_apply_common_filters(): +def test_apply_common_filters(ds): stix_objs = [ { "created": "2017-01-27T13:49:53.997Z", @@ -287,8 +288,6 @@ def test_apply_common_filters(): Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"), ] - ds = DataSource() - resp = ds.apply_common_filters(stix_objs, [filters[0]]) ids = [r['id'] for r in resp] assert stix_objs[0]['id'] in ids @@ -328,61 +327,65 @@ def test_apply_common_filters(): assert resp[0]['id'] == stix_objs[2]['id'] assert len(resp) == 1 - # These are used with STIX_OBJS2 - more_filters = [ - Filter("modified", "<", "2017-01-28T13:49:53.935Z"), - Filter("modified", ">", "2017-01-28T13:49:53.935Z"), - Filter("modified", ">=", "2017-01-27T13:49:53.935Z"), - Filter("modified", "<=", "2017-01-27T13:49:53.935Z"), - Filter("modified", "?", "2017-01-27T13:49:53.935Z"), - Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"), - Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"), - Filter("notacommonproperty", "=", "bar"), - ] - resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[0]]) +def test_filters0(ds): + resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")]) assert resp[0]['id'] == STIX_OBJS2[1]['id'] assert len(resp) == 2 - resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[1]]) + +def test_filters1(ds): + resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")]) assert resp[0]['id'] == STIX_OBJS2[0]['id'] assert len(resp) == 1 - resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[2]]) + +def test_filters2(ds): + resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")]) assert resp[0]['id'] == STIX_OBJS2[0]['id'] assert len(resp) == 3 - resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[3]]) + +def test_filters3(ds): + resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")]) assert resp[0]['id'] == STIX_OBJS2[1]['id'] assert len(resp) == 2 + +def test_filters4(ds): + fltr4 = Filter("modified", "?", "2017-01-27T13:49:53.935Z") with pytest.raises(ValueError) as excinfo: - ds.apply_common_filters(STIX_OBJS2, [more_filters[4]]) + ds.apply_common_filters(STIX_OBJS2, [fltr4]) assert str(excinfo.value) == ("Error, filter operator: {0} not supported " - "for specified field: {1}").format(more_filters[4].op, - more_filters[4].field) + "for specified field: {1}").format(fltr4.op, fltr4.field) - resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[5]]) + +def test_filters5(ds): + resp = ds.apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")]) assert resp[0]['id'] == STIX_OBJS2[0]['id'] assert len(resp) == 1 + +def test_filters6(ds): + fltr6 = Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f") with pytest.raises(ValueError) as excinfo: - ds.apply_common_filters(STIX_OBJS2, [more_filters[6]]) + ds.apply_common_filters(STIX_OBJS2, [fltr6]) assert str(excinfo.value) == ("Error, filter operator: {0} not supported " - "for specified field: {1}").format(more_filters[6].op, - more_filters[6].field) + "for specified field: {1}").format(fltr6.op, fltr6.field) + +def test_filters7(ds): + fltr7 = Filter("notacommonproperty", "=", "bar") with pytest.raises(ValueError) as excinfo: - ds.apply_common_filters(STIX_OBJS2, [more_filters[7]]) + ds.apply_common_filters(STIX_OBJS2, [fltr7]) assert str(excinfo.value) == ("Error, field: {0} is not supported for " - "filtering on.".format(more_filters[7].field)) + "filtering on.".format(fltr7.field)) -def test_deduplicate(): - ds = DataSource() +def test_deduplicate(ds): unique = ds.deduplicate(STIX_OBJS1) # Only 3 objects are unique