filter changes

stix2.1
= 2017-08-30 11:18:11 -04:00
parent 681be1a5d9
commit 22c749d0df
5 changed files with 240 additions and 233 deletions

View File

@ -16,56 +16,19 @@ Notes:
"""
import collections
import copy
import uuid
from six import iteritems
class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
__slots__ = ()
def __new__(cls, field, op, value):
# If value is a list, convert it to a tuple so it is hashable.
if isinstance(value, list):
value = tuple(value)
self = super(Filter, cls).__new__(cls, field, op, value)
return self
from filters import (FILTER_OPS, FILTER_VALUE_TYPES, STIX_COMMON_FIELDS,
STIX_COMMON_FILTERS_MAP)
def make_id():
return str(uuid.uuid4())
# Currently, only STIX 2.0 common SDO fields (that are not complex objects)
# are supported for filtering on
STIX_COMMON_FIELDS = [
"created",
"created_by_ref",
"external_references.source_name",
"external_references.description",
"external_references.url",
"external_references.hashes",
"external_references.external_id",
"granular_markings.marking_ref",
"granular_markings.selectors",
"id",
"labels",
"modified",
"object_marking_refs",
"revoked",
"type",
"granular_markings"
]
# Supported filter operations
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
# Supported filter value types
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
class DataStore(object):
"""
An implementer will create a concrete subclass from
@ -306,7 +269,7 @@ class DataSource(object):
clean = False
break
try:
match = getattr(STIXCommonPropertyFilters, filter_.field)(filter_, stix_obj)
match = STIX_COMMON_FILTERS_MAP[filter_.field](filter_, stix_obj)
if not match:
clean = False
break
@ -527,140 +490,3 @@ class CompositeDataSource(DataSource):
"""
return copy.deepcopy(self.data_sources.values())
class STIXCommonPropertyFilters(object):
"""
"""
@classmethod
def _all(cls, filter_, stix_obj_field):
"""all filter operations (for filters whose value type can be applied to any operation type)"""
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
elif filter_.op == "in":
return stix_obj_field in filter_.value
elif filter_.op == ">":
return stix_obj_field > filter_.value
elif filter_.op == "<":
return stix_obj_field < filter_.value
elif filter_.op == ">=":
return stix_obj_field >= filter_.value
elif filter_.op == "<=":
return stix_obj_field <= filter_.value
else:
return -1
@classmethod
def _id(cls, filter_, stix_obj_id):
"""base filter types"""
if filter_.op == "=":
return stix_obj_id == filter_.value
elif filter_.op == "!=":
return stix_obj_id != filter_.value
else:
return -1
@classmethod
def _boolean(cls, filter_, stix_obj_field):
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
else:
return -1
@classmethod
def _string(cls, filter_, stix_obj_field):
return cls._all(filter_, stix_obj_field)
@classmethod
def _timestamp(cls, filter_, stix_obj_timestamp):
return cls._all(filter_, stix_obj_timestamp)
# STIX 2.0 Common Property filters
@classmethod
def created(cls, filter_, stix_obj):
return cls._timestamp(filter_, stix_obj["created"])
@classmethod
def created_by_ref(cls, filter_, stix_obj):
return cls._id(filter_, stix_obj["created_by_ref"])
@classmethod
def external_references(cls, filter_, stix_obj):
"""
STIX object's can have a list of external references
external_references properties:
external_references.source_name (string)
external_references.description (string)
external_references.url (string)
external_references.hashes (hash, but for filtering purposes, a string)
external_references.external_id (string)
"""
for er in stix_obj["external_references"]:
# grab er property name from filter field
filter_field = filter_.field.split(".")[1]
r = cls._string(filter_, er[filter_field])
if r:
return r
return False
@classmethod
def granular_markings(cls, filter_, stix_obj):
"""
STIX object's can have a list of granular marking references
granular_markings properties:
granular_markings.marking_ref (id)
granular_markings.selectors (string)
"""
for gm in stix_obj["granular_markings"]:
# grab gm property name from filter field
filter_field = filter_.field.split(".")[1]
if filter_field == "marking_ref":
return cls._id(filter_, gm[filter_field])
elif filter_field == "selectors":
for selector in gm[filter_field]:
r = cls._string(filter_, selector)
if r:
return r
return False
@classmethod
def id(cls, filter_, stix_obj):
return cls._id(filter_, stix_obj["id"])
@classmethod
def labels(cls, filter_, stix_obj):
for label in stix_obj["labels"]:
r = cls._string(filter_, label)
if r:
return r
return False
@classmethod
def modified(cls, filter_, stix_obj):
return cls._timestamp(filter_, stix_obj["created"])
@classmethod
def object_markings_ref(cls, filter_, stix_obj):
for marking_id in stix_obj["object_market_refs"]:
r = cls._id(filter_, marking_id)
if r:
return r
return False
@classmethod
def revoked(cls, filter_, stix_obj):
return cls._boolean(filter_, stix_obj["revoked"])
@classmethod
def type(cls, filter_, stix_obj):
return cls._string(filter_, stix_obj["type"])

View File

@ -13,7 +13,7 @@ import json
import os
from stix2 import Bundle
from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources import DataSink, DataSource, DataStore, Filter
class FileSystemStore(DataStore):
@ -77,13 +77,7 @@ class FileSystemSource(DataSource):
def get(self, stix_id, _composite_filters=None):
"""
"""
query = [
{
"field": "id",
"op": "=",
"value": stix_id
}
]
query = [Filter("id", "=", stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters)
@ -95,21 +89,10 @@ class FileSystemSource(DataSource):
"""
Notes:
Since FileSystem sources/sinks don't handle multiple versions
of a STIX object, this operation is futile. Pass call to get().
(Approved by G.B.)
of a STIX object, this operation is unnecessary. Pass call to get().
"""
# query = [
# {
# "field": "id",
# "op": "=",
# "value": stix_id
# }
# ]
# all_data = self.query(query=query, _composite_filters=_composite_filters)
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
def query(self, query=None, _composite_filters=None):

227
stix2/sources/filters.py Normal file
View File

@ -0,0 +1,227 @@
"""
Filters for Python STIX 2.0 DataSources, DataSinks, DataStores
Classes:
Filter
TODO: The script at the bottom of the module works (to capture
all the callable filter methods), however it causes this module
to be imported by itself twice. Not sure how big of deal that is,
or if cleaner solution possible.
"""
import collections
import types
import filters
# Currently, only STIX 2.0 common SDO fields (that are not complex objects)
# are supported for filtering on
STIX_COMMON_FIELDS = [
"created",
"created_by_ref",
"external_references.source_name",
"external_references.description",
"external_references.url",
"external_references.hashes",
"external_references.external_id",
"granular_markings.marking_ref",
"granular_markings.selectors",
"id",
"labels",
"modified",
"object_marking_refs",
"revoked",
"type",
"granular_markings"
]
# Supported filter operations
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
# Supported filter value types
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
# filter lookup map - STIX 2 common fields -> filter method
STIX_COMMON_FILTERS_MAP = {}
class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
__slots__ = ()
def __new__(cls, field, op, value):
# If value is a list, convert it to a tuple so it is hashable.
if isinstance(value, list):
value = tuple(value)
self = super(Filter, cls).__new__(cls, field, op, value)
return self
# primitive type filters
def _all_filter(filter_, stix_obj_field):
"""all filter operations (for filters whose value type can be applied to any operation type)"""
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
elif filter_.op == "in":
return stix_obj_field in filter_.value
elif filter_.op == ">":
return stix_obj_field > filter_.value
elif filter_.op == "<":
return stix_obj_field < filter_.value
elif filter_.op == ">=":
return stix_obj_field >= filter_.value
elif filter_.op == "<=":
return stix_obj_field <= filter_.value
else:
return -1
def _id_filter(filter_, stix_obj_id):
"""base filter types"""
if filter_.op == "=":
return stix_obj_id == filter_.value
elif filter_.op == "!=":
return stix_obj_id != filter_.value
else:
return -1
def _boolean_filter(filter_, stix_obj_field):
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
else:
return -1
def _string_filter(filter_, stix_obj_field):
return _all_filter(filter_, stix_obj_field)
def _timestamp_filter(filter_, stix_obj_timestamp):
return _all_filter(filter_, stix_obj_timestamp)
# STIX 2.0 Common Property filters
# The naming of these functions is important as
# they are used to index a mapping dictionary from
# STIX common field names to these filter functions.
#
# REQUIRED naming scheme:
# "check_<STIX field name>_filter"
def check_created_filter(filter_, stix_obj):
return _timestamp_filter(filter_, stix_obj["created"])
def check_created_by_ref_filter(filter_, stix_obj):
return _id_filter(filter_, stix_obj["created_by_ref"])
def check_external_references_filter(filter_, stix_obj):
"""
STIX object's can have a list of external references
external_references properties:
external_references.source_name (string)
external_references.description (string)
external_references.url (string)
external_references.hashes (hash, but for filtering purposes, a string)
external_references.external_id (string)
"""
for er in stix_obj["external_references"]:
# grab er property name from filter field
filter_field = filter_.field.split(".")[1]
r = _string_filter(filter_, er[filter_field])
if r:
return r
return False
def check_granular_markings_filter(filter_, stix_obj):
"""
STIX object's can have a list of granular marking references
granular_markings properties:
granular_markings.marking_ref (id)
granular_markings.selectors (string)
"""
for gm in stix_obj["granular_markings"]:
# grab gm property name from filter field
filter_field = filter_.field.split(".")[1]
if filter_field == "marking_ref":
return _id_filter(filter_, gm[filter_field])
elif filter_field == "selectors":
for selector in gm[filter_field]:
r = _string_filter(filter_, selector)
if r:
return r
return False
def check_id_filter(filter_, stix_obj):
return _id_filter(filter_, stix_obj["id"])
def check_labels_filter(filter_, stix_obj):
for label in stix_obj["labels"]:
r = _string_filter(filter_, label)
if r:
return r
return False
def check_modified_filter(filter_, stix_obj):
return _timestamp_filter(filter_, stix_obj["created"])
def check_object_markings_ref_filter(filter_, stix_obj):
for marking_id in stix_obj["object_market_refs"]:
r = _id_filter(filter_, marking_id)
if r:
return r
return False
def check_revoked_filter(filter_, stix_obj):
return _boolean_filter(filter_, stix_obj["revoked"])
def check_type_filter(filter_, stix_obj):
return _string_filter(filter_, stix_obj["type"])
# script to collect STIX common field filter
# functions and create mapping to them
"""
MK: I want to build the filter name -> filter function dictionary
dynamically whenever it is imported. By enumerating the functions
in this module, extracting the "check*" functions and making
pointers to them. But having issues getting an interable of the
modules entities. globals() works but returns an active dictionary
so iterating over it is a no go
"""
for entity in dir(filters):
if "check_" in str(entity) and isinstance(filters.__dict__.get(entity), types.FunctionType):
field_name = entity.split("_")[1].split("_")[0]
STIX_COMMON_FILTERS_MAP[field_name] = filters.__dict__.get(entity)
# Tried this to, didnt work ##############
"""
import sys
for entity in dir(sys.modules[__name__]):
print(entity)
if "check_" in str(entity) and type(entity) == "function":
print(sys.modules[__name__].__dict__.get(entity))
STIX_COMMON_FILTERS_MAP[str(entity)] = sys.modules[__name__].__dict__.get(entity)
"""

View File

@ -22,7 +22,7 @@ import json
import os
from stix2 import Bundle
from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources import DataSink, DataSource, DataStore, Filter
from stix2validator import validate_string
@ -204,13 +204,7 @@ class MemorySource(DataSource):
return stix_obj
# if there are filters from the composite level, process full query
query = [
{
"field": "id",
"op": "=",
"value": stix_id
}
]
query = [Filter("id", "=", stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters)
@ -223,21 +217,10 @@ class MemorySource(DataSource):
"""
Notes:
Since Memory sources/sinks don't handle multiple versions of a
STIX object, this operation is futile. Translate call to get().
(Approved by G.B.)
STIX object, this operation is unnecessary. Translate call to get().
"""
# query = [
# {
# "field": "id",
# "op": "=",
# "value": stix_id
# }
# ]
# all_data = self.query(query=query, _composite_filters=_composite_filters)
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
def query(self, query=None, _composite_filters=None):

View File

@ -12,7 +12,7 @@ TODO: Test everything
import json
from stix2.sources import DataSink, DataSource, DataStore, make_id
from stix2.sources import DataSink, DataSource, DataStore, Filter, make_id
TAXII_FILTERS = ['added_after', 'id', 'type', 'version']
@ -89,16 +89,8 @@ class TAXIICollectionSource(DataSource):
"""
# make query in TAXII query format since 'id' is TAXII field
query = [
{
"field": "match[id]",
"op": "=",
"value": stix_id
},
{
"field": "match[version]",
"op": "=",
"value": "all"
}
Filter("match[id]", "=", stix_id),
Filter("match[version]", "=", "all")
]
all_data = self.query(query=query, _composite_filters=_composite_filters)
@ -138,11 +130,7 @@ class TAXIICollectionSource(DataSource):
For instance - "?match[type]=indicator,sighting" should be in a
query dict as follows:
{
"field": "type"
"op": "=",
"value": "indicator,sighting"
}
Filter("type", "=", "indicator,sighting")
Args:
query (list): list of filters to extract which ones are TAXII