Merge pull request #166 from oasis-open/139-dict-filter-value

139 dict filter value
stix2.0
Chris Lenk 2018-04-13 16:35:32 -04:00 committed by GitHub
commit f127d145c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 491 additions and 242 deletions

View File

@ -4,6 +4,7 @@
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true,
"nbsphinx": "hidden"
},
"outputs": [],
@ -24,6 +25,7 @@
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true,
"nbsphinx": "hidden"
},
"outputs": [],
@ -383,8 +385,10 @@
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import sys\n",
@ -415,7 +419,7 @@
},
{
"cell_type": "code",
"execution_count": 9,
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
@ -428,11 +432,11 @@
"fs.source.filters.add(f)\n",
"\n",
"# attach multiple filters to FileSystemStore\n",
"fs.source.filters.update([f1,f2])\n",
"fs.source.filters.add([f1,f2])\n",
"\n",
"# can also attach filters to a Source\n",
"# attach multiple filters to FileSystemSource\n",
"fs_source.filters.update([f3, f4])\n",
"fs_source.filters.add([f3, f4])\n",
"\n",
"\n",
"mem = MemoryStore()\n",
@ -442,7 +446,7 @@
"mem.source.filters.add(f)\n",
"\n",
"# attach multiple filters to a MemoryStore\n",
"mem.source.filters.update([f1,f2])"
"mem.source.filters.add([f1,f2])"
]
},
{
@ -457,7 +461,9 @@
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from stix2 import Campaign, Identity, Indicator, Malware, Relationship\n",
@ -719,21 +725,21 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"display_name": "cti-python-stix2",
"language": "python",
"name": "python3"
"name": "cti-python-stix2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.3"
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,

View File

@ -50,7 +50,7 @@ from .patterns import (AndBooleanExpression, AndObservationExpression,
ReferenceObjectPathComponent, RepeatQualifier,
StartStopQualifier, StringConstant, TimestampConstant,
WithinQualifier)
from .utils import get_dict, new_version, revoke
from .utils import new_version, revoke
from .v20 import * # This import will always be the latest STIX 2.X version
from .version import __version__

View File

@ -9,7 +9,7 @@ import stix2
from . import exceptions
from .base import _STIXBase
from .properties import IDProperty, ListProperty, Property, TypeProperty
from .utils import get_class_hierarchy_names, get_dict
from .utils import _get_dict, get_class_hierarchy_names
class STIXObjectProperty(Property):
@ -25,7 +25,7 @@ class STIXObjectProperty(Property):
for x in get_class_hierarchy_names(value)):
return value
try:
dictified = get_dict(value)
dictified = _get_dict(value)
except ValueError:
raise ValueError("This property may only contain a dictionary or object")
if dictified == {}:
@ -95,7 +95,7 @@ def parse(data, allow_custom=False, version=None):
"""
# convert STIX object to dict, if not already
obj = get_dict(data)
obj = _get_dict(data)
# convert dict to full python-stix2 obj
obj = dict_to_stix2(obj, allow_custom, version)

View File

@ -16,7 +16,7 @@ import uuid
from six import with_metaclass
from stix2.datastore.filters import Filter, _assemble_filters
from stix2.datastore.filters import Filter, FilterSet
from stix2.utils import deduplicate
@ -222,13 +222,13 @@ class DataSource(with_metaclass(ABCMeta)):
Attributes:
id (str): A unique UUIDv4 to identify this DataSource.
filters (set): A collection of filters attached to this DataSource.
filters (FilterSet): A collection of filters attached to this DataSource.
"""
def __init__(self):
super(DataSource, self).__init__()
self.id = make_id()
self.filters = set()
self.filters = FilterSet()
@abstractmethod
def get(self, stix_id):
@ -379,10 +379,10 @@ class DataSource(with_metaclass(ABCMeta)):
ids.discard(obj_id)
# Assemble filters
filter_list = _assemble_filters(filters)
filter_list = FilterSet(filters)
for i in ids:
results.extend(self.query(filter_list + [Filter('id', '=', i)]))
results.extend(self.query([f for f in filter_list] + [Filter('id', '=', i)]))
return results
@ -427,7 +427,7 @@ class CompositeDataSource(DataSource):
Args:
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (list): a list of filters passed from a
_composite_filters (FilterSet): a collection of filters passed from a
CompositeDataSource (i.e. if this CompositeDataSource is attached
to another parent CompositeDataSource), not user supplied.
@ -439,11 +439,12 @@ class CompositeDataSource(DataSource):
raise AttributeError('CompositeDataSource has no data sources')
all_data = []
all_filters = set()
all_filters.update(self.filters)
all_filters = FilterSet()
all_filters.add(self.filters)
if _composite_filters:
all_filters.update(_composite_filters)
all_filters.add(_composite_filters)
# for every configured Data Source, call its retrieve handler
for ds in self.data_sources:
@ -473,7 +474,7 @@ class CompositeDataSource(DataSource):
Args:
stix_id (str): id of the STIX objects to retrieve.
_composite_filters (list): a list of filters passed from a
_composite_filters (FilterSet): a collection of filters passed from a
CompositeDataSource (i.e. if this CompositeDataSource is
attached to a parent CompositeDataSource), not user supplied.
@ -485,12 +486,12 @@ class CompositeDataSource(DataSource):
raise AttributeError('CompositeDataSource has no data sources')
all_data = []
all_filters = set()
all_filters = FilterSet()
all_filters.update(self.filters)
all_filters.add(self.filters)
if _composite_filters:
all_filters.update(_composite_filters)
all_filters.add(_composite_filters)
# retrieve STIX objects from all configured data sources
for ds in self.data_sources:
@ -512,7 +513,7 @@ class CompositeDataSource(DataSource):
Args:
query (list): list of filters to search on.
_composite_filters (list): a list of filters passed from a
_composite_filters (FilterSet): a collection of filters passed from a
CompositeDataSource (i.e. if this CompositeDataSource is
attached to a parent CompositeDataSource), not user supplied.
@ -524,17 +525,17 @@ class CompositeDataSource(DataSource):
raise AttributeError('CompositeDataSource has no data sources')
if not query:
# don't mess with the query (i.e. convert to a set, as that's done
# don't mess with the query (i.e. deduplicate, as that's done
# within the specific DataSources that are called)
query = []
all_data = []
all_filters = FilterSet()
all_filters = set()
all_filters.update(self.filters)
all_filters.add(self.filters)
if _composite_filters:
all_filters.update(_composite_filters)
all_filters.add(_composite_filters)
# federate query to all attached data sources,
# pass composite filters to id

View File

@ -8,7 +8,7 @@ import os
from stix2.core import Bundle, parse
from stix2.datastore import DataSink, DataSource, DataStoreMixin
from stix2.datastore.filters import Filter, apply_common_filters
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
from stix2.utils import deduplicate, get_class_hierarchy_names
@ -165,7 +165,7 @@ class FileSystemSource(DataSource):
Args:
stix_id (str): The STIX ID of the STIX object to be retrieved.
_composite_filters (set): set of filters passed from the parent
_composite_filters (FilterSet): collection of filters passed from the parent
CompositeDataSource, not user supplied
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
@ -195,7 +195,7 @@ class FileSystemSource(DataSource):
Args:
stix_id (str): The STIX ID of the STIX objects to be retrieved.
_composite_filters (set): set of filters passed from the parent
_composite_filters (FilterSet): collection of filters passed from the parent
CompositeDataSource, not user supplied
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
@ -217,7 +217,7 @@ class FileSystemSource(DataSource):
Args:
query (list): list of filters to search on
_composite_filters (set): set of filters passed from the
_composite_filters (FilterSet): collection of filters passed from the
CompositeDataSource, not user supplied
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
@ -231,20 +231,13 @@ class FileSystemSource(DataSource):
all_data = []
if query is None:
query = set()
else:
if not isinstance(query, list):
# make sure dont make set from a Filter object,
# need to make a set from a list of Filter objects (even if just one Filter)
query = [query]
query = set(query)
query = FilterSet(query)
# combine all query filters
if self.filters:
query.update(self.filters)
query.add(self.filters)
if _composite_filters:
query.update(_composite_filters)
query.add(_composite_filters)
# extract any filters that are for "type" or "id" , as we can then do
# filtering before reading in the STIX objects. A STIX 'type' filter
@ -343,8 +336,8 @@ class FileSystemSource(DataSource):
search space of a FileSystemStore (or FileSystemSink).
"""
file_filters = set()
file_filters = []
for filter_ in query:
if filter_.property == "id" or filter_.property == "type":
file_filters.add(filter_)
file_filters.append(filter_)
return file_filters

View File

@ -4,6 +4,9 @@ Filters for Python STIX 2.0 DataSources, DataSinks, DataStores
"""
import collections
from datetime import datetime
from stix2.utils import format_datetime
"""Supported filter operations"""
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
@ -44,37 +47,6 @@ def _check_filter_components(prop, op, value):
return True
def _assemble_filters(filters1=None, filters2=None):
"""Assemble a list of filters.
This can be used to allow certain functions to work correctly no matter if
the user provides a single filter or a list of them.
Args:
filters1 (Filter or list, optional): The single Filter or list of Filters to
coerce into a list of Filters.
filters2 (Filter or list, optional): The single Filter or list of Filters to
append to the list of Filters.
Returns:
List of Filters.
"""
if filters1 is None:
filter_list = []
elif not isinstance(filters1, list):
filter_list = [filters1]
else:
filter_list = filters1
if isinstance(filters2, list):
filter_list.extend(filters2)
elif filters2 is not None:
filter_list.append(filters2)
return filter_list
class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
"""STIX 2 filters that support the querying functionality of STIX 2
DataStores and DataSources.
@ -97,6 +69,10 @@ class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
if isinstance(value, list):
value = tuple(value)
if isinstance(value, datetime):
# if value is a datetime obj, convert to str
value = format_datetime(value)
_check_filter_components(prop, op, value)
self = super(Filter, cls).__new__(cls, prop, op, value)
@ -112,6 +88,12 @@ class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
True if property matches the filter,
False otherwise.
"""
if isinstance(stix_obj_property, datetime):
# if a datetime obj, convert to str format before comparison
# NOTE: this check seems like it should be done upstream
# but will put here for now
stix_obj_property = format_datetime(stix_obj_property)
if self.op == "=":
return stix_obj_property == self.value
elif self.op == "!=":
@ -183,19 +165,94 @@ def _check_filter(filter_, stix_obj):
# Check embedded properties, from e.g. granular_markings or external_references
sub_property = filter_.property.split(".", 1)[1]
sub_filter = filter_._replace(property=sub_property)
if isinstance(stix_obj[prop], list):
for elem in stix_obj[prop]:
if _check_filter(sub_filter, elem) is True:
return True
return False
else:
return _check_filter(sub_filter, stix_obj[prop])
elif isinstance(stix_obj[prop], list):
# Check each item in list property to see if it matches
for elem in stix_obj[prop]:
if filter_._check_property(elem) is True:
return True
return False
else:
# Check if property matches
return filter_._check_property(stix_obj[prop])
class FilterSet(object):
"""Internal STIX2 class to facilitate the grouping of Filters
into sets. The primary motivation for this class came from the problem
that Filters that had a dict as a value could not be added to a Python
set as dicts are not hashable. Thus this class provides set functionality
but internally stores filters in a list.
"""
def __init__(self, filters=None):
"""
Args:
filters: see FilterSet.add()
"""
self._filters = []
if filters:
self.add(filters)
def __iter__(self):
"""provide iteration functionality of FilterSet"""
for f in self._filters:
yield f
def __len__(self):
"""provide built-in len() utility of FilterSet"""
return len(self._filters)
def add(self, filters=None):
"""add a Filter, FilterSet, or list of Filters to the FilterSet
Operates like set, only adding unique stix2.Filters to the FilterSet
NOTE: method designed to be very accomodating (i.e. even accepting filters=None)
as it allows for blind calls (very useful in DataStore)
Args:
filters: stix2.Filter OR list of stix2.Filter OR stix2.FilterSet
"""
if not filters:
# so add() can be called blindly, useful for
# DataStore/Environment usage of filter operations
return
if not isinstance(filters, (FilterSet, list)):
filters = [filters]
for f in filters:
if f not in self._filters:
self._filters.append(f)
def remove(self, filters=None):
"""remove a Filter, list of Filters, or FilterSet from the FilterSet
NOTE: method designed to be very accomodating (i.e. even accepting filters=None)
as it allows for blind calls (very useful in DataStore)
Args:
filters: stix2.Filter OR list of stix2.Filter or stix2.FilterSet
"""
if not filters:
# so remove() can be called blindly, useful for
# DataStore/Environemnt usage of filter ops
return
if not isinstance(filters, (FilterSet, list)):
filters = [filters]
for f in filters:
self._filters.remove(f)

View File

@ -18,7 +18,7 @@ import os
from stix2.base import _STIXBase
from stix2.core import Bundle, parse
from stix2.datastore import DataSink, DataSource, DataStoreMixin
from stix2.datastore.filters import Filter, apply_common_filters
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
def _add(store, stix_data=None, version=None):
@ -197,7 +197,7 @@ class MemorySource(DataSource):
Args:
stix_id (str): The STIX ID of the STIX object to be retrieved.
_composite_filters (set): set of filters passed from the parent
_composite_filters (FilterSet): collection of filters passed from the parent
CompositeDataSource, not user supplied
Returns:
@ -236,7 +236,7 @@ class MemorySource(DataSource):
Args:
stix_id (str): The STIX ID of the STIX 2 object to retrieve.
_composite_filters (set): set of filters passed from the parent
_composite_filters (FilterSet): collection of filters passed from the parent
CompositeDataSource, not user supplied
Returns:
@ -258,7 +258,7 @@ class MemorySource(DataSource):
Args:
query (list): list of filters to search on
_composite_filters (set): set of filters passed from the
_composite_filters (FilterSet): collection of filters passed from the
CompositeDataSource, not user supplied
Returns:
@ -269,19 +269,15 @@ class MemorySource(DataSource):
"""
if query is None:
query = set()
query = FilterSet()
else:
if not isinstance(query, list):
# make sure don't make set from a Filter object,
# need to make a set from a list of Filter objects (even if just one Filter)
query = [query]
query = set(query)
query = FilterSet(query)
# combine all query filters
if self.filters:
query.update(self.filters)
query.add(self.filters)
if _composite_filters:
query.update(_composite_filters)
query.add(_composite_filters)
# Apply STIX common property filters.
all_data = list(apply_common_filters(self._data.values(), query))

View File

@ -6,7 +6,7 @@ from requests.exceptions import HTTPError
from stix2.base import _STIXBase
from stix2.core import Bundle, parse
from stix2.datastore import DataSink, DataSource, DataStoreMixin
from stix2.datastore.filters import Filter, apply_common_filters
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
from stix2.utils import deduplicate
TAXII_FILTERS = ['added_after', 'id', 'type', 'version']
@ -120,7 +120,7 @@ class TAXIICollectionSource(DataSource):
Args:
stix_id (str): The STIX ID of the STIX object to be retrieved.
_composite_filters (set): set of filters passed from the parent
_composite_filters (FilterSet): collection of filters passed from the parent
CompositeDataSource, not user supplied
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
@ -132,11 +132,12 @@ class TAXIICollectionSource(DataSource):
"""
# combine all query filters
query = set()
query = FilterSet()
if self.filters:
query.update(self.filters)
query.add(self.filters)
if _composite_filters:
query.update(_composite_filters)
query.add(_composite_filters)
# dont extract TAXII filters from query (to send to TAXII endpoint)
# as directly retrieveing a STIX object by ID
@ -164,7 +165,7 @@ class TAXIICollectionSource(DataSource):
Args:
stix_id (str): The STIX ID of the STIX objects to be retrieved.
_composite_filters (set): set of filters passed from the parent
_composite_filters (FilterSet): collection of filters passed from the parent
CompositeDataSource, not user supplied
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
@ -198,7 +199,7 @@ class TAXIICollectionSource(DataSource):
Args:
query (list): list of filters to search on
_composite_filters (set): set of filters passed from the
_composite_filters (FilterSet): collection of filters passed from the
CompositeDataSource, not user supplied
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
@ -209,20 +210,13 @@ class TAXIICollectionSource(DataSource):
parsed into python STIX objects and then returned.
"""
if query is None:
query = set()
else:
if not isinstance(query, list):
# make sure dont make set from a Filter object,
# need to make a set from a list of Filter objects (even if just one Filter)
query = [query]
query = set(query)
query = FilterSet(query)
# combine all query filters
if self.filters:
query.update(self.filters)
query.add(self.filters)
if _composite_filters:
query.update(_composite_filters)
query.add(_composite_filters)
# parse taxii query params (that can be applied remotely)
taxii_filters = self._parse_taxii_filters(query)
@ -268,17 +262,16 @@ class TAXIICollectionSource(DataSource):
Args:
query (set): set of filters to extract which ones are TAXII
query (list): list of filters to extract which ones are TAXII
specific.
Returns:
taxii_filters (set): set of the TAXII filters
Returns: a list of the TAXII filters
"""
taxii_filters = set()
taxii_filters = []
for filter_ in query:
if filter_.property in TAXII_FILTERS:
taxii_filters.add(filter_)
taxii_filters.append(filter_)
return taxii_filters

View File

@ -158,7 +158,7 @@ class Environment(DataStoreMixin):
set_default_object_marking_refs.__doc__ = ObjectFactory.set_default_object_marking_refs.__doc__
def add_filters(self, *args, **kwargs):
return self.source.filters.update(*args, **kwargs)
return self.source.filters.add(*args, **kwargs)
def add_filter(self, *args, **kwargs):
return self.source.filters.add(*args, **kwargs)

View File

@ -12,7 +12,7 @@ from stix2patterns.validator import run_validator
from .base import _STIXBase
from .exceptions import DictionaryKeyError
from .utils import get_dict, parse_into_datetime
from .utils import _get_dict, parse_into_datetime
class Property(object):
@ -234,7 +234,7 @@ class DictionaryProperty(Property):
def clean(self, value):
try:
dictified = get_dict(value)
dictified = _get_dict(value)
except ValueError:
raise ValueError("The dictionary property must contain a dictionary")
if dictified == {}:

View File

@ -2,10 +2,11 @@ import pytest
from taxii2client import Collection
from stix2 import Filter, MemorySink, MemorySource
from stix2.core import parse
from stix2.datastore import (CompositeDataSource, DataSink, DataSource,
make_id, taxii)
from stix2.datastore.filters import _assemble_filters, apply_common_filters
from stix2.utils import deduplicate
from stix2.datastore.filters import apply_common_filters
from stix2.utils import STIXdatetime, deduplicate, parse_into_datetime
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
@ -20,6 +21,108 @@ def collection():
return Collection(COLLECTION_URL, MockTAXIIClient())
stix_objs = [
{
"created": "2017-01-27T13:49:53.997Z",
"description": "\n\nTITLE:\n\tPoison Ivy",
"id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
"labels": [
"remote-access-trojan"
],
"modified": "2017-01-27T13:49:53.997Z",
"name": "Poison Ivy",
"type": "malware"
},
{
"created": "2014-05-08T09:00:00.000Z",
"id": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
"labels": [
"file-hash-watchlist"
],
"modified": "2014-05-08T09:00:00.000Z",
"name": "File hash for Poison Ivy variant",
"pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']",
"type": "indicator",
"valid_from": "2014-05-08T09:00:00.000000Z"
},
{
"created": "2014-05-08T09:00:00.000Z",
"granular_markings": [
{
"marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
"selectors": [
"relationship_type"
]
}
],
"id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463",
"modified": "2014-05-08T09:00:00.000Z",
"object_marking_refs": [
"marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
],
"relationship_type": "indicates",
"revoked": True,
"source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
"target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
"type": "relationship"
},
{
"id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef",
"created": "2016-02-14T00:00:00.000Z",
"created_by_ref": "identity--00000000-0000-0000-0000-b8e91df99dc9",
"modified": "2016-02-14T00:00:00.000Z",
"type": "vulnerability",
"name": "CVE-2014-0160",
"description": "The (1) TLS...",
"external_references": [
{
"source_name": "cve",
"external_id": "CVE-2014-0160"
}
],
"labels": ["heartbleed", "has-logo"]
},
{
"type": "observed-data",
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"created": "2016-04-06T19:58:16.000Z",
"modified": "2016-04-06T19:58:16.000Z",
"first_observed": "2015-12-21T19:00:00Z",
"last_observed": "2015-12-21T19:00:00Z",
"number_observed": 1,
"objects": {
"0": {
"type": "file",
"name": "HAL 9000.exe"
}
}
}
]
# same as above objects but converted to real Python STIX2 objects
# to test filters against true Python STIX2 objects
real_stix_objs = [parse(stix_obj) for stix_obj in stix_objs]
filters = [
Filter("type", "!=", "relationship"),
Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
Filter("labels", "in", "remote-access-trojan"),
Filter("created", ">", "2015-01-01T01:00:00.000Z"),
Filter("revoked", "=", True),
Filter("revoked", "!=", True),
Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"),
Filter("granular_markings.selectors", "in", "relationship_type"),
Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"),
Filter("created_by_ref", "=", "identity--00000000-0000-0000-0000-b8e91df99dc9"),
Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"),
Filter("granular_markings.selectors", "in", "description"),
Filter("external_references.source_name", "=", "CVE"),
Filter("objects", "=", {"0": {"type": "file", "name": "HAL 9000.exe"}})
]
IND1 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
@ -120,6 +223,9 @@ IND8 = {
STIX_OBJS2 = [IND6, IND7, IND8]
STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
REAL_STIX_OBJS2 = [parse(IND6), parse(IND7), parse(IND8)]
REAL_STIX_OBJS1 = [parse(IND1), parse(IND2), parse(IND3), parse(IND4), parse(IND5)]
def test_ds_abstract_class_smoke():
with pytest.raises(TypeError):
@ -148,12 +254,12 @@ def test_parse_taxii_filters():
Filter("created_by_ref", "=", "Bane"),
]
taxii_filters_expected = set([
taxii_filters_expected = [
Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
Filter("id", "=", "taxii stix object ID"),
Filter("type", "=", "taxii stix object ID"),
Filter("version", "=", "first")
])
]
ds = taxii.TAXIICollectionSource(collection)
@ -177,7 +283,7 @@ def test_add_get_remove_filter():
ds.filters.add(valid_filters[0])
assert len(ds.filters) == 1
# Addin the same filter again will have no effect since `filters` uses a set
# Addin the same filter again will have no effect since `filters` acts like a set
ds.filters.add(valid_filters[0])
assert len(ds.filters) == 1
@ -186,14 +292,14 @@ def test_add_get_remove_filter():
ds.filters.add(valid_filters[2])
assert len(ds.filters) == 3
assert set(valid_filters) == ds.filters
assert valid_filters == [f for f in ds.filters]
# remove
ds.filters.remove(valid_filters[0])
assert len(ds.filters) == 2
ds.filters.update(valid_filters)
ds.filters.add(valid_filters)
def test_filter_ops_check():
@ -236,153 +342,199 @@ def test_filter_type_underscore_check():
assert "Filter for property 'type' cannot have its value 'oh_underscore'" in str(excinfo.value)
def test_apply_common_filters():
stix_objs = [
{
"created": "2017-01-27T13:49:53.997Z",
"description": "\n\nTITLE:\n\tPoison Ivy",
"id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
"labels": [
"remote-access-trojan"
],
"modified": "2017-01-27T13:49:53.997Z",
"name": "Poison Ivy",
"type": "malware"
},
{
"created": "2014-05-08T09:00:00.000Z",
"id": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
"labels": [
"file-hash-watchlist"
],
"modified": "2014-05-08T09:00:00.000Z",
"name": "File hash for Poison Ivy variant",
"pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']",
"type": "indicator",
"valid_from": "2014-05-08T09:00:00.000000Z"
},
{
"created": "2014-05-08T09:00:00.000Z",
"granular_markings": [
{
"marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
"selectors": [
"relationship_type"
]
}
],
"id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463",
"modified": "2014-05-08T09:00:00.000Z",
"object_marking_refs": [
"marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
],
"relationship_type": "indicates",
"revoked": True,
"source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
"target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
"type": "relationship"
},
{
"id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef",
"created": "2016-02-14T00:00:00.000Z",
"created_by_ref": "identity--00000000-0000-0000-0000-b8e91df99dc9",
"modified": "2016-02-14T00:00:00.000Z",
"type": "vulnerability",
"name": "CVE-2014-0160",
"description": "The (1) TLS...",
"external_references": [
{
"source_name": "cve",
"external_id": "CVE-2014-0160"
}
],
"labels": ["heartbleed", "has-logo"]
}
]
filters = [
Filter("type", "!=", "relationship"),
Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
Filter("labels", "in", "remote-access-trojan"),
Filter("created", ">", "2015-01-01T01:00:00.000Z"),
Filter("revoked", "=", True),
Filter("revoked", "!=", True),
Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"),
Filter("granular_markings.selectors", "in", "relationship_type"),
Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"),
Filter("created_by_ref", "=", "identity--00000000-0000-0000-0000-b8e91df99dc9"),
Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"),
Filter("granular_markings.selectors", "in", "description"),
Filter("external_references.source_name", "=", "CVE"),
]
def test_apply_common_filters0():
# "Return any object whose type is not relationship"
resp = list(apply_common_filters(stix_objs, [filters[0]]))
ids = [r['id'] for r in resp]
assert stix_objs[0]['id'] in ids
assert stix_objs[1]['id'] in ids
assert stix_objs[3]['id'] in ids
assert len(ids) == 3
assert len(ids) == 4
resp = list(apply_common_filters(real_stix_objs, [filters[0]]))
ids = [r.id for r in resp]
assert real_stix_objs[0].id in ids
assert real_stix_objs[1].id in ids
assert real_stix_objs[3].id in ids
assert len(ids) == 4
def test_apply_common_filters1():
# "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"
resp = list(apply_common_filters(stix_objs, [filters[1]]))
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
resp = list(apply_common_filters(real_stix_objs, [filters[1]]))
assert resp[0].id == real_stix_objs[2].id
assert len(resp) == 1
def test_apply_common_filters2():
# "Return any object that contains remote-access-trojan in labels"
resp = list(apply_common_filters(stix_objs, [filters[2]]))
assert resp[0]['id'] == stix_objs[0]['id']
assert len(resp) == 1
resp = list(apply_common_filters(real_stix_objs, [filters[2]]))
assert resp[0].id == real_stix_objs[0].id
assert len(resp) == 1
def test_apply_common_filters3():
# "Return any object created after 2015-01-01T01:00:00.000Z"
resp = list(apply_common_filters(stix_objs, [filters[3]]))
assert resp[0]['id'] == stix_objs[0]['id']
assert len(resp) == 2
assert len(resp) == 3
resp = list(apply_common_filters(real_stix_objs, [filters[3]]))
assert resp[0].id == real_stix_objs[0].id
assert len(resp) == 3
def test_apply_common_filters4():
# "Return any revoked object"
resp = list(apply_common_filters(stix_objs, [filters[4]]))
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
resp = list(apply_common_filters(real_stix_objs, [filters[4]]))
assert resp[0].id == real_stix_objs[2].id
assert len(resp) == 1
def test_apply_common_filters5():
# "Return any object whose not revoked"
# Note that if 'revoked' property is not present in object.
# Currently we can't use such an expression to filter for... :(
resp = list(apply_common_filters(stix_objs, [filters[5]]))
assert len(resp) == 0
resp = list(apply_common_filters(real_stix_objs, [filters[5]]))
assert len(resp) == 0
def test_apply_common_filters6():
# "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs"
resp = list(apply_common_filters(stix_objs, [filters[6]]))
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
resp = list(apply_common_filters(real_stix_objs, [filters[6]]))
assert resp[0].id == real_stix_objs[2].id
assert len(resp) == 1
def test_apply_common_filters7():
# "Return any object that contains relationship_type in their selectors AND
# also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref"
resp = list(apply_common_filters(stix_objs, [filters[7], filters[8]]))
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
resp = list(apply_common_filters(real_stix_objs, [filters[7], filters[8]]))
assert resp[0].id == real_stix_objs[2].id
assert len(resp) == 1
def test_apply_common_filters8():
# "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id"
resp = list(apply_common_filters(stix_objs, [filters[9]]))
assert resp[0]['id'] == stix_objs[3]['id']
assert len(resp) == 1
resp = list(apply_common_filters(real_stix_objs, [filters[9]]))
assert resp[0].id == real_stix_objs[3].id
assert len(resp) == 1
def test_apply_common_filters9():
# "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9"
resp = list(apply_common_filters(stix_objs, [filters[10]]))
assert len(resp) == 1
resp = list(apply_common_filters(real_stix_objs, [filters[10]]))
assert len(resp) == 1
def test_apply_common_filters10():
# "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None)
resp = list(apply_common_filters(stix_objs, [filters[11]]))
assert len(resp) == 0
resp = list(apply_common_filters(real_stix_objs, [filters[11]]))
assert len(resp) == 0
def test_apply_common_filters11():
# "Return any object that contains description in its selectors" (None)
resp = list(apply_common_filters(stix_objs, [filters[12]]))
assert len(resp) == 0
# "Return any object that object that matches CVE in source_name" (None, case sensitive)
resp = list(apply_common_filters(real_stix_objs, [filters[12]]))
assert len(resp) == 0
def test_apply_common_filters12():
# "Return any object that matches CVE in source_name" (None, case sensitive)
resp = list(apply_common_filters(stix_objs, [filters[13]]))
assert len(resp) == 0
resp = list(apply_common_filters(real_stix_objs, [filters[13]]))
assert len(resp) == 0
def test_apply_common_filters13():
# Return any object that matches file object in "objects"
resp = list(apply_common_filters(stix_objs, [filters[14]]))
assert resp[0]["id"] == stix_objs[4]["id"]
assert len(resp) == 1
# important additional check to make sure original File dict was
# not converted to File object. (this was a deep bug found)
assert isinstance(resp[0]["objects"]["0"], dict)
resp = list(apply_common_filters(real_stix_objs, [filters[14]]))
assert resp[0].id == real_stix_objs[4].id
assert len(resp) == 1
def test_datetime_filter_behavior():
"""if a filter is initialized with its value being a datetime object
OR the STIX object property being filtered on is a datetime object, all
resulting comparisons executed are done on the string representations
of the datetime objects, as the Filter functionality will convert
all datetime objects to there string forms using format_datetim()
This test makes sure all datetime comparisons are carried out correctly
"""
filter_with_dt_obj = Filter("created", "=", parse_into_datetime("2016-02-14T00:00:00.000Z", "millisecond"))
filter_with_str = Filter("created", "=", "2016-02-14T00:00:00.000Z")
# check that filter value is converted from datetime to str
assert isinstance(filter_with_dt_obj.value, str)
# compare datetime string to filter w/ datetime obj
resp = list(apply_common_filters(stix_objs, [filter_with_dt_obj]))
assert len(resp) == 1
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
# compare datetime obj to filter w/ datetime obj
resp = list(apply_common_filters(real_stix_objs, [filter_with_dt_obj]))
assert len(resp) == 1
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered
# compare datetime string to filter w/ str
resp = list(apply_common_filters(stix_objs, [filter_with_str]))
assert len(resp) == 1
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
# compare datetime obj to filter w/ str
resp = list(apply_common_filters(real_stix_objs, [filter_with_str]))
assert len(resp) == 1
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered
def test_filters0():
# "Return any object modified before 2017-01-28T13:49:53.935Z"
@ -390,6 +542,10 @@ def test_filters0():
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", "<", parse_into_datetime("2017-01-28T13:49:53.935Z"))]))
assert resp[0].id == REAL_STIX_OBJS2[1].id
assert len(resp) == 2
def test_filters1():
# "Return any object modified after 2017-01-28T13:49:53.935Z"
@ -397,6 +553,10 @@ def test_filters1():
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", ">", parse_into_datetime("2017-01-28T13:49:53.935Z"))]))
assert resp[0].id == REAL_STIX_OBJS2[0].id
assert len(resp) == 1
def test_filters2():
# "Return any object modified after or on 2017-01-28T13:49:53.935Z"
@ -404,6 +564,10 @@ def test_filters2():
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 3
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", ">=", parse_into_datetime("2017-01-27T13:49:53.935Z"))]))
assert resp[0].id == REAL_STIX_OBJS2[0].id
assert len(resp) == 3
def test_filters3():
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
@ -411,6 +575,12 @@ def test_filters3():
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
fv = Filter("modified", "<=", parse_into_datetime("2017-01-27T13:49:53.935Z"))
resp = list(apply_common_filters(REAL_STIX_OBJS2, [fv]))
assert resp[0].id == REAL_STIX_OBJS2[1].id
assert len(resp) == 2
def test_filters4():
# Assert invalid Filter cannot be created
@ -426,6 +596,10 @@ def test_filters5():
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")]))
assert resp[0].id == REAL_STIX_OBJS2[0].id
assert len(resp) == 1
def test_filters6():
# Test filtering on non-common property
@ -433,10 +607,14 @@ def test_filters6():
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 3
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("name", "=", "Malicious site hosting downloader")]))
assert resp[0].id == REAL_STIX_OBJS2[0].id
assert len(resp) == 3
def test_filters7():
# Test filtering on embedded property
stix_objects = list(STIX_OBJS2) + [{
obsvd_data_obj = {
"type": "observed-data",
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
@ -467,19 +645,18 @@ def test_filters7():
}
}
}
}]
}
stix_objects = list(STIX_OBJS2) + [obsvd_data_obj]
real_stix_objects = list(REAL_STIX_OBJS2) + [parse(obsvd_data_obj)]
resp = list(apply_common_filters(stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")]))
assert resp[0]['id'] == stix_objects[3]['id']
assert len(resp) == 1
def test_assemble_filters():
filter1 = Filter("name", "=", "Malicious site hosting downloader")
filter2 = Filter("modified", ">", "2017-01-28T13:49:53.935Z")
result = _assemble_filters(filter1, filter2)
assert len(result) == 2
assert result[0].property == 'name'
assert result[1].property == 'modified'
resp = list(apply_common_filters(real_stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")]))
assert resp[0].id == real_stix_objects[3].id
assert len(resp) == 1
def test_deduplicate():
@ -557,7 +734,7 @@ def test_composite_datasource_operations():
Filter("valid_from", "=", "2017-01-27T13:49:53.935382Z")
]
cds1.filters.update(query2)
cds1.filters.add(query2)
results = cds1.query(query1)

View File

@ -62,7 +62,7 @@ def test_parse_datetime_invalid(ts):
[("a", 1,)],
])
def test_get_dict(data):
assert stix2.utils.get_dict(data)
assert stix2.utils._get_dict(data)
@pytest.mark.parametrize('data', [
@ -73,7 +73,7 @@ def test_get_dict(data):
])
def test_get_dict_invalid(data):
with pytest.raises(ValueError):
stix2.utils.get_dict(data)
stix2.utils._get_dict(data)
@pytest.mark.parametrize('stix_id, typ', [

View File

@ -140,7 +140,7 @@ def parse_into_datetime(value, precision=None):
return STIXdatetime(ts, precision=precision)
def get_dict(data):
def _get_dict(data):
"""Return data as a dictionary.
Input can be a dictionary, string, or file-like object.

View File

@ -7,7 +7,7 @@ from ..markings import _MarkingsMixin
from ..properties import (HashesProperty, IDProperty, ListProperty, Property,
ReferenceProperty, SelectorProperty, StringProperty,
TimestampProperty, TypeProperty)
from ..utils import NOW, get_dict
from ..utils import NOW, _get_dict
class ExternalReference(_STIXBase):
@ -125,7 +125,7 @@ class MarkingDefinition(_STIXBase, _MarkingsMixin):
raise ValueError("definition_type must be a valid marking type")
if not isinstance(kwargs['definition'], marking_type):
defn = get_dict(kwargs['definition'])
defn = _get_dict(kwargs['definition'])
kwargs['definition'] = marking_type(**defn)
super(MarkingDefinition, self).__init__(**kwargs)

View File

@ -6,6 +6,7 @@ Observable and do not have a ``_type`` attribute.
"""
from collections import OrderedDict
import copy
from ..base import _Extension, _Observable, _STIXBase
from ..exceptions import (AtLeastOnePropertyError, CustomContentError,
@ -15,7 +16,7 @@ from ..properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
HashesProperty, HexProperty, IntegerProperty,
ListProperty, ObjectReferenceProperty, Property,
StringProperty, TimestampProperty, TypeProperty)
from ..utils import get_dict
from ..utils import _get_dict
class ObservableProperty(Property):
@ -24,7 +25,11 @@ class ObservableProperty(Property):
def clean(self, value):
try:
dictified = get_dict(value)
dictified = _get_dict(value)
# get deep copy since we are going modify the dict and might
# modify the original dict as _get_dict() does not return new
# dict when passed a dict
dictified = copy.deepcopy(dictified)
except ValueError:
raise ValueError("The observable property must contain a dictionary")
if dictified == {}:
@ -49,7 +54,11 @@ class ExtensionsProperty(DictionaryProperty):
def clean(self, value):
try:
dictified = get_dict(value)
dictified = _get_dict(value)
# get deep copy since we are going modify the dict and might
# modify the original dict as _get_dict() does not return new
# dict when passed a dict
dictified = copy.deepcopy(dictified)
except ValueError:
raise ValueError("The extensions property must contain a dictionary")
if dictified == {}:
@ -915,7 +924,12 @@ def parse_observable(data, _valid_refs=None, allow_custom=False):
An instantiated Python STIX Cyber Observable object.
"""
obj = get_dict(data)
obj = _get_dict(data)
# get deep copy since we are going modify the dict and might
# modify the original dict as _get_dict() does not return new
# dict when passed a dict
obj = copy.deepcopy(obj)
obj['_valid_refs'] = _valid_refs or []
if 'type' not in obj:

View File

@ -48,7 +48,7 @@ from . import (AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, # n
WindowsPEOptionalHeaderType, WindowsPESection,
WindowsProcessExt, WindowsRegistryKey, WindowsRegistryValueType,
WindowsServiceExt, X509Certificate, X509V3ExtenstionsType)
from .datastore.filters import _assemble_filters
from .datastore.filters import FilterSet
# Use an implicit MemoryStore
_environ = Environment(store=MemoryStore())
@ -156,7 +156,8 @@ def attack_patterns(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'attack-pattern')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'attack-pattern'))
return query(filter_list)
@ -168,7 +169,8 @@ def campaigns(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'campaign')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'campaign'))
return query(filter_list)
@ -180,7 +182,8 @@ def courses_of_action(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'course-of-action')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'course-of-action'))
return query(filter_list)
@ -192,7 +195,8 @@ def identities(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'identity')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'identity'))
return query(filter_list)
@ -204,7 +208,8 @@ def indicators(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'indicator')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'indicator'))
return query(filter_list)
@ -216,7 +221,8 @@ def intrusion_sets(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'intrusion-set')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'intrusion-set'))
return query(filter_list)
@ -228,7 +234,8 @@ def malware(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'malware')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'malware'))
return query(filter_list)
@ -240,7 +247,8 @@ def observed_data(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'observed-data')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'observed-data'))
return query(filter_list)
@ -252,7 +260,8 @@ def reports(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'report')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'report'))
return query(filter_list)
@ -264,7 +273,8 @@ def threat_actors(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'threat-actor')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'threat-actor'))
return query(filter_list)
@ -276,7 +286,8 @@ def tools(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'tool')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'tool'))
return query(filter_list)
@ -288,5 +299,6 @@ def vulnerabilities(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'vulnerability')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'vulnerability'))
return query(filter_list)