WIP - finding more issues with allowing dicts as filters
parent
27647091a5
commit
ba6fa595c6
|
@ -5,11 +5,13 @@ Filters for Python STIX 2.0 DataSources, DataSinks, DataStores
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
|
|
||||||
|
from stix2.utils import STIXdatetime
|
||||||
|
|
||||||
"""Supported filter operations"""
|
"""Supported filter operations"""
|
||||||
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
|
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
|
||||||
|
|
||||||
"""Supported filter value types"""
|
"""Supported filter value types"""
|
||||||
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
|
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple, STIXdatetime]
|
||||||
try:
|
try:
|
||||||
FILTER_VALUE_TYPES.append(unicode)
|
FILTER_VALUE_TYPES.append(unicode)
|
||||||
except NameError:
|
except NameError:
|
||||||
|
@ -169,19 +171,26 @@ def _check_filter(filter_, stix_obj):
|
||||||
# Check embedded properties, from e.g. granular_markings or external_references
|
# Check embedded properties, from e.g. granular_markings or external_references
|
||||||
sub_property = filter_.property.split(".", 1)[1]
|
sub_property = filter_.property.split(".", 1)[1]
|
||||||
sub_filter = filter_._replace(property=sub_property)
|
sub_filter = filter_._replace(property=sub_property)
|
||||||
|
|
||||||
if isinstance(stix_obj[prop], list):
|
if isinstance(stix_obj[prop], list):
|
||||||
for elem in stix_obj[prop]:
|
for elem in stix_obj[prop]:
|
||||||
if _check_filter(sub_filter, elem) is True:
|
if _check_filter(sub_filter, elem) is True:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
elif isinstance(stix_obj[prop], dict):
|
||||||
|
return _check_filter(sub_filter, stix_obj[prop])
|
||||||
|
|
||||||
else:
|
else:
|
||||||
return _check_filter(sub_filter, stix_obj[prop])
|
return _check_filter(sub_filter, stix_obj[prop])
|
||||||
|
|
||||||
elif isinstance(stix_obj[prop], list):
|
elif isinstance(stix_obj[prop], list):
|
||||||
# Check each item in list property to see if it matches
|
# Check each item in list property to see if it matches
|
||||||
for elem in stix_obj[prop]:
|
for elem in stix_obj[prop]:
|
||||||
if filter_._check_property(elem) is True:
|
if filter_._check_property(elem) is True:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# Check if property matches
|
# Check if property matches
|
||||||
return filter_._check_property(stix_obj[prop])
|
return filter_._check_property(stix_obj[prop])
|
||||||
|
@ -201,6 +210,10 @@ class FilterSet(object):
|
||||||
for f in self._filters:
|
for f in self._filters:
|
||||||
yield f
|
yield f
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
""" """
|
||||||
|
return len(self._filters)
|
||||||
|
|
||||||
def add(self, filters):
|
def add(self, filters):
|
||||||
""" """
|
""" """
|
||||||
if not isinstance(filters, FilterSet) and not isinstance(filters, list):
|
if not isinstance(filters, FilterSet) and not isinstance(filters, list):
|
||||||
|
|
|
@ -115,7 +115,7 @@ class Environment(DataStoreMixin):
|
||||||
|
|
||||||
def add_filters(self, *args, **kwargs):
|
def add_filters(self, *args, **kwargs):
|
||||||
try:
|
try:
|
||||||
return self.source.filters.update(*args, **kwargs)
|
return self.source.filters.add(*args, **kwargs)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
raise AttributeError('Environment has no data source')
|
raise AttributeError('Environment has no data source')
|
||||||
|
|
||||||
|
|
|
@ -2,10 +2,11 @@ import pytest
|
||||||
from taxii2client import Collection
|
from taxii2client import Collection
|
||||||
|
|
||||||
from stix2 import Filter, MemorySink, MemorySource
|
from stix2 import Filter, MemorySink, MemorySource
|
||||||
|
from stix2.core import parse
|
||||||
from stix2.datastore import (CompositeDataSource, DataSink, DataSource,
|
from stix2.datastore import (CompositeDataSource, DataSink, DataSource,
|
||||||
make_id, taxii)
|
make_id, taxii)
|
||||||
from stix2.datastore.filters import apply_common_filters
|
from stix2.datastore.filters import apply_common_filters
|
||||||
from stix2.utils import deduplicate
|
from stix2.utils import deduplicate, parse_into_datetime
|
||||||
|
|
||||||
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
|
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
|
||||||
|
|
||||||
|
@ -120,6 +121,9 @@ IND8 = {
|
||||||
STIX_OBJS2 = [IND6, IND7, IND8]
|
STIX_OBJS2 = [IND6, IND7, IND8]
|
||||||
STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
|
STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
|
||||||
|
|
||||||
|
REAL_STIX_OBJS2 = [parse(IND6), parse(IND7), parse(IND8)]
|
||||||
|
REAL_STIX_OBJS1 = [parse(IND1), parse(IND2), parse(IND3), parse(IND4), parse(IND5)]
|
||||||
|
|
||||||
|
|
||||||
def test_ds_abstract_class_smoke():
|
def test_ds_abstract_class_smoke():
|
||||||
with pytest.raises(TypeError):
|
with pytest.raises(TypeError):
|
||||||
|
@ -148,12 +152,12 @@ def test_parse_taxii_filters():
|
||||||
Filter("created_by_ref", "=", "Bane"),
|
Filter("created_by_ref", "=", "Bane"),
|
||||||
]
|
]
|
||||||
|
|
||||||
taxii_filters_expected = set([
|
taxii_filters_expected = [
|
||||||
Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
|
Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
|
||||||
Filter("id", "=", "taxii stix object ID"),
|
Filter("id", "=", "taxii stix object ID"),
|
||||||
Filter("type", "=", "taxii stix object ID"),
|
Filter("type", "=", "taxii stix object ID"),
|
||||||
Filter("version", "=", "first")
|
Filter("version", "=", "first")
|
||||||
])
|
]
|
||||||
|
|
||||||
ds = taxii.TAXIICollectionSource(collection)
|
ds = taxii.TAXIICollectionSource(collection)
|
||||||
|
|
||||||
|
@ -177,7 +181,7 @@ def test_add_get_remove_filter():
|
||||||
ds.filters.add(valid_filters[0])
|
ds.filters.add(valid_filters[0])
|
||||||
assert len(ds.filters) == 1
|
assert len(ds.filters) == 1
|
||||||
|
|
||||||
# Addin the same filter again will have no effect since `filters` uses a set
|
# Addin the same filter again will have no effect since `filters` acts like a set
|
||||||
ds.filters.add(valid_filters[0])
|
ds.filters.add(valid_filters[0])
|
||||||
assert len(ds.filters) == 1
|
assert len(ds.filters) == 1
|
||||||
|
|
||||||
|
@ -186,14 +190,14 @@ def test_add_get_remove_filter():
|
||||||
ds.filters.add(valid_filters[2])
|
ds.filters.add(valid_filters[2])
|
||||||
assert len(ds.filters) == 3
|
assert len(ds.filters) == 3
|
||||||
|
|
||||||
assert set(valid_filters) == ds.filters
|
assert valid_filters == [f for f in ds.filters]
|
||||||
|
|
||||||
# remove
|
# remove
|
||||||
ds.filters.remove(valid_filters[0])
|
ds.filters.remove(valid_filters[0])
|
||||||
|
|
||||||
assert len(ds.filters) == 2
|
assert len(ds.filters) == 2
|
||||||
|
|
||||||
ds.filters.update(valid_filters)
|
ds.filters.add(valid_filters)
|
||||||
|
|
||||||
|
|
||||||
def test_filter_ops_check():
|
def test_filter_ops_check():
|
||||||
|
@ -297,9 +301,32 @@ def test_apply_common_filters():
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"labels": ["heartbleed", "has-logo"]
|
"labels": ["heartbleed", "has-logo"]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "observed-data",
|
||||||
|
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
|
||||||
|
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
|
||||||
|
"created": "2016-04-06T19:58:16.000Z",
|
||||||
|
"modified": "2016-04-06T19:58:16.000Z",
|
||||||
|
"first_observed": "2015-12-21T19:00:00Z",
|
||||||
|
"last_observed": "2015-12-21T19:00:00Z",
|
||||||
|
"number_observed": 1,
|
||||||
|
"objects": {
|
||||||
|
"0": {
|
||||||
|
"type": "file",
|
||||||
|
"name": "HAL 9000.exe"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# same as above objects but converted to real Python STIX2 objects
|
||||||
|
# to test filters against true Python STIX2 objects
|
||||||
|
print(stix_objs)
|
||||||
|
real_stix_objs = [parse(stix_obj) for stix_obj in stix_objs]
|
||||||
|
print("after\n\n")
|
||||||
|
print(stix_objs)
|
||||||
filters = [
|
filters = [
|
||||||
Filter("type", "!=", "relationship"),
|
Filter("type", "!=", "relationship"),
|
||||||
Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
|
Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
|
||||||
|
@ -315,6 +342,7 @@ def test_apply_common_filters():
|
||||||
Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"),
|
Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"),
|
||||||
Filter("granular_markings.selectors", "in", "description"),
|
Filter("granular_markings.selectors", "in", "description"),
|
||||||
Filter("external_references.source_name", "=", "CVE"),
|
Filter("external_references.source_name", "=", "CVE"),
|
||||||
|
Filter("objects", "=", {"0": {"type": "file", "name": "HAL 9000.exe"}})
|
||||||
]
|
]
|
||||||
|
|
||||||
# "Return any object whose type is not relationship"
|
# "Return any object whose type is not relationship"
|
||||||
|
@ -323,66 +351,125 @@ def test_apply_common_filters():
|
||||||
assert stix_objs[0]['id'] in ids
|
assert stix_objs[0]['id'] in ids
|
||||||
assert stix_objs[1]['id'] in ids
|
assert stix_objs[1]['id'] in ids
|
||||||
assert stix_objs[3]['id'] in ids
|
assert stix_objs[3]['id'] in ids
|
||||||
assert len(ids) == 3
|
assert len(ids) == 4
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(real_stix_objs, [filters[0]]))
|
||||||
|
ids = [r.id for r in resp]
|
||||||
|
assert real_stix_objs[0].id in ids
|
||||||
|
assert real_stix_objs[1].id in ids
|
||||||
|
assert real_stix_objs[3].id in ids
|
||||||
|
assert len(ids) == 4
|
||||||
|
|
||||||
# "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"
|
# "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"
|
||||||
resp = list(apply_common_filters(stix_objs, [filters[1]]))
|
resp = list(apply_common_filters(stix_objs, [filters[1]]))
|
||||||
assert resp[0]['id'] == stix_objs[2]['id']
|
assert resp[0]['id'] == stix_objs[2]['id']
|
||||||
assert len(resp) == 1
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(real_stix_objs, [filters[1]]))
|
||||||
|
assert resp[0].id == real_stix_objs[2].id
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
# "Return any object that contains remote-access-trojan in labels"
|
# "Return any object that contains remote-access-trojan in labels"
|
||||||
resp = list(apply_common_filters(stix_objs, [filters[2]]))
|
resp = list(apply_common_filters(stix_objs, [filters[2]]))
|
||||||
assert resp[0]['id'] == stix_objs[0]['id']
|
assert resp[0]['id'] == stix_objs[0]['id']
|
||||||
assert len(resp) == 1
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(real_stix_objs, [filters[2]]))
|
||||||
|
assert resp[0].id == real_stix_objs[0].id
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
# "Return any object created after 2015-01-01T01:00:00.000Z"
|
# "Return any object created after 2015-01-01T01:00:00.000Z"
|
||||||
resp = list(apply_common_filters(stix_objs, [filters[3]]))
|
resp = list(apply_common_filters(stix_objs, [filters[3]]))
|
||||||
assert resp[0]['id'] == stix_objs[0]['id']
|
assert resp[0]['id'] == stix_objs[0]['id']
|
||||||
assert len(resp) == 2
|
assert len(resp) == 3
|
||||||
|
|
||||||
# "Return any revoked object"
|
# "Return any revoked object"
|
||||||
resp = list(apply_common_filters(stix_objs, [filters[4]]))
|
resp = list(apply_common_filters(stix_objs, [filters[4]]))
|
||||||
assert resp[0]['id'] == stix_objs[2]['id']
|
assert resp[0]['id'] == stix_objs[2]['id']
|
||||||
assert len(resp) == 1
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(real_stix_objs, [filters[4]]))
|
||||||
|
assert resp[0].id == real_stix_objs[2].id
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
# "Return any object whose not revoked"
|
# "Return any object whose not revoked"
|
||||||
# Note that if 'revoked' property is not present in object.
|
# Note that if 'revoked' property is not present in object.
|
||||||
# Currently we can't use such an expression to filter for... :(
|
# Currently we can't use such an expression to filter for... :(
|
||||||
resp = list(apply_common_filters(stix_objs, [filters[5]]))
|
resp = list(apply_common_filters(stix_objs, [filters[5]]))
|
||||||
assert len(resp) == 0
|
assert len(resp) == 0
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(real_stix_objs, [filters[5]]))
|
||||||
|
assert len(resp) == 0
|
||||||
|
|
||||||
# "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs"
|
# "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs"
|
||||||
resp = list(apply_common_filters(stix_objs, [filters[6]]))
|
resp = list(apply_common_filters(stix_objs, [filters[6]]))
|
||||||
assert resp[0]['id'] == stix_objs[2]['id']
|
assert resp[0]['id'] == stix_objs[2]['id']
|
||||||
assert len(resp) == 1
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(real_stix_objs, [filters[6]]))
|
||||||
|
assert resp[0].id == real_stix_objs[2].id
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
# "Return any object that contains relationship_type in their selectors AND
|
# "Return any object that contains relationship_type in their selectors AND
|
||||||
# also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref"
|
# also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref"
|
||||||
resp = list(apply_common_filters(stix_objs, [filters[7], filters[8]]))
|
resp = list(apply_common_filters(stix_objs, [filters[7], filters[8]]))
|
||||||
assert resp[0]['id'] == stix_objs[2]['id']
|
assert resp[0]['id'] == stix_objs[2]['id']
|
||||||
assert len(resp) == 1
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(real_stix_objs, [filters[7], filters[8]]))
|
||||||
|
assert resp[0].id == real_stix_objs[2].id
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
# "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id"
|
# "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id"
|
||||||
resp = list(apply_common_filters(stix_objs, [filters[9]]))
|
resp = list(apply_common_filters(stix_objs, [filters[9]]))
|
||||||
assert resp[0]['id'] == stix_objs[3]['id']
|
assert resp[0]['id'] == stix_objs[3]['id']
|
||||||
assert len(resp) == 1
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(real_stix_objs, [filters[9]]))
|
||||||
|
assert resp[0].id == real_stix_objs[3].id
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
# "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9"
|
# "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9"
|
||||||
resp = list(apply_common_filters(stix_objs, [filters[10]]))
|
resp = list(apply_common_filters(stix_objs, [filters[10]]))
|
||||||
assert len(resp) == 1
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(real_stix_objs, [filters[10]]))
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
# "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None)
|
# "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None)
|
||||||
resp = list(apply_common_filters(stix_objs, [filters[11]]))
|
resp = list(apply_common_filters(stix_objs, [filters[11]]))
|
||||||
assert len(resp) == 0
|
assert len(resp) == 0
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(real_stix_objs, [filters[11]]))
|
||||||
|
assert len(resp) == 0
|
||||||
|
|
||||||
# "Return any object that contains description in its selectors" (None)
|
# "Return any object that contains description in its selectors" (None)
|
||||||
resp = list(apply_common_filters(stix_objs, [filters[12]]))
|
resp = list(apply_common_filters(stix_objs, [filters[12]]))
|
||||||
assert len(resp) == 0
|
assert len(resp) == 0
|
||||||
|
|
||||||
# "Return any object that object that matches CVE in source_name" (None, case sensitive)
|
resp = list(apply_common_filters(real_stix_objs, [filters[12]]))
|
||||||
|
assert len(resp) == 0
|
||||||
|
|
||||||
|
# "Return any object that matches CVE in source_name" (None, case sensitive)
|
||||||
resp = list(apply_common_filters(stix_objs, [filters[13]]))
|
resp = list(apply_common_filters(stix_objs, [filters[13]]))
|
||||||
assert len(resp) == 0
|
assert len(resp) == 0
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(real_stix_objs, [filters[13]]))
|
||||||
|
assert len(resp) == 0
|
||||||
|
|
||||||
|
# Return any object that matches file object in "objects"
|
||||||
|
# BUG: This test is brokem , weird behavior, the file obj
|
||||||
|
# in stix_objs is being parsed into real python-stix2 obj even though
|
||||||
|
# it never goes through parse() --> BAD <_<
|
||||||
|
print(stix_objs)
|
||||||
|
resp = list(apply_common_filters(stix_objs, [filters[14]]))
|
||||||
|
assert resp[0]["id"] == stix_objs[14]["id"]
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(real_stix_objs, [filters[14]]))
|
||||||
|
assert resp[0].id == real_stix_objs[14].id
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
|
||||||
def test_filters0():
|
def test_filters0():
|
||||||
# "Return any object modified before 2017-01-28T13:49:53.935Z"
|
# "Return any object modified before 2017-01-28T13:49:53.935Z"
|
||||||
|
@ -390,6 +477,10 @@ def test_filters0():
|
||||||
assert resp[0]['id'] == STIX_OBJS2[1]['id']
|
assert resp[0]['id'] == STIX_OBJS2[1]['id']
|
||||||
assert len(resp) == 2
|
assert len(resp) == 2
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", "<", parse_into_datetime("2017-01-28T13:49:53.935Z"))]))
|
||||||
|
assert resp[0].id == REAL_STIX_OBJS2[1].id
|
||||||
|
assert len(resp) == 2
|
||||||
|
|
||||||
|
|
||||||
def test_filters1():
|
def test_filters1():
|
||||||
# "Return any object modified after 2017-01-28T13:49:53.935Z"
|
# "Return any object modified after 2017-01-28T13:49:53.935Z"
|
||||||
|
@ -397,6 +488,10 @@ def test_filters1():
|
||||||
assert resp[0]['id'] == STIX_OBJS2[0]['id']
|
assert resp[0]['id'] == STIX_OBJS2[0]['id']
|
||||||
assert len(resp) == 1
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", ">", parse_into_datetime("2017-01-28T13:49:53.935Z"))]))
|
||||||
|
assert resp[0].id == REAL_STIX_OBJS2[0].id
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
|
||||||
def test_filters2():
|
def test_filters2():
|
||||||
# "Return any object modified after or on 2017-01-28T13:49:53.935Z"
|
# "Return any object modified after or on 2017-01-28T13:49:53.935Z"
|
||||||
|
@ -404,6 +499,10 @@ def test_filters2():
|
||||||
assert resp[0]['id'] == STIX_OBJS2[0]['id']
|
assert resp[0]['id'] == STIX_OBJS2[0]['id']
|
||||||
assert len(resp) == 3
|
assert len(resp) == 3
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", ">=", parse_into_datetime("2017-01-27T13:49:53.935Z"))]))
|
||||||
|
assert resp[0].id == REAL_STIX_OBJS2[0].id
|
||||||
|
assert len(resp) == 3
|
||||||
|
|
||||||
|
|
||||||
def test_filters3():
|
def test_filters3():
|
||||||
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
|
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
|
||||||
|
@ -411,6 +510,11 @@ def test_filters3():
|
||||||
assert resp[0]['id'] == STIX_OBJS2[1]['id']
|
assert resp[0]['id'] == STIX_OBJS2[1]['id']
|
||||||
assert len(resp) == 2
|
assert len(resp) == 2
|
||||||
|
|
||||||
|
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
|
||||||
|
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", "<=", parse_into_datetime("2017-01-27T13:49:53.935Z"))]))
|
||||||
|
assert resp[0].id == REAL_STIX_OBJS2[1].id
|
||||||
|
assert len(resp) == 2
|
||||||
|
|
||||||
|
|
||||||
def test_filters4():
|
def test_filters4():
|
||||||
# Assert invalid Filter cannot be created
|
# Assert invalid Filter cannot be created
|
||||||
|
@ -426,6 +530,10 @@ def test_filters5():
|
||||||
assert resp[0]['id'] == STIX_OBJS2[0]['id']
|
assert resp[0]['id'] == STIX_OBJS2[0]['id']
|
||||||
assert len(resp) == 1
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")]))
|
||||||
|
assert resp[0].id == REAL_STIX_OBJS2[0].id
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
|
||||||
def test_filters6():
|
def test_filters6():
|
||||||
# Test filtering on non-common property
|
# Test filtering on non-common property
|
||||||
|
@ -433,10 +541,14 @@ def test_filters6():
|
||||||
assert resp[0]['id'] == STIX_OBJS2[0]['id']
|
assert resp[0]['id'] == STIX_OBJS2[0]['id']
|
||||||
assert len(resp) == 3
|
assert len(resp) == 3
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("name", "=", "Malicious site hosting downloader")]))
|
||||||
|
assert resp[0].id == REAL_STIX_OBJS2[0].id
|
||||||
|
assert len(resp) == 3
|
||||||
|
|
||||||
|
|
||||||
def test_filters7():
|
def test_filters7():
|
||||||
# Test filtering on embedded property
|
# Test filtering on embedded property
|
||||||
stix_objects = list(STIX_OBJS2) + [{
|
obsvd_data_obj = {
|
||||||
"type": "observed-data",
|
"type": "observed-data",
|
||||||
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
|
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
|
||||||
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
|
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
|
||||||
|
@ -467,11 +579,19 @@ def test_filters7():
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}]
|
}
|
||||||
|
|
||||||
|
stix_objects = list(STIX_OBJS2) + [obsvd_data_obj]
|
||||||
|
real_stix_objects = list(REAL_STIX_OBJS2) + [parse(obsvd_data_obj)]
|
||||||
|
|
||||||
resp = list(apply_common_filters(stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")]))
|
resp = list(apply_common_filters(stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")]))
|
||||||
assert resp[0]['id'] == stix_objects[3]['id']
|
assert resp[0]['id'] == stix_objects[3]['id']
|
||||||
assert len(resp) == 1
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
resp = list(apply_common_filters(real_stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")]))
|
||||||
|
assert resp[0].id == real_stix_objects[3].id
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
|
||||||
def test_deduplicate():
|
def test_deduplicate():
|
||||||
unique = deduplicate(STIX_OBJS1)
|
unique = deduplicate(STIX_OBJS1)
|
||||||
|
@ -548,7 +668,7 @@ def test_composite_datasource_operations():
|
||||||
Filter("valid_from", "=", "2017-01-27T13:49:53.935382Z")
|
Filter("valid_from", "=", "2017-01-27T13:49:53.935382Z")
|
||||||
]
|
]
|
||||||
|
|
||||||
cds1.filters.update(query2)
|
cds1.filters.add(query2)
|
||||||
|
|
||||||
results = cds1.query(query1)
|
results = cds1.query(query1)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue