Merge pull request #48 from oasis-open/filter

Filter updates
stix2.1
Greg Back 2017-08-31 18:39:39 +00:00 committed by GitHub
commit 458b32d124
6 changed files with 374 additions and 371 deletions

View File

@ -16,55 +16,18 @@ Notes:
"""
import collections
import uuid
from six import iteritems
class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
__slots__ = ()
def __new__(cls, field, op, value):
# If value is a list, convert it to a tuple so it is hashable.
if isinstance(value, list):
value = tuple(value)
self = super(Filter, cls).__new__(cls, field, op, value)
return self
from stix2.sources.filters import (FILTER_OPS, FILTER_VALUE_TYPES,
STIX_COMMON_FIELDS, STIX_COMMON_FILTERS_MAP)
def make_id():
return str(uuid.uuid4())
# Currently, only STIX 2.0 common SDO fields (that are not complex objects)
# are supported for filtering on
STIX_COMMON_FIELDS = [
"created",
"created_by_ref",
"external_references.source_name",
"external_references.description",
"external_references.url",
"external_references.hashes",
"external_references.external_id",
"granular_markings.marking_ref",
"granular_markings.selectors",
"id",
"labels",
"modified",
"object_marking_refs",
"revoked",
"type",
"granular_markings"
]
# Supported filter operations
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
# Supported filter value types
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
class DataStore(object):
"""
An implementer will create a concrete subclass from
@ -290,34 +253,31 @@ class DataSource(object):
for stix_obj in stix_objs:
clean = True
for filter_ in query:
try:
# skip filter as filter was identified (when added) as
# not a common filter
if filter_.field not in STIX_COMMON_FIELDS:
raise Exception("Error, field: {0} is not supported for filtering on.".format(filter_.field))
# skip filter as filter was identified (when added) as
# not a common filter
if filter_.field not in STIX_COMMON_FIELDS:
raise ValueError("Error, field: {0} is not supported for filtering on.".format(filter_.field))
# For properties like granular_markings and external_references
# need to break the first property from the string.
if "." in filter_.field:
field = filter_.field.split(".")[0]
else:
field = filter_.field
# For properties like granular_markings and external_references
# need to break the first property from the string.
if "." in filter_.field:
field = filter_.field.split(".")[0]
else:
field = filter_.field
# check filter "field" is in STIX object - if cant be
# applied due to STIX object, STIX object is discarded
# (i.e. did not make it through the filter)
if field not in stix_obj.keys():
clean = False
break
# check filter "field" is in STIX object - if cant be
# applied due to STIX object, STIX object is discarded
# (i.e. did not make it through the filter)
if field not in stix_obj.keys():
clean = False
break
match = getattr(STIXCommonPropertyFilters, field)(filter_, stix_obj)
if not match:
clean = False
break
elif match == -1:
raise Exception("Error, filter operator: {0} not supported for specified field: {1}".format(filter_.op, filter_.field))
except Exception as e:
raise ValueError(e)
match = STIX_COMMON_FILTERS_MAP[filter_.field.split('.')[0]](filter_, stix_obj)
if not match:
clean = False
break
elif match == -1:
raise ValueError("Error, filter operator: {0} not supported for specified field: {1}".format(filter_.op, filter_.field))
# if object unmarked after all filters, add it
if clean:
@ -532,140 +492,3 @@ class CompositeDataSource(DataSource):
"""
return self.data_sources.values()
class STIXCommonPropertyFilters(object):
"""
"""
@classmethod
def _all(cls, filter_, stix_obj_field):
"""all filter operations (for filters whose value type can be applied to any operation type)"""
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
elif filter_.op == "in":
return stix_obj_field in filter_.value
elif filter_.op == ">":
return stix_obj_field > filter_.value
elif filter_.op == "<":
return stix_obj_field < filter_.value
elif filter_.op == ">=":
return stix_obj_field >= filter_.value
elif filter_.op == "<=":
return stix_obj_field <= filter_.value
else:
return -1
@classmethod
def _id(cls, filter_, stix_obj_id):
"""base filter types"""
if filter_.op == "=":
return stix_obj_id == filter_.value
elif filter_.op == "!=":
return stix_obj_id != filter_.value
else:
return -1
@classmethod
def _boolean(cls, filter_, stix_obj_field):
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
else:
return -1
@classmethod
def _string(cls, filter_, stix_obj_field):
return cls._all(filter_, stix_obj_field)
@classmethod
def _timestamp(cls, filter_, stix_obj_timestamp):
return cls._all(filter_, stix_obj_timestamp)
# STIX 2.0 Common Property filters
@classmethod
def created(cls, filter_, stix_obj):
return cls._timestamp(filter_, stix_obj["created"])
@classmethod
def created_by_ref(cls, filter_, stix_obj):
return cls._id(filter_, stix_obj["created_by_ref"])
@classmethod
def external_references(cls, filter_, stix_obj):
"""
STIX object's can have a list of external references
external_references properties:
external_references.source_name (string)
external_references.description (string)
external_references.url (string)
external_references.hashes (hash, but for filtering purposes, a string)
external_references.external_id (string)
"""
for er in stix_obj["external_references"]:
# grab er property name from filter field
filter_field = filter_.field.split(".")[1]
r = cls._string(filter_, er[filter_field])
if r:
return r
return False
@classmethod
def granular_markings(cls, filter_, stix_obj):
"""
STIX object's can have a list of granular marking references
granular_markings properties:
granular_markings.marking_ref (id)
granular_markings.selectors (string)
"""
for gm in stix_obj["granular_markings"]:
# grab gm property name from filter field
filter_field = filter_.field.split(".")[1]
if filter_field == "marking_ref":
return cls._id(filter_, gm[filter_field])
elif filter_field == "selectors":
for selector in gm[filter_field]:
r = cls._string(filter_, selector)
if r:
return r
return False
@classmethod
def id(cls, filter_, stix_obj):
return cls._id(filter_, stix_obj["id"])
@classmethod
def labels(cls, filter_, stix_obj):
for label in stix_obj["labels"]:
r = cls._string(filter_, label)
if r:
return r
return False
@classmethod
def modified(cls, filter_, stix_obj):
return cls._timestamp(filter_, stix_obj["modified"])
@classmethod
def object_marking_refs(cls, filter_, stix_obj):
for marking_id in stix_obj["object_marking_refs"]:
r = cls._id(filter_, marking_id)
if r:
return r
return False
@classmethod
def revoked(cls, filter_, stix_obj):
return cls._boolean(filter_, stix_obj["revoked"])
@classmethod
def type(cls, filter_, stix_obj):
return cls._string(filter_, stix_obj["type"])

View File

@ -77,9 +77,7 @@ class FileSystemSource(DataSource):
def get(self, stix_id, _composite_filters=None):
"""
"""
query = [
Filter("id", "=", stix_id)
]
query = [Filter("id", "=", stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters)
@ -91,21 +89,10 @@ class FileSystemSource(DataSource):
"""
Notes:
Since FileSystem sources/sinks don't handle multiple versions
of a STIX object, this operation is futile. Pass call to get().
(Approved by G.B.)
of a STIX object, this operation is unnecessary. Pass call to get().
"""
# query = [
# {
# "field": "id",
# "op": "=",
# "value": stix_id
# }
# ]
# all_data = self.query(query=query, _composite_filters=_composite_filters)
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
def query(self, query=None, _composite_filters=None):

204
stix2/sources/filters.py Normal file
View File

@ -0,0 +1,204 @@
"""
Filters for Python STIX 2.0 DataSources, DataSinks, DataStores
Classes:
Filter
TODO: The script at the bottom of the module works (to capture
all the callable filter methods), however it causes this module
to be imported by itself twice. Not sure how big of deal that is,
or if cleaner solution possible.
"""
import collections
import types
# Currently, only STIX 2.0 common SDO fields (that are not complex objects)
# are supported for filtering on
STIX_COMMON_FIELDS = [
"created",
"created_by_ref",
"external_references.source_name",
"external_references.description",
"external_references.url",
"external_references.hashes",
"external_references.external_id",
"granular_markings.marking_ref",
"granular_markings.selectors",
"id",
"labels",
"modified",
"object_marking_refs",
"revoked",
"type",
"granular_markings"
]
# Supported filter operations
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
# Supported filter value types
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
# filter lookup map - STIX 2 common fields -> filter method
STIX_COMMON_FILTERS_MAP = {}
class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
__slots__ = ()
def __new__(cls, field, op, value):
# If value is a list, convert it to a tuple so it is hashable.
if isinstance(value, list):
value = tuple(value)
self = super(Filter, cls).__new__(cls, field, op, value)
return self
# primitive type filters
def _all_filter(filter_, stix_obj_field):
"""all filter operations (for filters whose value type can be applied to any operation type)"""
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
elif filter_.op == "in":
return stix_obj_field in filter_.value
elif filter_.op == ">":
return stix_obj_field > filter_.value
elif filter_.op == "<":
return stix_obj_field < filter_.value
elif filter_.op == ">=":
return stix_obj_field >= filter_.value
elif filter_.op == "<=":
return stix_obj_field <= filter_.value
else:
return -1
def _id_filter(filter_, stix_obj_id):
"""base filter types"""
if filter_.op == "=":
return stix_obj_id == filter_.value
elif filter_.op == "!=":
return stix_obj_id != filter_.value
else:
return -1
def _boolean_filter(filter_, stix_obj_field):
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
else:
return -1
def _string_filter(filter_, stix_obj_field):
return _all_filter(filter_, stix_obj_field)
def _timestamp_filter(filter_, stix_obj_timestamp):
return _all_filter(filter_, stix_obj_timestamp)
# STIX 2.0 Common Property filters
# The naming of these functions is important as
# they are used to index a mapping dictionary from
# STIX common field names to these filter functions.
#
# REQUIRED naming scheme:
# "check_<STIX field name>_filter"
def check_created_filter(filter_, stix_obj):
return _timestamp_filter(filter_, stix_obj["created"])
def check_created_by_ref_filter(filter_, stix_obj):
return _id_filter(filter_, stix_obj["created_by_ref"])
def check_external_references_filter(filter_, stix_obj):
"""
STIX object's can have a list of external references
external_references properties:
external_references.source_name (string)
external_references.description (string)
external_references.url (string)
external_references.hashes (hash, but for filtering purposes, a string)
external_references.external_id (string)
"""
for er in stix_obj["external_references"]:
# grab er property name from filter field
filter_field = filter_.field.split(".")[1]
r = _string_filter(filter_, er[filter_field])
if r:
return r
return False
def check_granular_markings_filter(filter_, stix_obj):
"""
STIX object's can have a list of granular marking references
granular_markings properties:
granular_markings.marking_ref (id)
granular_markings.selectors (string)
"""
for gm in stix_obj["granular_markings"]:
# grab gm property name from filter field
filter_field = filter_.field.split(".")[1]
if filter_field == "marking_ref":
return _id_filter(filter_, gm[filter_field])
elif filter_field == "selectors":
for selector in gm[filter_field]:
r = _string_filter(filter_, selector)
if r:
return r
return False
def check_id_filter(filter_, stix_obj):
return _id_filter(filter_, stix_obj["id"])
def check_labels_filter(filter_, stix_obj):
for label in stix_obj["labels"]:
r = _string_filter(filter_, label)
if r:
return r
return False
def check_modified_filter(filter_, stix_obj):
return _timestamp_filter(filter_, stix_obj["modified"])
def check_object_marking_refs_filter(filter_, stix_obj):
for marking_id in stix_obj["object_marking_refs"]:
r = _id_filter(filter_, marking_id)
if r:
return r
return False
def check_revoked_filter(filter_, stix_obj):
return _boolean_filter(filter_, stix_obj["revoked"])
def check_type_filter(filter_, stix_obj):
return _string_filter(filter_, stix_obj["type"])
# Create mapping of field names to filter functions
for name, obj in dict(globals()).items():
if "check_" in name and isinstance(obj, types.FunctionType):
field_name = "_".join(name.split("_")[1:-1])
STIX_COMMON_FILTERS_MAP[field_name] = obj

View File

@ -24,7 +24,8 @@ import os
from stix2validator import validate_string
from stix2 import Bundle
from stix2.sources import DataSink, DataSource, DataStore, Filter
from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter
class MemoryStore(DataStore):
@ -205,9 +206,7 @@ class MemorySource(DataSource):
return stix_obj
# if there are filters from the composite level, process full query
query = [
Filter("id", "=", stix_id)
]
query = [Filter("id", "=", stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters)
@ -220,21 +219,10 @@ class MemorySource(DataSource):
"""
Notes:
Since Memory sources/sinks don't handle multiple versions of a
STIX object, this operation is futile. Translate call to get().
(Approved by G.B.)
STIX object, this operation is unnecessary. Translate call to get().
"""
# query = [
# {
# "field": "id",
# "op": "=",
# "value": stix_id
# }
# ]
# all_data = self.query(query=query, _composite_filters=_composite_filters)
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
def query(self, query=None, _composite_filters=None):

View File

@ -12,7 +12,8 @@ TODO: Test everything
import json
from stix2.sources import DataSink, DataSource, DataStore, Filter, make_id
from stix2.sources import DataSink, DataSource, DataStore, make_id
from stix2.sources.filters import Filter
TAXII_FILTERS = ['added_after', 'id', 'type', 'version']
@ -130,11 +131,7 @@ class TAXIICollectionSource(DataSource):
For instance - "?match[type]=indicator,sighting" should be in a
query dict as follows:
{
"field": "type"
"op": "=",
"value": "indicator,sighting"
}
Filter("type", "=", "indicator,sighting")
Args:
query (list): list of filters to extract which ones are TAXII

View File

@ -2,7 +2,8 @@ import pytest
from taxii2client import Collection
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
DataStore, Filter, make_id, taxii)
DataStore, make_id, taxii)
from stix2.sources.filters import Filter
from stix2.sources.memory import MemorySource
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
@ -18,107 +19,110 @@ def collection():
return Collection(COLLECTION_URL, MockTAXIIClient())
STIX_OBJS1 = [
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.936Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
]
@pytest.fixture
def ds():
return DataSource()
STIX_OBJS2 = [
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-31T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
]
IND1 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND2 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND3 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.936Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND4 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND5 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND6 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-31T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND7 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND8 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
STIX_OBJS2 = [IND6, IND7, IND8]
STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
def test_ds_smoke():
@ -172,7 +176,7 @@ def test_parse_taxii_filters():
assert taxii_filters == expected_params
def test_add_get_remove_filter():
def test_add_get_remove_filter(ds):
# First 3 filters are valid, remaining fields are erroneous in some way
valid_filters = [
@ -186,8 +190,6 @@ def test_add_get_remove_filter():
Filter('created', '=', object()),
]
ds = DataSource()
assert len(ds.filters) == 0
ds.add_filter(valid_filters[0])
@ -225,7 +227,7 @@ def test_add_get_remove_filter():
ds.add_filters(valid_filters)
def test_apply_common_filters():
def test_apply_common_filters(ds):
stix_objs = [
{
"created": "2017-01-27T13:49:53.997Z",
@ -286,8 +288,6 @@ def test_apply_common_filters():
Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
]
ds = DataSource()
resp = ds.apply_common_filters(stix_objs, [filters[0]])
ids = [r['id'] for r in resp]
assert stix_objs[0]['id'] in ids
@ -327,61 +327,65 @@ def test_apply_common_filters():
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
# These are used with STIX_OBJS2
more_filters = [
Filter("modified", "<", "2017-01-28T13:49:53.935Z"),
Filter("modified", ">", "2017-01-28T13:49:53.935Z"),
Filter("modified", ">=", "2017-01-27T13:49:53.935Z"),
Filter("modified", "<=", "2017-01-27T13:49:53.935Z"),
Filter("modified", "?", "2017-01-27T13:49:53.935Z"),
Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"),
Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"),
Filter("notacommonproperty", "=", "bar"),
]
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[0]])
def test_filters0(ds):
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[1]])
def test_filters1(ds):
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[2]])
def test_filters2(ds):
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 3
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[3]])
def test_filters3(ds):
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
def test_filters4(ds):
fltr4 = Filter("modified", "?", "2017-01-27T13:49:53.935Z")
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [more_filters[4]])
ds.apply_common_filters(STIX_OBJS2, [fltr4])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}").format(more_filters[4].op,
more_filters[4].field)
"for specified field: {1}").format(fltr4.op, fltr4.field)
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[5]])
def test_filters5(ds):
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
def test_filters6(ds):
fltr6 = Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [more_filters[6]])
ds.apply_common_filters(STIX_OBJS2, [fltr6])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}").format(more_filters[6].op,
more_filters[6].field)
"for specified field: {1}").format(fltr6.op, fltr6.field)
def test_filters7(ds):
fltr7 = Filter("notacommonproperty", "=", "bar")
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [more_filters[7]])
ds.apply_common_filters(STIX_OBJS2, [fltr7])
assert str(excinfo.value) == ("Error, field: {0} is not supported for "
"filtering on.".format(more_filters[7].field))
"filtering on.".format(fltr7.field))
def test_deduplicate():
ds = DataSource()
def test_deduplicate(ds):
unique = ds.deduplicate(STIX_OBJS1)
# Only 3 objects are unique