Don't Bundlify data in FileSystemStore
Don't wrap objects in a Bundle when adding them to a FileSystemStore, but still support getting objects from FileSystemStore that were saved in a bundle. Also: - Add option to allow custom content in FileSystemStore - Simplify an if statement - Improve FileSystem docstrings - Remove an unnecessary parse() call in FileSystemSource.get()stix2.0
parent
84094e9f79
commit
e1d8c2872e
|
@ -44,7 +44,7 @@ class DataStore(object):
|
||||||
self.source = source
|
self.source = source
|
||||||
self.sink = sink
|
self.sink = sink
|
||||||
|
|
||||||
def get(self, stix_id):
|
def get(self, stix_id, allow_custom=False):
|
||||||
"""Retrieve the most recent version of a single STIX object by ID.
|
"""Retrieve the most recent version of a single STIX object by ID.
|
||||||
|
|
||||||
Translate get() call to the appropriate DataSource call.
|
Translate get() call to the appropriate DataSource call.
|
||||||
|
@ -57,9 +57,9 @@ class DataStore(object):
|
||||||
object specified by the "id".
|
object specified by the "id".
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return self.source.get(stix_id)
|
return self.source.get(stix_id, allow_custom=allow_custom)
|
||||||
|
|
||||||
def all_versions(self, stix_id):
|
def all_versions(self, stix_id, allow_custom=False):
|
||||||
"""Retrieve all versions of a single STIX object by ID.
|
"""Retrieve all versions of a single STIX object by ID.
|
||||||
|
|
||||||
Implement: Translate all_versions() call to the appropriate DataSource call
|
Implement: Translate all_versions() call to the appropriate DataSource call
|
||||||
|
@ -71,9 +71,9 @@ class DataStore(object):
|
||||||
stix_objs (list): a list of STIX objects
|
stix_objs (list): a list of STIX objects
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return self.source.all_versions(stix_id)
|
return self.source.all_versions(stix_id, allow_custom=allow_custom)
|
||||||
|
|
||||||
def query(self, query):
|
def query(self, query, allow_custom=False):
|
||||||
"""Retrieve STIX objects matching a set of filters.
|
"""Retrieve STIX objects matching a set of filters.
|
||||||
|
|
||||||
Implement: Specific data source API calls, processing,
|
Implement: Specific data source API calls, processing,
|
||||||
|
@ -89,7 +89,7 @@ class DataStore(object):
|
||||||
"""
|
"""
|
||||||
return self.source.query(query=query)
|
return self.source.query(query=query)
|
||||||
|
|
||||||
def add(self, stix_objs):
|
def add(self, stix_objs, allow_custom=False):
|
||||||
"""Store STIX objects.
|
"""Store STIX objects.
|
||||||
|
|
||||||
Translates add() to the appropriate DataSink call.
|
Translates add() to the appropriate DataSink call.
|
||||||
|
@ -97,7 +97,7 @@ class DataStore(object):
|
||||||
Args:
|
Args:
|
||||||
stix_objs (list): a list of STIX objects
|
stix_objs (list): a list of STIX objects
|
||||||
"""
|
"""
|
||||||
return self.sink.add(stix_objs)
|
return self.sink.add(stix_objs, allow_custom=allow_custom)
|
||||||
|
|
||||||
|
|
||||||
class DataSink(object):
|
class DataSink(object):
|
||||||
|
@ -111,7 +111,7 @@ class DataSink(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.id = make_id()
|
self.id = make_id()
|
||||||
|
|
||||||
def add(self, stix_objs):
|
def add(self, stix_objs, allow_custom=False):
|
||||||
"""Store STIX objects.
|
"""Store STIX objects.
|
||||||
|
|
||||||
Implement: Specific data sink API calls, processing,
|
Implement: Specific data sink API calls, processing,
|
||||||
|
@ -139,7 +139,7 @@ class DataSource(object):
|
||||||
self.id = make_id()
|
self.id = make_id()
|
||||||
self.filters = set()
|
self.filters = set()
|
||||||
|
|
||||||
def get(self, stix_id, _composite_filters=None):
|
def get(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""
|
"""
|
||||||
Implement: Specific data source API calls, processing,
|
Implement: Specific data source API calls, processing,
|
||||||
functionality required for retrieving data from the data source
|
functionality required for retrieving data from the data source
|
||||||
|
@ -158,7 +158,7 @@ class DataSource(object):
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def all_versions(self, stix_id, _composite_filters=None):
|
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""
|
"""
|
||||||
Implement: Similar to get() except returns list of all object versions of
|
Implement: Similar to get() except returns list of all object versions of
|
||||||
the specified "id". In addition, implement the specific data
|
the specified "id". In addition, implement the specific data
|
||||||
|
@ -179,7 +179,7 @@ class DataSource(object):
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def query(self, query, _composite_filters=None):
|
def query(self, query, _composite_filters=None, allow_custom=False):
|
||||||
"""
|
"""
|
||||||
Implement:Implement the specific data source API calls, processing,
|
Implement:Implement the specific data source API calls, processing,
|
||||||
functionality required for retrieving query from the data source
|
functionality required for retrieving query from the data source
|
||||||
|
@ -224,7 +224,7 @@ class CompositeDataSource(DataSource):
|
||||||
super(CompositeDataSource, self).__init__()
|
super(CompositeDataSource, self).__init__()
|
||||||
self.data_sources = []
|
self.data_sources = []
|
||||||
|
|
||||||
def get(self, stix_id, _composite_filters=None):
|
def get(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""Retrieve STIX object by STIX ID
|
"""Retrieve STIX object by STIX ID
|
||||||
|
|
||||||
Federated retrieve method, iterates through all DataSources
|
Federated retrieve method, iterates through all DataSources
|
||||||
|
@ -259,7 +259,7 @@ class CompositeDataSource(DataSource):
|
||||||
|
|
||||||
# for every configured Data Source, call its retrieve handler
|
# for every configured Data Source, call its retrieve handler
|
||||||
for ds in self.data_sources:
|
for ds in self.data_sources:
|
||||||
data = ds.get(stix_id=stix_id, _composite_filters=all_filters)
|
data = ds.get(stix_id=stix_id, _composite_filters=all_filters, allow_custom=allow_custom)
|
||||||
if data:
|
if data:
|
||||||
all_data.append(data)
|
all_data.append(data)
|
||||||
|
|
||||||
|
@ -272,7 +272,7 @@ class CompositeDataSource(DataSource):
|
||||||
|
|
||||||
return stix_obj
|
return stix_obj
|
||||||
|
|
||||||
def all_versions(self, stix_id, _composite_filters=None):
|
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""Retrieve STIX objects by STIX ID
|
"""Retrieve STIX objects by STIX ID
|
||||||
|
|
||||||
Federated all_versions retrieve method - iterates through all DataSources
|
Federated all_versions retrieve method - iterates through all DataSources
|
||||||
|
@ -305,7 +305,7 @@ class CompositeDataSource(DataSource):
|
||||||
|
|
||||||
# retrieve STIX objects from all configured data sources
|
# retrieve STIX objects from all configured data sources
|
||||||
for ds in self.data_sources:
|
for ds in self.data_sources:
|
||||||
data = ds.all_versions(stix_id=stix_id, _composite_filters=all_filters)
|
data = ds.all_versions(stix_id=stix_id, _composite_filters=all_filters, allow_custom=allow_custom)
|
||||||
all_data.extend(data)
|
all_data.extend(data)
|
||||||
|
|
||||||
# remove exact duplicates (where duplicates are STIX 2.0 objects
|
# remove exact duplicates (where duplicates are STIX 2.0 objects
|
||||||
|
@ -315,7 +315,7 @@ class CompositeDataSource(DataSource):
|
||||||
|
|
||||||
return all_data
|
return all_data
|
||||||
|
|
||||||
def query(self, query=None, _composite_filters=None):
|
def query(self, query=None, _composite_filters=None, allow_custom=False):
|
||||||
"""Retrieve STIX objects that match query
|
"""Retrieve STIX objects that match query
|
||||||
|
|
||||||
Federate the query to all DataSources attached to the
|
Federate the query to all DataSources attached to the
|
||||||
|
@ -351,7 +351,7 @@ class CompositeDataSource(DataSource):
|
||||||
# federate query to all attached data sources,
|
# federate query to all attached data sources,
|
||||||
# pass composite filters to id
|
# pass composite filters to id
|
||||||
for ds in self.data_sources:
|
for ds in self.data_sources:
|
||||||
data = ds.query(query=query, _composite_filters=all_filters)
|
data = ds.query(query=query, _composite_filters=all_filters, allow_custom=allow_custom)
|
||||||
all_data.extend(data)
|
all_data.extend(data)
|
||||||
|
|
||||||
# remove exact duplicates (where duplicates are STIX 2.0
|
# remove exact duplicates (where duplicates are STIX 2.0
|
||||||
|
|
|
@ -18,9 +18,8 @@ from stix2.utils import deduplicate
|
||||||
|
|
||||||
|
|
||||||
class FileSystemStore(DataStore):
|
class FileSystemStore(DataStore):
|
||||||
"""FileSystemStore
|
"""Interface to a file directory of STIX objects.
|
||||||
|
|
||||||
Provides an interface to an file directory of STIX objects.
|
|
||||||
FileSystemStore is a wrapper around a paired FileSystemSink
|
FileSystemStore is a wrapper around a paired FileSystemSink
|
||||||
and FileSystemSource.
|
and FileSystemSource.
|
||||||
|
|
||||||
|
@ -40,10 +39,8 @@ class FileSystemStore(DataStore):
|
||||||
|
|
||||||
|
|
||||||
class FileSystemSink(DataSink):
|
class FileSystemSink(DataSink):
|
||||||
"""FileSystemSink
|
"""Interface for adding/pushing STIX objects to file directory of STIX
|
||||||
|
objects.
|
||||||
Provides an interface for adding/pushing STIX objects
|
|
||||||
to file directory of STIX objects.
|
|
||||||
|
|
||||||
Can be paired with a FileSystemSource, together as the two
|
Can be paired with a FileSystemSource, together as the two
|
||||||
components of a FileSystemStore.
|
components of a FileSystemStore.
|
||||||
|
@ -63,15 +60,19 @@ class FileSystemSink(DataSink):
|
||||||
def stix_dir(self):
|
def stix_dir(self):
|
||||||
return self._stix_dir
|
return self._stix_dir
|
||||||
|
|
||||||
def add(self, stix_data=None):
|
def add(self, stix_data=None, allow_custom=False):
|
||||||
"""add STIX objects to file directory
|
"""Add STIX objects to file directory.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content
|
stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content
|
||||||
in a STIX object(or list of), dict (or list of), or a STIX 2.0
|
in a STIX object (or list of), dict (or list of), or a STIX 2.0
|
||||||
json encoded string
|
json encoded string.
|
||||||
|
|
||||||
|
Note:
|
||||||
|
``stix_data`` can be a Bundle object, but each object in it will be
|
||||||
|
saved separately; you will be able to retrieve any of the objects
|
||||||
|
the Bundle contained, but not the Bundle itself.
|
||||||
|
|
||||||
TODO: Bundlify STIX content or no? When dumping to disk.
|
|
||||||
"""
|
"""
|
||||||
def _check_path_and_write(stix_dir, stix_obj):
|
def _check_path_and_write(stix_dir, stix_obj):
|
||||||
path = os.path.join(stix_dir, stix_obj["type"], stix_obj["id"] + ".json")
|
path = os.path.join(stix_dir, stix_obj["type"], stix_obj["id"] + ".json")
|
||||||
|
@ -80,45 +81,41 @@ class FileSystemSink(DataSink):
|
||||||
os.makedirs(os.path.dirname(path))
|
os.makedirs(os.path.dirname(path))
|
||||||
|
|
||||||
with open(path, "w") as f:
|
with open(path, "w") as f:
|
||||||
# Bundle() can take dict or STIX obj as argument
|
f.write(str(stix_obj))
|
||||||
f.write(str(Bundle(stix_obj)))
|
|
||||||
|
|
||||||
if isinstance(stix_data, (STIXDomainObject, STIXRelationshipObject, MarkingDefinition)):
|
if isinstance(stix_data, (STIXDomainObject, STIXRelationshipObject, MarkingDefinition)):
|
||||||
# adding python STIX object
|
# adding python STIX object
|
||||||
_check_path_and_write(self._stix_dir, stix_data)
|
_check_path_and_write(self._stix_dir, stix_data)
|
||||||
|
|
||||||
elif isinstance(stix_data, dict):
|
elif isinstance(stix_data, (str, dict)):
|
||||||
|
stix_data = parse(stix_data, allow_custom)
|
||||||
if stix_data["type"] == "bundle":
|
if stix_data["type"] == "bundle":
|
||||||
# adding json-formatted Bundle - extracting STIX objects
|
# extract STIX objects
|
||||||
for stix_obj in stix_data["objects"]:
|
for stix_obj in stix_data.get("objects", []):
|
||||||
self.add(stix_obj)
|
self.add(stix_obj)
|
||||||
else:
|
else:
|
||||||
# adding json-formatted STIX
|
# adding json-formatted STIX
|
||||||
_check_path_and_write(self._stix_dir, stix_data)
|
_check_path_and_write(self._stix_dir, stix_data)
|
||||||
|
|
||||||
elif isinstance(stix_data, (str, Bundle)):
|
elif isinstance(stix_data, Bundle):
|
||||||
# adding json encoded string of STIX content
|
# recursively add individual STIX objects
|
||||||
stix_data = parse(stix_data)
|
for stix_obj in stix_data.get("objects", []):
|
||||||
if stix_data["type"] == "bundle":
|
self.add(stix_obj)
|
||||||
for stix_obj in stix_data.get("objects", []):
|
|
||||||
self.add(stix_obj)
|
|
||||||
else:
|
|
||||||
self.add(stix_data)
|
|
||||||
|
|
||||||
elif isinstance(stix_data, list):
|
elif isinstance(stix_data, list):
|
||||||
# if list, recurse call on individual STIX objects
|
# recursively add individual STIX objects
|
||||||
for stix_obj in stix_data:
|
for stix_obj in stix_data:
|
||||||
self.add(stix_obj)
|
self.add(stix_obj)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise ValueError("stix_data must be a STIX object(or list of), json formatted STIX(or list of) or a json formatted STIX bundle")
|
raise ValueError("stix_data must be a STIX object (or list of), "
|
||||||
|
"json formatted STIX (or list of), "
|
||||||
|
"or a json formatted STIX bundle")
|
||||||
|
|
||||||
|
|
||||||
class FileSystemSource(DataSource):
|
class FileSystemSource(DataSource):
|
||||||
"""FileSystemSource
|
"""Interface for searching/retrieving STIX objects from a STIX object file
|
||||||
|
directory.
|
||||||
Provides an interface for searching/retrieving
|
|
||||||
STIX objects from a STIX object file directory.
|
|
||||||
|
|
||||||
Can be paired with a FileSystemSink, together as the two
|
Can be paired with a FileSystemSink, together as the two
|
||||||
components of a FileSystemStore.
|
components of a FileSystemStore.
|
||||||
|
@ -138,8 +135,8 @@ class FileSystemSource(DataSource):
|
||||||
def stix_dir(self):
|
def stix_dir(self):
|
||||||
return self._stix_dir
|
return self._stix_dir
|
||||||
|
|
||||||
def get(self, stix_id, _composite_filters=None):
|
def get(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""retrieve STIX object from file directory via STIX ID
|
"""Retrieve STIX object from file directory via STIX ID.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_id (str): The STIX ID of the STIX object to be retrieved.
|
stix_id (str): The STIX ID of the STIX object to be retrieved.
|
||||||
|
@ -155,18 +152,17 @@ class FileSystemSource(DataSource):
|
||||||
"""
|
"""
|
||||||
query = [Filter("id", "=", stix_id)]
|
query = [Filter("id", "=", stix_id)]
|
||||||
|
|
||||||
all_data = self.query(query=query, _composite_filters=_composite_filters)
|
all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom)
|
||||||
|
|
||||||
if all_data:
|
if all_data:
|
||||||
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
|
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
|
||||||
stix_obj = parse(stix_obj)
|
|
||||||
else:
|
else:
|
||||||
stix_obj = None
|
stix_obj = None
|
||||||
|
|
||||||
return stix_obj
|
return stix_obj
|
||||||
|
|
||||||
def all_versions(self, stix_id, _composite_filters=None):
|
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""retrieve STIX object from file directory via STIX ID, all versions
|
"""Retrieve STIX object from file directory via STIX ID, all versions.
|
||||||
|
|
||||||
Note: Since FileSystem sources/sinks don't handle multiple versions
|
Note: Since FileSystem sources/sinks don't handle multiple versions
|
||||||
of a STIX object, this operation is unnecessary. Pass call to get().
|
of a STIX object, this operation is unnecessary. Pass call to get().
|
||||||
|
@ -181,11 +177,12 @@ class FileSystemSource(DataSource):
|
||||||
(list): of STIX objects that has the supplied STIX ID.
|
(list): of STIX objects that has the supplied STIX ID.
|
||||||
The STIX objects are loaded from their json files, parsed into
|
The STIX objects are loaded from their json files, parsed into
|
||||||
a python STIX objects and then returned
|
a python STIX objects and then returned
|
||||||
"""
|
|
||||||
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
|
|
||||||
|
|
||||||
def query(self, query=None, _composite_filters=None):
|
"""
|
||||||
"""search and retrieve STIX objects based on the complete query
|
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters, allow_custom=allow_custom)]
|
||||||
|
|
||||||
|
def query(self, query=None, _composite_filters=None, allow_custom=False):
|
||||||
|
"""Search and retrieve STIX objects based on the complete query.
|
||||||
|
|
||||||
A "complete query" includes the filters from the query, the filters
|
A "complete query" includes the filters from the query, the filters
|
||||||
attached to MemorySource, and any filters passed from a
|
attached to MemorySource, and any filters passed from a
|
||||||
|
@ -275,34 +272,32 @@ class FileSystemSource(DataSource):
|
||||||
for path in include_paths:
|
for path in include_paths:
|
||||||
for root, dirs, files in os.walk(path):
|
for root, dirs, files in os.walk(path):
|
||||||
for file_ in files:
|
for file_ in files:
|
||||||
if id_:
|
if not id_ or id_ == file_.split(".")[0]:
|
||||||
if id_ == file_.split(".")[0]:
|
|
||||||
# since ID is specified in one of filters, can evaluate against filename first without loading
|
|
||||||
stix_obj = json.load(open(os.path.join(root, file_)))["objects"][0]
|
|
||||||
# check against other filters, add if match
|
|
||||||
all_data.extend(apply_common_filters([stix_obj], query))
|
|
||||||
else:
|
|
||||||
# have to load into memory regardless to evaluate other filters
|
# have to load into memory regardless to evaluate other filters
|
||||||
stix_obj = json.load(open(os.path.join(root, file_)))["objects"][0]
|
stix_obj = json.load(open(os.path.join(root, file_)))
|
||||||
|
if stix_obj.get('type', '') == 'bundle':
|
||||||
|
stix_obj = stix_obj['objects'][0]
|
||||||
|
# check against other filters, add if match
|
||||||
all_data.extend(apply_common_filters([stix_obj], query))
|
all_data.extend(apply_common_filters([stix_obj], query))
|
||||||
|
|
||||||
all_data = deduplicate(all_data)
|
all_data = deduplicate(all_data)
|
||||||
|
|
||||||
# parse python STIX objects from the STIX object dicts
|
# parse python STIX objects from the STIX object dicts
|
||||||
stix_objs = [parse(stix_obj_dict) for stix_obj_dict in all_data]
|
stix_objs = [parse(stix_obj_dict, allow_custom) for stix_obj_dict in all_data]
|
||||||
|
|
||||||
return stix_objs
|
return stix_objs
|
||||||
|
|
||||||
def _parse_file_filters(self, query):
|
def _parse_file_filters(self, query):
|
||||||
"""utility method to extract STIX common filters
|
"""Extract STIX common filters.
|
||||||
that can used to possibly speed up querying STIX objects
|
|
||||||
from the file system
|
Possibly speeds up querying STIX objects from the file system.
|
||||||
|
|
||||||
Extracts filters that are for the "id" and "type" field of
|
Extracts filters that are for the "id" and "type" field of
|
||||||
a STIX object. As the file directory is organized by STIX
|
a STIX object. As the file directory is organized by STIX
|
||||||
object type with filenames that are equivalent to the STIX
|
object type with filenames that are equivalent to the STIX
|
||||||
object ID, these filters can be used first to reduce the
|
object ID, these filters can be used first to reduce the
|
||||||
search space of a FileSystemStore(or FileSystemSink)
|
search space of a FileSystemStore (or FileSystemSink).
|
||||||
|
|
||||||
"""
|
"""
|
||||||
file_filters = set()
|
file_filters = set()
|
||||||
for filter_ in query:
|
for filter_ in query:
|
||||||
|
|
|
@ -1,16 +1,9 @@
|
||||||
{
|
{
|
||||||
"id": "bundle--2ed6ab6a-ca68-414f-8493-e4db8b75dd51",
|
"created": "2017-05-31T21:30:41.022744Z",
|
||||||
"objects": [
|
"created_by_ref": "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5",
|
||||||
{
|
"description": "Identify unnecessary system utilities or potentially malicious software that may be used to collect data from a network share, and audit and/or block them by using whitelisting[[CiteRef::Beechey 2010]] tools, like AppLocker,[[CiteRef::Windows Commands JPCERT]][[CiteRef::NSA MS AppLocker]] or Software Restriction Policies[[CiteRef::Corio 2008]] where appropriate.[[CiteRef::TechNet Applocker vs SRP]]",
|
||||||
"created": "2017-05-31T21:30:41.022744Z",
|
"id": "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd",
|
||||||
"created_by_ref": "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5",
|
"modified": "2017-05-31T21:30:41.022744Z",
|
||||||
"description": "Identify unnecessary system utilities or potentially malicious software that may be used to collect data from a network share, and audit and/or block them by using whitelisting[[CiteRef::Beechey 2010]] tools, like AppLocker,[[CiteRef::Windows Commands JPCERT]][[CiteRef::NSA MS AppLocker]] or Software Restriction Policies[[CiteRef::Corio 2008]] where appropriate.[[CiteRef::TechNet Applocker vs SRP]]",
|
"name": "Data from Network Shared Drive Mitigation",
|
||||||
"id": "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd",
|
"type": "course-of-action"
|
||||||
"modified": "2017-05-31T21:30:41.022744Z",
|
|
||||||
"name": "Data from Network Shared Drive Mitigation",
|
|
||||||
"type": "course-of-action"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"spec_version": "2.0",
|
|
||||||
"type": "bundle"
|
|
||||||
}
|
}
|
|
@ -3,8 +3,8 @@ import shutil
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from stix2 import (Bundle, Campaign, FileSystemSink, FileSystemSource,
|
from stix2 import (Bundle, Campaign, CustomObject, FileSystemSink,
|
||||||
FileSystemStore, Filter)
|
FileSystemSource, FileSystemStore, Filter, properties)
|
||||||
|
|
||||||
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
|
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
|
||||||
|
|
||||||
|
@ -213,8 +213,13 @@ def test_filesystem_sink_add_objects_list(fs_sink, fs_source):
|
||||||
os.remove(os.path.join(FS_PATH, "campaign", camp7_r.id + ".json"))
|
os.remove(os.path.join(FS_PATH, "campaign", camp7_r.id + ".json"))
|
||||||
|
|
||||||
|
|
||||||
def test_filesystem_store_get(fs_store):
|
def test_filesystem_store_get_stored_as_bundle(fs_store):
|
||||||
# get()
|
coa = fs_store.get("course-of-action--95ddb356-7ba0-4bd9-a889-247262b8946f")
|
||||||
|
assert coa.id == "course-of-action--95ddb356-7ba0-4bd9-a889-247262b8946f"
|
||||||
|
assert coa.type == "course-of-action"
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_store_get_stored_as_object(fs_store):
|
||||||
coa = fs_store.get("course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd")
|
coa = fs_store.get("course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd")
|
||||||
assert coa.id == "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd"
|
assert coa.id == "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd"
|
||||||
assert coa.type == "course-of-action"
|
assert coa.type == "course-of-action"
|
||||||
|
@ -250,6 +255,51 @@ def test_filesystem_store_add(fs_store):
|
||||||
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
|
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
|
||||||
|
|
||||||
|
|
||||||
def test_filesystem_add_object_with_custom_property_in_bundle(fs_store):
|
def test_filesystem_add_bundle_object(fs_store):
|
||||||
bundle = Bundle()
|
bundle = Bundle()
|
||||||
fs_store.add(bundle)
|
fs_store.add(bundle)
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_object_with_custom_property(fs_store):
|
||||||
|
camp = Campaign(name="Scipio Africanus",
|
||||||
|
objective="Defeat the Carthaginians",
|
||||||
|
x_empire="Roman",
|
||||||
|
allow_custom=True)
|
||||||
|
|
||||||
|
fs_store.add(camp, True)
|
||||||
|
|
||||||
|
camp_r = fs_store.get(camp.id, True)
|
||||||
|
assert camp_r.id == camp.id
|
||||||
|
assert camp_r.x_empire == camp.x_empire
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_object_with_custom_property_in_bundle(fs_store):
|
||||||
|
camp = Campaign(name="Scipio Africanus",
|
||||||
|
objective="Defeat the Carthaginians",
|
||||||
|
x_empire="Roman",
|
||||||
|
allow_custom=True)
|
||||||
|
|
||||||
|
bundle = Bundle(camp, allow_custom=True)
|
||||||
|
fs_store.add(bundle, True)
|
||||||
|
|
||||||
|
camp_r = fs_store.get(camp.id, True)
|
||||||
|
assert camp_r.id == camp.id
|
||||||
|
assert camp_r.x_empire == camp.x_empire
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_custom_object(fs_store):
|
||||||
|
@CustomObject('x-new-obj', [
|
||||||
|
('property1', properties.StringProperty(required=True)),
|
||||||
|
])
|
||||||
|
class NewObj():
|
||||||
|
pass
|
||||||
|
|
||||||
|
newobj = NewObj(property1='something')
|
||||||
|
fs_store.add(newobj, True)
|
||||||
|
|
||||||
|
newobj_r = fs_store.get(newobj.id, True)
|
||||||
|
assert newobj_r.id == newobj.id
|
||||||
|
assert newobj_r.property1 == 'something'
|
||||||
|
|
||||||
|
# remove dir
|
||||||
|
shutil.rmtree(os.path.join(FS_PATH, "x-new-obj"), True)
|
||||||
|
|
Loading…
Reference in New Issue