cti-python-stix2/stix2/datastore/filesystem.py

344 lines
14 KiB
Python
Raw Normal View History

2017-07-12 16:58:31 +02:00
"""
Python STIX 2.0 FileSystem Source/Sink
"""
import json
import os
from stix2.core import Bundle, parse
from stix2.datastore import DataSink, DataSource, DataStoreMixin
2018-04-11 19:36:52 +02:00
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
2017-11-01 19:17:41 +01:00
from stix2.utils import deduplicate, get_class_hierarchy_names
2017-07-12 16:58:31 +02:00
class FileSystemStore(DataStoreMixin):
    """File-directory backed STIX data store.

    Pairs a FileSystemSource (reading) with a FileSystemSink (writing),
    both pointed at the same directory.

    Args:
        stix_dir (str): path to directory of STIX objects
        allow_custom (bool): whether to allow custom STIX content to be
            pushed/retrieved. When left as None, the FileSystemSource side
            (retrieving data) is created with True and the FileSystemSink
            side (pushing data) with False. When a value is supplied, it
            is applied to both the FileSystemSource and the FileSystemSink.
        bundlify (bool): whether to wrap objects in bundles when saving them.
            Default: False.

    Attributes:
        source (FileSystemSource): FileSystemSource
        sink (FileSystemSink): FileSystemSink

    """
    def __init__(self, stix_dir, allow_custom=None, bundlify=False):
        # None means "not specified": fall back to the per-side defaults.
        if allow_custom is None:
            source_custom, sink_custom = True, False
        else:
            source_custom = sink_custom = allow_custom

        # The source is built before the sink, as in the keyword-argument
        # evaluation order of a single call.
        source = FileSystemSource(stix_dir=stix_dir, allow_custom=source_custom)
        sink = FileSystemSink(stix_dir=stix_dir, allow_custom=sink_custom, bundlify=bundlify)

        super(FileSystemStore, self).__init__(source=source, sink=sink)
2017-07-12 16:58:31 +02:00
class FileSystemSink(DataSink):
    """Interface for adding/pushing STIX objects to file directory of STIX
    objects.

    Can be paired with a FileSystemSource, together as the two
    components of a FileSystemStore.

    Args:
        stix_dir (str): path to directory of STIX objects.
        allow_custom (bool): Whether to allow custom STIX content to be
            added to the FileSystemSink. Default: False
        bundlify (bool): Whether to wrap objects in bundles when saving them.
            Default: False.

    Raises:
        ValueError: if ``stix_dir`` does not exist.

    """
    def __init__(self, stix_dir, allow_custom=False, bundlify=False):
        super(FileSystemSink, self).__init__()
        self._stix_dir = os.path.abspath(stix_dir)
        self.allow_custom = allow_custom
        self.bundlify = bundlify

        if not os.path.exists(self._stix_dir):
            # Include the offending path, matching FileSystemSource's message.
            raise ValueError("directory path for STIX data does not exist: %s" % self._stix_dir)

    @property
    def stix_dir(self):
        """str: absolute path of the directory this sink writes to."""
        return self._stix_dir

    def _check_path_and_write(self, stix_obj):
        """Write the given STIX object to a file in the STIX file directory.

        The file is written to ``<stix_dir>/<type>/<id>.json``; the ``<type>``
        sub-directory is created if it does not exist yet. When ``bundlify``
        is set, the object is wrapped in a Bundle before being serialized.

        Args:
            stix_obj: object exposing ``"type"`` and ``"id"`` keys whose
                ``str()`` form is the content to write.

        """
        type_dir = os.path.join(self._stix_dir, stix_obj["type"])
        path = os.path.join(type_dir, stix_obj["id"] + ".json")

        if not os.path.exists(type_dir):
            os.makedirs(type_dir)

        if self.bundlify:
            stix_obj = Bundle(stix_obj, allow_custom=self.allow_custom)

        with open(path, "w") as f:
            f.write(str(stix_obj))

    def add(self, stix_data=None, version=None):
        """Add STIX objects to file directory.

        Args:
            stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content
                in a STIX object (or list of), dict (or list of), or a STIX 2.0
                json encoded string.
            version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
                None, use latest version.

        Raises:
            TypeError: if ``stix_data`` is none of the supported kinds.

        Note:
            ``stix_data`` can be a Bundle object, but each object in it will be
            saved separately; you will be able to retrieve any of the objects
            the Bundle contained, but not the Bundle itself.

        """
        if any(x in ('STIXDomainObject', 'STIXRelationshipObject', 'MarkingDefinition')
               for x in get_class_hierarchy_names(stix_data)):
            # adding python STIX object
            self._check_path_and_write(stix_data)

        elif isinstance(stix_data, (str, dict)):
            stix_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
            if stix_data["type"] == "bundle":
                # extract STIX objects and store each one on its own
                for stix_obj in stix_data.get("objects", []):
                    self.add(stix_obj, version=version)
            else:
                # adding json-formatted STIX
                self._check_path_and_write(stix_data)

        elif isinstance(stix_data, Bundle):
            # recursively add individual STIX objects
            for stix_obj in stix_data.get("objects", []):
                self.add(stix_obj, version=version)

        elif isinstance(stix_data, list):
            # recursively add individual STIX objects
            for stix_obj in stix_data:
                self.add(stix_obj, version=version)

        else:
            raise TypeError("stix_data must be a STIX object (or list of), "
                            "JSON formatted STIX (or list of), "
                            "or a JSON formatted STIX bundle")
2017-07-12 16:58:31 +02:00
class FileSystemSource(DataSource):
    """Interface for searching/retrieving STIX objects from a STIX object file
    directory.

    Can be paired with a FileSystemSink, together as the two
    components of a FileSystemStore.

    Args:
        stix_dir (str): path to directory of STIX objects
        allow_custom (bool): Whether to allow custom STIX content to be
            retrieved from the FileSystemSource. Default: True

    Raises:
        ValueError: if ``stix_dir`` does not exist.

    """
    def __init__(self, stix_dir, allow_custom=True):
        super(FileSystemSource, self).__init__()
        self._stix_dir = os.path.abspath(stix_dir)
        self.allow_custom = allow_custom

        if not os.path.exists(self._stix_dir):
            raise ValueError("directory path for STIX data does not exist: %s" % self._stix_dir)

    @property
    def stix_dir(self):
        """str: absolute path of the directory this source reads from."""
        return self._stix_dir

    def get(self, stix_id, version=None, _composite_filters=None):
        """Retrieve STIX object from file directory via STIX ID.

        Args:
            stix_id (str): The STIX ID of the STIX object to be retrieved.
            _composite_filters (FilterSet): collection of filters passed from the parent
                CompositeDataSource, not user supplied
            version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
                None, use latest version.

        Returns:
            (STIX object): STIX object that has the supplied STIX ID, or
                None if no object matches. The STIX object is loaded from
                its json file, parsed into a python STIX object and then
                returned. If several objects match, the most recently
                modified one is returned.

        """
        query = [Filter("id", "=", stix_id)]

        all_data = self.query(query=query, version=version, _composite_filters=_composite_filters)

        if not all_data:
            return None

        if len(all_data) == 1:
            # Nothing to rank; also avoids requiring a "modified" property
            # on objects that do not carry one.
            return all_data[0]

        # Most recent version wins. Fall back to "created" for objects
        # without a "modified" property.
        return max(all_data, key=lambda obj: obj.get("modified") or obj["created"])

    def all_versions(self, stix_id, version=None, _composite_filters=None):
        """Retrieve STIX object from file directory via STIX ID, all versions.

        Note: Since FileSystem sources/sinks don't handle multiple versions
        of a STIX object, this operation is unnecessary. Pass call to get().

        Args:
            stix_id (str): The STIX ID of the STIX objects to be retrieved.
            _composite_filters (FilterSet): collection of filters passed from the parent
                CompositeDataSource, not user supplied
            version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
                None, use latest version.

        Returns:
            (list): of STIX objects that has the supplied STIX ID; empty if
                no object matches. The STIX objects are loaded from their
                json files, parsed into a python STIX objects and then
                returned

        """
        stix_obj = self.get(stix_id=stix_id, version=version, _composite_filters=_composite_filters)
        # Do not hand callers a list containing None when nothing was found.
        return [] if stix_obj is None else [stix_obj]

    def query(self, query=None, version=None, _composite_filters=None):
        """Search and retrieve STIX objects based on the complete query.

        A "complete query" includes the filters from the query, the filters
        attached to this FileSystemSource, and any filters passed from a
        CompositeDataSource (i.e. _composite_filters).

        Args:
            query (list): list of filters to search on
            _composite_filters (FilterSet): collection of filters passed from the
                CompositeDataSource, not user supplied
            version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
                None, use latest version.

        Returns:
            (list): list of STIX objects that matches the supplied
                query. The STIX objects are loaded from their json files,
                parsed into a python STIX objects and then returned.

        Raises:
            TypeError: if a candidate ".json" file cannot be parsed as JSON
                or does not look like a STIX object.

        """
        all_data = []

        query = FilterSet(query)

        # combine all query filters
        if self.filters:
            query.add(self.filters)
        if _composite_filters:
            query.add(_composite_filters)

        # extract any filters that are for "type" or "id", as we can then do
        # filtering before reading in the STIX objects. A STIX 'type' filter
        # can reduce the query to a single sub-directory. A STIX 'id' filter
        # allows for the fast checking of the file names versus loading it.
        file_filters = self._parse_file_filters(query)

        # establish which subdirectories can be avoided in query
        # by excluding as many as possible. A filter with "type" as the
        # property means that certain STIX object types can be ruled out,
        # and thus the corresponding subdirectories as well
        include_paths = []
        declude_paths = []
        if any(filter_.property == "type" for filter_ in file_filters):
            for filter_ in file_filters:
                if filter_.property != "type":
                    continue
                if filter_.op == "=":
                    include_paths.append(os.path.join(self._stix_dir, filter_.value))
                elif filter_.op == "!=":
                    declude_paths.append(os.path.join(self._stix_dir, filter_.value))
        else:
            # have to walk entire STIX directory
            include_paths.append(self._stix_dir)

        # if a user specifies a "type" filter like "type = <stix-object_type>",
        # the filter is reducing the search space to single stix object types
        # (and thus single directories). This makes such a filter more powerful
        # than "type != <stix-object_type>" because the latter is subtracting
        # only one type of stix object type (and thus only one directory).
        # As such the former type of filters are given preference over the
        # latter; i.e. if both exist in a query, the latter type is ignored
        # when choosing directories (it is still applied to loaded objects).
        if not include_paths:
            # user has specified types that are not wanted (i.e. "!=")
            # so query will look in all STIX directories that are not
            # the specified type. Compile correct dir paths
            for dir_name in os.listdir(self._stix_dir):
                dir_path = os.path.abspath(os.path.join(self._stix_dir, dir_name))
                if dir_path not in declude_paths:
                    include_paths.append(dir_path)

        # grab stix object ID as well - if present in filters, as
        # may forgo the loading of STIX content into memory
        id_ = None
        for filter_ in file_filters:
            if filter_.property == "id" and filter_.op == "=":
                id_ = filter_.value
                break

        # now iterate through all STIX objs
        for path in include_paths:
            for root, _dirs, files in os.walk(path):
                for file_ in files:
                    if not file_.endswith(".json"):
                        # skip non '.json' files as more likely to be random non-STIX files
                        continue

                    if id_ and id_ != file_.split(".")[0]:
                        # file name does not match the requested STIX ID
                        continue

                    # have to load into memory regardless to evaluate other filters
                    file_path = os.path.join(root, file_)
                    try:
                        # context manager so the handle is closed promptly
                        with open(file_path) as f:
                            stix_obj = json.load(f)

                        if stix_obj["type"] == "bundle":
                            stix_obj = stix_obj["objects"][0]

                        # naive STIX type checking
                        stix_obj["type"]
                        stix_obj["id"]

                    except (ValueError, KeyError, IndexError, TypeError):
                        # ValueError: not parseable as JSON
                        # KeyError: missing "type"/"id"/"objects"
                        # IndexError: bundle with an empty "objects" list
                        # TypeError: JSON document is not an object
                        raise TypeError("STIX JSON object at '{0}' could either not be parsed to "
                                        "JSON or was not valid STIX JSON".format(file_path))

                    # check against other filters, add if match
                    all_data.extend(apply_common_filters([stix_obj], query))

        all_data = deduplicate(all_data)

        # parse python STIX objects from the STIX object dicts
        stix_objs = [parse(stix_obj_dict, allow_custom=self.allow_custom, version=version) for stix_obj_dict in all_data]

        return stix_objs

    def _parse_file_filters(self, query):
        """Extract STIX common filters.

        Possibly speeds up querying STIX objects from the file system.

        Extracts filters that are for the "id" and "type" property of
        a STIX object. As the file directory is organized by STIX
        object type with filenames that are equivalent to the STIX
        object ID, these filters can be used first to reduce the
        search space of a FileSystemSource.

        Args:
            query: iterable of filters exposing a ``property`` attribute.

        Returns:
            (list): the filters from ``query`` whose property is "id" or
                "type", in iteration order.

        """
        return [filter_ for filter_ in query if filter_.property in ("id", "type")]