Convert rest of code to use namedtuple Filters

stix2.1
Greg Back 2017-08-09 19:25:06 +00:00
parent 87f7503c0a
commit 961dfdc984
4 changed files with 51 additions and 83 deletions

View File

@ -289,17 +289,17 @@ class DataSource(object):
# skip filter as filter was identified (when added) as
# not a common filter
if 'id' in filter_ and self.filter_allowed[filter_['id']] is False:
if filter_.field not in STIX_COMMON_FIELDS:
continue
# check filter "field" is in STIX object - if cant be applied
# due to STIX object, STIX object is discarded (i.e. did not
# make it through the filter)
if filter_['field'] not in stix_obj.keys():
if filter_.field not in stix_obj.keys():
clean = False
break
try:
match = getattr(STIXCommonPropertyFilters, filter_['field'])(filter_, stix_obj)
match = getattr(STIXCommonPropertyFilters, filter_.field)(filter_, stix_obj)
if not match:
clean = False
break
@ -553,39 +553,39 @@ class STIXCommonPropertyFilters():
@classmethod
def _all(cls, filter_, stix_obj_field):
"""all filter operations (for filters whose value type can be applied to any operation type)"""
if filter_["op"] == '=':
return stix_obj_field == filter_["value"]
elif filter_["op"] == "!=":
return stix_obj_field != filter_["value"]
elif filter_["op"] == "in":
return stix_obj_field in filter_["value"]
elif filter_["op"] == ">":
return stix_obj_field > filter_["value"]
elif filter_["op"] == "<":
return stix_obj_field < filter_["value"]
elif filter_["op"] == ">=":
return stix_obj_field >= filter_["value"]
elif filter_["op"] == "<=":
return stix_obj_field <= filter_["value"]
if filter_.op == '=':
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
elif filter_.op == "in":
return stix_obj_field in filter_.value
elif filter_.op == ">":
return stix_obj_field > filter_.value
elif filter_.op == "<":
return stix_obj_field < filter_.value
elif filter_.op == ">=":
return stix_obj_field >= filter_.value
elif filter_.op == "<=":
return stix_obj_field <= filter_.value
else:
return -1
@classmethod
def _id(cls, filter_, stix_obj_id):
"""base filter types"""
if filter_["op"] == "=":
return stix_obj_id == filter_["value"]
elif filter_["op"] == "!=":
return stix_obj_id != filter_["value"]
if filter_.op == "=":
return stix_obj_id == filter_.value
elif filter_.op == "!=":
return stix_obj_id != filter_.value
else:
return -1
@classmethod
def _boolean(cls, filter_, stix_obj_field):
if filter_["op"] == "=":
return stix_obj_field == filter_["value"]
elif filter_["op"] == "!=":
return stix_obj_field != filter_["value"]
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
else:
return -1
@ -620,7 +620,7 @@ class STIXCommonPropertyFilters():
"""
for er in stix_obj["external_references"]:
# grab er property name from filter field
filter_field = filter_["field"].split(".")[1]
filter_field = filter_.field.split(".")[1]
r = cls._string(filter_, er[filter_field])
if r:
return r
@ -637,7 +637,7 @@ class STIXCommonPropertyFilters():
"""
for gm in stix_obj["granular_markings"]:
# grab gm property name from filter field
filter_field = filter_["field"].split(".")[1]
filter_field = filter_.field.split(".")[1]
if filter_field == "marking_ref":
return cls._id(filter_, gm[filter_field])

View File

@ -12,8 +12,8 @@ TODO: Test everything
import json
import os
from stix2.sources import DataSink, DataSource, DataStore, make_id
from stix2 import Bundle
from stix2.sources import DataSink, DataSource, DataStore, make_id
class FileSystemStore(DataStore):
@ -136,13 +136,13 @@ class FileSystemSource(DataSource):
# the corresponding subdirectories as well
include_paths = []
declude_paths = []
if "type" in [filter_["field"] for filter_ in file_filters]:
if "type" in [filter_.field for filter_ in file_filters]:
for filter_ in file_filters:
if filter_["field"] == "type":
if filter_["op"] == '=':
include_paths.append(os.path.join(self.stix_dir, filter_["value"]))
elif filter_["op"] == "!=":
declude_paths.append(os.path.join(self.stix_dir, filter_["value"]))
if filter_.field == "type":
if filter_.op == '=':
include_paths.append(os.path.join(self.stix_dir, filter_.value))
elif filter_.op == "!=":
declude_paths.append(os.path.join(self.stix_dir, filter_.value))
else:
# have to walk entire STIX directory
include_paths.append(self.stix_dir)
@ -165,10 +165,10 @@ class FileSystemSource(DataSource):
# grab stix object ID as well - if present in filters, as
# may forgo the loading of STIX content into memory
if "id" in [filter_["field"] for filter_ in file_filters]:
if "id" in [filter_.field for filter_ in file_filters]:
for filter_ in file_filters:
if filter_["field"] == "id" and filter_["field"] == '=':
id_ = filter_["value"]
if filter_.field == "id" and filter_.field == '=':
id_ = filter_.value
else:
id_ = None
@ -196,6 +196,6 @@ class FileSystemSource(DataSource):
"""
file_filters = []
for filter_ in query:
if filter_["field"] == "id" or filter_["field"] == "type":
if filter_.field == "id" or filter_.field == "type":
file_filters.append(filter_)
return file_filters

View File

@ -159,10 +159,10 @@ class TAXIICollectionSource(DataSource):
params = {}
for filter_ in query:
if filter_["field"] in TAXII_FILTERS:
if filter_["field"] == "added_after":
params[filter_["field"]] = filter_["value"]
if filter_.field in TAXII_FILTERS:
if filter_.field == "added_after":
params[filter_.field] = filter_.value
else:
taxii_field = "match[" + filter_["field"] + ']'
params[taxii_field] = filter_["value"]
taxii_field = "match[" + filter_.field + ']'
params[taxii_field] = filter_.value
return params

View File

@ -33,31 +33,11 @@ def test_ds_taxii_name(collection):
def test_parse_taxii_filters():
query = [
{
"field": "added_after",
"op": "=",
"value": "2016-02-01T00:00:01.000Z"
},
{
"field": "id",
"op": "=",
"value": "taxii stix object ID"
},
{
"field": "type",
"op": "=",
"value": "taxii stix object ID"
},
{
"field": "version",
"op": "=",
"value": "first"
},
{
"field": "created_by_ref",
"op": "=",
"value": "Bane"
}
Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
Filter("id", "=", "taxii stix object ID"),
Filter("type", "=", "taxii stix object ID"),
Filter("version", "=", "first"),
Filter("created_by_ref", "=", "Bane"),
]
expected_params = {
@ -162,21 +142,9 @@ def test_apply_common_filters():
]
filters = [
{
"field": "type",
"op": "!=",
"value": "relationship"
},
{
"field": "id",
"op": "=",
"value": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"
},
{
"field": "labels",
"op": "in",
"value": "remote-access-trojan"
}
Filter("type", "!=", "relationship"),
Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
Filter("labels", "in", "remote-access-trojan"),
]
ds = DataSource()