Merge pull request #49 from emmanvg/datastores

Datastores Changes
stix2.1
Greg Back 2017-09-05 16:27:43 +00:00 committed by GitHub
commit c4f459752a
8 changed files with 243 additions and 244 deletions

5
.gitignore vendored
View File

@ -57,9 +57,12 @@ docs/_build/
# PyBuilder
target/
# External data cache
cache.sqlite
# Vim
*.swp
#
# PyCharm
.idea/

View File

@ -121,7 +121,7 @@ class DependentPropertiesError(STIXError, TypeError):
def __str__(self):
msg = "The property dependencies for {0}: ({1}) are not met."
return msg.format(self.cls.__name__,
", ".join(x for x, y in self.dependencies))
", ".join(name for x in self.dependencies for name in x))
class AtLeastOnePropertyError(STIXError, TypeError):

View File

@ -33,17 +33,21 @@ class DataStore(object):
An implementer will create a concrete subclass from
this abstract class for the specific data store.
Attributes:
id (str): A unique UUIDv4 to identify this DataStore.
source (DataStore): An object that implements DataStore class.
sink (DataSink): An object that implements DataSink class.
"""
def __init__(self, name="DataStore", source=None, sink=None):
self.name = name
self.id_ = make_id()
def __init__(self, source=None, sink=None):
self.id = make_id()
self.source = source
self.sink = sink
def get(self, stix_id):
"""
Implement:
Translate API get() call to the appropriate DataSource call
Notes:
Translate API get() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX 2.0 object to retrieve. Should
@ -54,7 +58,7 @@ class DataStore(object):
stix_obj (dictionary): the STIX object to be returned
"""
return self.source.get(stix_id=stix_id)
return self.source.get(stix_id)
def all_versions(self, stix_id):
"""
@ -66,21 +70,18 @@ class DataStore(object):
return a single object, the most recent version of the object
specified by the "id".
_composite_filters (list): list of filters passed along from
the Composite Data Filter.
Returns:
stix_objs (list): a list of STIX objects (where each object is a
STIX object)
"""
return self.source.all_versions(stix_id=stix_id)
return self.source.all_versions(stix_id)
def query(self, query):
"""
Fill:
Notes:
Implement the specific data source API calls, processing,
functionality required for retrieving query from the data source
functionality required for retrieving query from the data source.
Args:
query (list): a list of filters (which collectively are the query)
@ -95,11 +96,11 @@ class DataStore(object):
def add(self, stix_objs):
"""
Fill:
-translate add() to the appropriate DataSink call()
Notes:
Translate add() to the appropriate DataSink call().
"""
return self.sink.add(stix_objs=stix_objs)
return self.sink.add(stix_objs)
class DataSink(object):
@ -108,18 +109,15 @@ class DataSink(object):
different sink components.
Attributes:
id_ (str): A unique UUIDv4 to identify this DataSink.
name (str): The descriptive name that identifies this DataSink.
id (str): A unique UUIDv4 to identify this DataSink.
"""
def __init__(self, name="DataSink"):
self.name = name
self.id_ = make_id()
def __init__(self):
self.id = make_id()
def add(self, stix_objs):
"""
Fill:
Notes:
Implement the specific data sink API calls, processing,
functionality required for adding data to the sink
@ -133,15 +131,12 @@ class DataSource(object):
different source components.
Attributes:
id_ (str): A unique UUIDv4 to identify this DataSource.
name (str): The descriptive name that identifies this DataSource.
id (str): A unique UUIDv4 to identify this DataSource.
filters (set): A collection of filters present in this DataSource.
"""
def __init__(self, name="DataSource"):
self.name = name
self.id_ = make_id()
def __init__(self):
self.id = make_id()
self.filters = set()
def get(self, stix_id, _composite_filters=None):
@ -166,12 +161,11 @@ class DataSource(object):
def all_versions(self, stix_id, _composite_filters=None):
"""
Fill:
-Similar to get() except returns list of all object versions of
the specified "id".
-implement the specific data source API calls, processing,
functionality required for retrieving data from the data source
Notes:
Similar to get() except returns list of all object versions of
the specified "id". In addition, implement the specific data
source API calls, processing, functionality required for retrieving
data from the data source.
Args:
stix_id (str): The id of the STIX 2.0 object to retrieve. Should
@ -212,26 +206,24 @@ class DataSource(object):
Args:
filters (list): list of filters (dict) to add to the Data Source.
"""
for filter_ in filters:
self.add_filter(filter_)
for filter in filters:
self.add_filter(filter)
def add_filter(self, filter_):
def add_filter(self, filter):
"""Add a filter."""
# check filter field is a supported STIX 2.0 common field
if filter_.field not in STIX_COMMON_FIELDS:
if filter.field not in STIX_COMMON_FIELDS:
raise ValueError("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported")
# check filter operator is supported
if filter_.op not in FILTER_OPS:
raise ValueError("Filter operation(from 'op' field) not supported")
if filter.op not in FILTER_OPS:
raise ValueError("Filter operation (from 'op' field) not supported")
# check filter value type is supported
if type(filter_.value) not in FILTER_VALUE_TYPES:
if type(filter.value) not in FILTER_VALUE_TYPES:
raise ValueError("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary")
self.filters.add(filter_)
# TODO: Do we need a remove_filter function?
self.filters.add(filter)
def apply_common_filters(self, stix_objs, query):
"""Evaluates filters against a set of STIX 2.0 objects
@ -321,7 +313,7 @@ class CompositeDataSource(DataSource):
controlled and used by the Data Source Controller object.
"""
def __init__(self, name="CompositeDataSource"):
def __init__(self):
"""
Creates a new STIX Data Source.
@ -330,7 +322,7 @@ class CompositeDataSource(DataSource):
CompositeDataSource instance.
"""
super(CompositeDataSource, self).__init__(name=name)
super(CompositeDataSource, self).__init__()
self.data_sources = {}
def get(self, stix_id, _composite_filters=None):
@ -458,13 +450,13 @@ class CompositeDataSource(DataSource):
"""
for ds in data_sources:
if issubclass(ds.__class__, DataSource):
if ds.id_ in self.data_sources:
if ds.id in self.data_sources:
# data source already attached to Composite Data Source
continue
# add data source to Composite Data Source
# (its id will be its key identifier)
self.data_sources[ds.id_] = ds
self.data_sources[ds.id] = ds
else:
# the Data Source object is not a proper subclass
# of DataSource Abstract Class
@ -480,9 +472,9 @@ class CompositeDataSource(DataSource):
data_source_ids (list): a list of Data Source identifiers.
"""
for id_ in data_source_ids:
if id_ in self.data_sources:
del self.data_sources[id_]
for id in data_source_ids:
if id in self.data_sources:
del self.data_sources[id]
else:
raise ValueError("DataSource 'id' not found in CompositeDataSource collection.")
return

View File

@ -13,14 +13,15 @@ import json
import os
from stix2 import Bundle
from stix2.sources import DataSink, DataSource, DataStore, Filter
from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter
class FileSystemStore(DataStore):
"""
"""
def __init__(self, name="FileSystemStore", stix_dir="stix_data"):
super(FileSystemStore, self).__init__(name=name)
def __init__(self, stix_dir="stix_data"):
super(FileSystemStore, self).__init__()
self.source = FileSystemSource(stix_dir=stix_dir)
self.sink = FileSystemSink(stix_dir=stix_dir)
@ -28,8 +29,8 @@ class FileSystemStore(DataStore):
class FileSystemSink(DataSink):
"""
"""
def __init__(self, name="FileSystemSink", stix_dir="stix_data"):
super(FileSystemSink, self).__init__(name=name)
def __init__(self, stix_dir="stix_data"):
super(FileSystemSink, self).__init__()
self.stix_dir = os.path.abspath(stix_dir)
# check directory path exists
@ -58,8 +59,8 @@ class FileSystemSink(DataSink):
class FileSystemSource(DataSource):
"""
"""
def __init__(self, name="FileSystemSource", stix_dir="stix_data"):
super(FileSystemSource, self).__init__(name=name)
def __init__(self, stix_dir="stix_data"):
super(FileSystemSource, self).__init__()
self.stix_dir = os.path.abspath(stix_dir)
# check directory path exists
@ -71,8 +72,8 @@ class FileSystemSource(DataSource):
return self.stix_dir
@stix_dir.setter
def stix_dir(self, dir_):
self.stix_dir = dir_
def stix_dir(self, dir):
self.stix_dir = dir
def get(self, stix_id, _composite_filters=None):
"""
@ -92,7 +93,6 @@ class FileSystemSource(DataSource):
of a STIX object, this operation is unnecessary. Pass call to get().
"""
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
def query(self, query=None, _composite_filters=None):
@ -121,13 +121,13 @@ class FileSystemSource(DataSource):
# the corresponding subdirectories as well
include_paths = []
declude_paths = []
if "type" in [filter_.field for filter_ in file_filters]:
for filter_ in file_filters:
if filter_.field == "type":
if filter_.op == "=":
include_paths.append(os.path.join(self.stix_dir, filter_.value))
elif filter_.op == "!=":
declude_paths.append(os.path.join(self.stix_dir, filter_.value))
if "type" in [filter.field for filter in file_filters]:
for filter in file_filters:
if filter.field == "type":
if filter.op == "=":
include_paths.append(os.path.join(self.stix_dir, filter.value))
elif filter.op == "!=":
declude_paths.append(os.path.join(self.stix_dir, filter.value))
else:
# have to walk entire STIX directory
include_paths.append(self.stix_dir)
@ -144,35 +144,35 @@ class FileSystemSource(DataSource):
# user has specified types that are not wanted (i.e. "!=")
# so query will look in all STIX directories that are not
# the specified type. Compile correct dir paths
for dir_ in os.listdir(self.stix_dir):
if os.path.abspath(dir_) not in declude_paths:
include_paths.append(os.path.abspath(dir_))
for dir in os.listdir(self.stix_dir):
if os.path.abspath(dir) not in declude_paths:
include_paths.append(os.path.abspath(dir))
# grab stix object ID as well - if present in filters, as
# may forgo the loading of STIX content into memory
if "id" in [filter_.field for filter_ in file_filters]:
for filter_ in file_filters:
if filter_.field == "id" and filter_.op == "=":
id_ = filter_.value
if "id" in [filter.field for filter in file_filters]:
for filter in file_filters:
if filter.field == "id" and filter.op == "=":
id = filter.value
break
else:
id_ = None
id = None
else:
id_ = None
id = None
# now iterate through all STIX objs
for path in include_paths:
for root, dirs, files in os.walk(path):
for file_ in files:
if id_:
if id_ == file_.split(".")[0]:
for file in files:
if id:
if id == file.split(".")[0]:
# since ID is specified in one of filters, can evaluate against filename first without loading
stix_obj = json.load(file_)["objects"]
stix_obj = json.load(file)["objects"]
# check against other filters, add if match
all_data.extend(self.apply_common_filters([stix_obj], query))
else:
# have to load into memory regardless to evaluate other filters
stix_obj = json.load(file_)["objects"]
stix_obj = json.load(file)["objects"]
all_data.extend(self.apply_common_filters([stix_obj], query))
all_data = self.deduplicate(all_data)
@ -182,7 +182,7 @@ class FileSystemSource(DataSource):
"""
"""
file_filters = []
for filter_ in query:
if filter_.field == "id" or filter_.field == "type":
file_filters.append(filter_)
for filter in query:
if filter.field == "id" or filter.field == "type":
file_filters.append(filter)
return file_filters

View File

@ -18,20 +18,45 @@ Notes:
"""
import collections
import json
import os
from stix2validator import validate_string
from stix2validator import validate_instance
from stix2 import Bundle
from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter
def _add(store, stix_data):
"""Adds stix objects to MemoryStore/Source/Sink."""
if isinstance(stix_data, collections.Mapping):
# stix objects are in a bundle
# verify STIX json data
r = validate_instance(stix_data)
# make dictionary of the objects for easy lookup
if r.is_valid:
for stix_obj in stix_data["objects"]:
store.data[stix_obj["id"]] = stix_obj
else:
raise ValueError("Error: data passed was found to not be valid by the STIX 2 Validator: \n%s", r.as_dict())
elif isinstance(stix_data, list):
# stix objects are in a list
for stix_obj in stix_data:
r = validate_instance(stix_obj)
if r.is_valid:
store.data[stix_obj["id"]] = stix_obj
else:
raise ValueError("Error: STIX object %s is not valid under STIX 2 validator.\n%s", stix_obj["id"], r)
else:
raise ValueError("stix_data must be in bundle format or raw list")
class MemoryStore(DataStore):
"""
"""
def __init__(self, name="MemoryStore", stix_data=None):
def __init__(self, stix_data):
"""
Notes:
It doesn't make sense to create a MemoryStore by passing
@ -39,30 +64,11 @@ class MemoryStore(DataStore):
be data concurrency issues. Just as easy to create new MemoryStore.
"""
super(MemoryStore, self).__init__(name=name)
super(MemoryStore, self).__init__()
self.data = {}
if stix_data:
if type(stix_data) == dict:
# stix objects are in a bundle
# verify STIX json data
r = validate_string(json.dumps(stix_data))
# make dictionary of the objects for easy lookup
if r.is_valid:
for stix_obj in stix_data["objects"]:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: json data passed to MemorySink() was found to not be validated by STIX 2 Validator")
print(r)
elif type(stix_data) == list:
# stix objects are in a list
for stix_obj in stix_data:
r = validate_string(json.dumps(stix_obj))
if r.is_valid:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: STIX object %s is not valid under STIX 2 validator." % stix_obj["id"])
print(r)
_add(self, stix_data)
self.source = MemorySource(stix_data=self.data, _store=True)
self.sink = MemorySink(stix_data=self.data, _store=True)
@ -77,72 +83,28 @@ class MemoryStore(DataStore):
class MemorySink(DataSink):
"""
"""
def __init__(self, name="MemorySink", stix_data=None, _store=False):
def __init__(self, stix_data, _store=False):
"""
Args:
stix_data (dictionary OR list): valid STIX 2.0 content in
bundle or a list.
name (string): optional name tag of the data source
_store (bool): if the MemorySink is a part of a DataStore,
in which case "stix_data" is a direct reference to
shared memory with DataSource.
"""
super(MemorySink, self).__init__(name=name)
super(MemorySink, self).__init__()
self.data = {}
if _store:
self.data = stix_data
else:
self.data = {}
if stix_data:
if type(stix_data) == dict:
# stix objects are in a bundle
# verify STIX json data
r = validate_string(json.dumps(stix_data))
# make dictionary of the objects for easy lookup
if r.is_valid:
for stix_obj in stix_data["objects"]:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: json data passed to MemorySink() was found to not be validated by STIX 2 Validator")
print(r)
self.data = {}
elif type(stix_data) == list:
# stix objects are in a list
for stix_obj in stix_data:
r = validate_string(json.dumps(stix_obj))
if r.is_valid:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: STIX object %s is not valid under STIX 2 validator." % stix_obj["id"])
print(r)
else:
raise ValueError("stix_data must be in bundle format or raw list")
elif stix_data:
self.add(stix_data)
def add(self, stix_data):
"""
"""
if type(stix_data) == dict:
# stix data is in bundle
r = validate_string(json.dumps(stix_data))
if r.is_valid:
for stix_obj in stix_data["objects"]:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: json data passed to MemorySink() was found to not be validated by STIX 2 Validator")
print(r)
elif type(stix_data) == list:
# stix data is in list
for stix_obj in stix_data:
r = validate_string(json.dumps(stix_obj))
if r.is_valid:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: STIX object %s is not valid under STIX 2 validator." % stix_obj["id"])
print(r)
else:
raise ValueError("stix_data must be in bundle format or raw list")
_add(self, stix_data)
def save_to_file(self, file_path):
"""
@ -152,47 +114,23 @@ class MemorySink(DataSink):
class MemorySource(DataSource):
def __init__(self, name="MemorySource", stix_data=None, _store=False):
def __init__(self, stix_data, _store=False):
"""
Args:
stix_data (dictionary OR list): valid STIX 2.0 content in
bundle or list.
name (string): optional name tag of the data source.
_store (bool): if the MemorySource is a part of a DataStore,
in which case "stix_data" is a direct reference to shared
memory with DataSink.
"""
super(MemorySource, self).__init__(name=name)
super(MemorySource, self).__init__()
self.data = {}
if _store:
self.data = stix_data
else:
self.data = {}
if stix_data:
if type(stix_data) == dict:
# STIX objects are in a bundle
# verify STIX json data
r = validate_string(json.dumps(stix_data))
# make dictionary of the objects for easy lookup
if r.is_valid:
for stix_obj in stix_data["objects"]:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: json data passed to MemorySource() was found to not be validated by STIX 2 Validator")
print(r.as_dict())
self.data = {}
elif type(stix_data) == list:
# STIX objects are in a list
for stix_obj in stix_data:
r = validate_string(json.dumps(stix_obj))
if r.is_valid:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: STIX object %s is not valid under STIX 2 validator." % stix_obj["id"])
print(r)
else:
raise ValueError("stix_data must be in bundle format or raw list")
elif stix_data:
_add(self, stix_data)
def get(self, stix_id, _composite_filters=None):
"""
@ -221,8 +159,15 @@ class MemorySource(DataSource):
Since Memory sources/sinks don't handle multiple versions of a
STIX object, this operation is unnecessary. Translate call to get().
"""
Args:
stix_id (str): The id of the STIX 2.0 object to retrieve. Should
return a list of objects, all the versions of the object
specified by the "id".
Returns:
(list): STIX object that matched ``stix_id``.
"""
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
def query(self, query=None, _composite_filters=None):
@ -233,14 +178,11 @@ class MemorySource(DataSource):
# combine all query filters
if self.filters:
query.extend(self.filters.values())
query.extend(list(self.filters))
if _composite_filters:
query.extend(_composite_filters)
# deduplicate data before filtering -> Deduplication is not required as Memory only ever holds one version of an object
# all_data = self.deduplicate(all_data)
# apply STIX common property filters
# Apply STIX common property filters.
all_data = self.apply_common_filters(self.data.values(), query)
return all_data
@ -251,11 +193,10 @@ class MemorySource(DataSource):
file_path = os.path.abspath(file_path)
stix_data = json.load(open(file_path, "r"))
r = validate_string(json.dumps(stix_data))
r = validate_instance(stix_data)
if r.is_valid:
for stix_obj in stix_data["objects"]:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: STIX data loaded from file (%s) was found to not be validated by STIX 2 Validator" % file_path)
print(r)
raise ValueError("Error: STIX data loaded from file (%s) was found to not be validated by STIX 2 Validator.\n%s", file_path, r)

View File

@ -21,7 +21,7 @@ TAXII_FILTERS = ['added_after', 'id', 'type', 'version']
class TAXIICollectionStore(DataStore):
"""
"""
def __init__(self, collection, name="TAXIICollectionStore"):
def __init__(self, collection):
"""
Create a new TAXII Collection Data store
@ -29,7 +29,7 @@ class TAXIICollectionStore(DataStore):
collection (taxii2.Collection): Collection instance
"""
super(TAXIICollectionStore, self).__init__(name=name)
super(TAXIICollectionStore, self).__init__()
self.source = TAXIICollectionSource(collection)
self.sink = TAXIICollectionSink(collection)
@ -37,8 +37,8 @@ class TAXIICollectionStore(DataStore):
class TAXIICollectionSink(DataSink):
"""
"""
def __init__(self, collection, name="TAXIICollectionSink"):
super(TAXIICollectionSink, self).__init__(name=name)
def __init__(self, collection):
super(TAXIICollectionSink, self).__init__()
self.collection = collection
def add(self, stix_obj):
@ -57,8 +57,8 @@ class TAXIICollectionSink(DataSink):
class TAXIICollectionSource(DataSource):
"""
"""
def __init__(self, collection, name="TAXIICollectionSource"):
super(TAXIICollectionSource, self).__init__(name=name)
def __init__(self, collection):
super(TAXIICollectionSource, self).__init__()
self.collection = collection
def get(self, stix_id, _composite_filters=None):

View File

@ -4,7 +4,7 @@ from taxii2client import Collection
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
DataStore, make_id, taxii)
from stix2.sources.filters import Filter
from stix2.sources.memory import MemorySource
from stix2.sources.memory import MemorySource, MemoryStore
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
@ -125,7 +125,7 @@ STIX_OBJS2 = [IND6, IND7, IND8]
STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
def test_ds_smoke():
def test_ds_abstract_class_smoke():
ds1 = DataSource()
ds2 = DataSink()
ds3 = DataStore(source=ds1, sink=ds2)
@ -143,14 +143,36 @@ def test_ds_smoke():
ds3.query([Filter("id", "=", "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")])
def test_memory_store_smoke():
# Initialize MemoryStore with dict
ms = MemoryStore(STIX_OBJS1)
# Add item to sink
ms.add(dict(id="bundle--%s" % make_id(),
objects=STIX_OBJS2,
spec_version="2.0",
type="bundle"))
resp = ms.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
assert len(resp) == 1
resp = ms.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
assert resp["id"] == "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
query = [Filter('type', '=', 'malware')]
resp = ms.query(query)
assert len(resp) == 0
def test_ds_taxii(collection):
ds = taxii.TAXIICollectionSource(collection)
assert ds.name == 'TAXIICollectionSource'
assert ds.collection is not None
def test_ds_taxii_name(collection):
ds = taxii.TAXIICollectionSource(collection, name='My Data Source Name')
assert ds.name == "My Data Source Name"
ds = taxii.TAXIICollectionSource(collection)
assert ds.collection is not None
def test_parse_taxii_filters():
@ -211,7 +233,7 @@ def test_add_get_remove_filter(ds):
with pytest.raises(ValueError) as excinfo:
ds.add_filter(invalid_filters[1])
assert str(excinfo.value) == "Filter operation(from 'op' field) not supported"
assert str(excinfo.value) == "Filter operation (from 'op' field) not supported"
with pytest.raises(ValueError) as excinfo:
ds.add_filter(invalid_filters[2])
@ -272,6 +294,22 @@ def test_apply_common_filters(ds):
"source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
"target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
"type": "relationship"
},
{
"id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef",
"created": "2016-02-14T00:00:00.000Z",
"created_by_ref": "identity--00000000-0000-0000-0000-b8e91df99dc9",
"modified": "2016-02-14T00:00:00.000Z",
"type": "vulnerability",
"name": "CVE-2014-0160",
"description": "The (1) TLS...",
"external_references": [
{
"source_name": "cve",
"external_id": "CVE-2014-0160"
}
],
"labels": ["heartbleed", "has-logo"]
}
]
@ -286,67 +324,111 @@ def test_apply_common_filters(ds):
Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"),
Filter("granular_markings.selectors", "in", "relationship_type"),
Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"),
Filter("created_by_ref", "=", "identity--00000000-0000-0000-0000-b8e91df99dc9"),
Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"),
Filter("granular_markings.selectors", "in", "description"),
Filter("external_references.source_name", "=", "CVE"),
]
# "Return any object whose type is not relationship"
resp = ds.apply_common_filters(stix_objs, [filters[0]])
ids = [r['id'] for r in resp]
assert stix_objs[0]['id'] in ids
assert stix_objs[1]['id'] in ids
assert stix_objs[3]['id'] in ids
assert len(ids) == 3
# "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"
resp = ds.apply_common_filters(stix_objs, [filters[1]])
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
# "Return any object that contains remote-access-trojan in labels"
resp = ds.apply_common_filters(stix_objs, [filters[2]])
assert resp[0]['id'] == stix_objs[0]['id']
resp = ds.apply_common_filters(stix_objs, [filters[3]])
assert resp[0]['id'] == stix_objs[0]['id']
assert len(resp) == 1
# "Return any object created after 2015-01-01T01:00:00.000Z"
resp = ds.apply_common_filters(stix_objs, [filters[3]])
assert resp[0]['id'] == stix_objs[0]['id']
assert len(resp) == 2
# "Return any revoked object"
resp = ds.apply_common_filters(stix_objs, [filters[4]])
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
# "Return any object whose not revoked"
# Note that if 'revoked' property is not present in object.
# Currently we can't use such an expression to filter for...
# Currently we can't use such an expression to filter for... :(
resp = ds.apply_common_filters(stix_objs, [filters[5]])
assert len(resp) == 0
# Assert unknown operator for _boolean() raises exception.
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(stix_objs, [filters[6]])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}").format(filters[6].op,
filters[6].field)
"for specified field: {1}"
.format(filters[6].op, filters[6].field))
# "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs"
resp = ds.apply_common_filters(stix_objs, [filters[7]])
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
# "Return any object that contains relationship_type in their selectors AND
# also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref"
resp = ds.apply_common_filters(stix_objs, [filters[8], filters[9]])
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
# "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id"
resp = ds.apply_common_filters(stix_objs, [filters[10]])
assert resp[0]['id'] == stix_objs[3]['id']
assert len(resp) == 1
# "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9"
resp = ds.apply_common_filters(stix_objs, [filters[11]])
assert len(resp) == 1
# "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None)
resp = ds.apply_common_filters(stix_objs, [filters[12]])
assert len(resp) == 0
# "Return any object that contains description in its selectors" (None)
resp = ds.apply_common_filters(stix_objs, [filters[13]])
assert len(resp) == 0
# "Return any object that object that matches CVE in source_name" (None, case sensitive)
resp = ds.apply_common_filters(stix_objs, [filters[14]])
assert len(resp) == 0
def test_filters0(ds):
# "Return any object modified before 2017-01-28T13:49:53.935Z"
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
def test_filters1(ds):
# "Return any object modified after 2017-01-28T13:49:53.935Z"
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
def test_filters2(ds):
# "Return any object modified after or on 2017-01-28T13:49:53.935Z"
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 3
def test_filters3(ds):
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
@ -354,14 +436,15 @@ def test_filters3(ds):
def test_filters4(ds):
fltr4 = Filter("modified", "?", "2017-01-27T13:49:53.935Z")
# Assert unknown operator for _all() raises exception.
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [fltr4])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}").format(fltr4.op, fltr4.field)
def test_filters5(ds):
# "Return any object whose id is not indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
@ -369,6 +452,7 @@ def test_filters5(ds):
def test_filters6(ds):
fltr6 = Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
# Assert unknown operator for _id() raises exception.
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [fltr6])
@ -378,11 +462,12 @@ def test_filters6(ds):
def test_filters7(ds):
fltr7 = Filter("notacommonproperty", "=", "bar")
# Assert unknown field raises exception.
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [fltr7])
assert str(excinfo.value) == ("Error, field: {0} is not supported for "
"filtering on.".format(fltr7.field))
"filtering on.").format(fltr7.field)
def test_deduplicate(ds):
@ -413,12 +498,12 @@ def test_add_remove_composite_datasource():
assert len(cds.get_all_data_sources()) == 2
cds.remove_data_source([ds1.id_, ds2.id_])
cds.remove_data_source([ds1.id, ds2.id])
assert len(cds.get_all_data_sources()) == 0
with pytest.raises(ValueError):
cds.remove_data_source([ds3.id_])
cds.remove_data_source([ds3.id])
def test_composite_datasource_operations():
@ -452,25 +537,3 @@ def test_composite_datasource_operations():
# STIX_OBJS2 has indicator with later time, one with different id, one with
# original time in STIX_OBJS1
assert len(results) == 3
# def test_data_source_file():
# ds = file.FileDataSource()
#
# assert ds.name == "DataSource"
#
#
# def test_data_source_name():
# ds = file.FileDataSource(name="My File Data Source")
#
# assert ds.name == "My File Data Source"
#
#
# def test_data_source_get():
# ds = file.FileDataSource(name="My File Data Source")
#
# with pytest.raises(NotImplementedError):
# ds.get("foo")
#
# #filter testing
# def test_add_filter():
# ds = file.FileDataSource()

View File

@ -242,7 +242,7 @@ def test_artifact_example_dependency_error():
stix2.Artifact(url="http://example.com/sirvizio.exe")
assert excinfo.value.dependencies == [("hashes", "url")]
assert str(excinfo.value) == "The property dependencies for Artifact: (hashes) are not met."
assert str(excinfo.value) == "The property dependencies for Artifact: (hashes, url) are not met."
@pytest.mark.parametrize("data", [