cti-python-stix2/stix2/datastore/memory.py

347 lines
12 KiB
Python

"""
Python STIX 2.0 Memory Source/Sink
"""
import itertools
import json
import os
from stix2.base import _STIXBase
from stix2.core import Bundle, parse
from stix2.datastore import DataSink, DataSource, DataStoreMixin
from stix2.datastore.filters import FilterSet, apply_common_filters
from stix2.utils import is_marking
def _add(store, stix_data=None, allow_custom=True, version=None):
"""Add STIX objects to MemoryStore/Sink.
Adds STIX objects to an in-memory dictionary for fast lookup.
Recursive function, breaks down STIX Bundles and lists.
Args:
stix_data (list OR dict OR STIX object): STIX objects to be added
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
if isinstance(stix_data, list):
# STIX objects are in a list- recurse on each object
for stix_obj in stix_data:
_add(store, stix_obj, allow_custom, version)
elif stix_data["type"] == "bundle":
# adding a json bundle - so just grab STIX objects
for stix_obj in stix_data.get("objects", []):
_add(store, stix_obj, allow_custom, version)
else:
# Adding a single non-bundle object
if isinstance(stix_data, _STIXBase):
stix_obj = stix_data
else:
stix_obj = parse(stix_data, allow_custom, version)
# Map ID directly to the object, if it is a marking. Otherwise,
# map to a family, so we can track multiple versions.
if is_marking(stix_obj):
store._data[stix_obj["id"]] = stix_obj
else:
if stix_obj["id"] in store._data:
obj_family = store._data[stix_obj["id"]]
else:
obj_family = _ObjectFamily()
store._data[stix_obj["id"]] = obj_family
obj_family.add(stix_obj)
class _ObjectFamily(object):
"""
An internal implementation detail of memory sources/sinks/stores.
Represents a "family" of STIX objects: all objects with a particular
ID. (I.e. all versions.) The latest version is also tracked so that it
can be obtained quickly.
"""
def __init__(self):
self.all_versions = {}
self.latest_version = None
def add(self, obj):
self.all_versions[obj["modified"]] = obj
if self.latest_version is None or \
obj["modified"] > self.latest_version["modified"]:
self.latest_version = obj
def __str__(self):
return "<<{}; latest={}>>".format(self.all_versions,
self.latest_version["modified"])
def __repr__(self):
return str(self)
class MemoryStore(DataStoreMixin):
    """Interface to an in-memory dictionary of STIX objects.

    MemoryStore is a wrapper around a paired MemorySink and MemorySource
    that both operate on one shared dictionary.

    Note: It doesn't make sense to create a MemoryStore by passing
        in an existing MemorySource and MemorySink because there could
        be data concurrency issues. It is just as easy to create a new
        MemoryStore.

    Args:
        stix_data (list OR dict OR STIX object): STIX content to be added
        allow_custom (bool): whether to allow custom STIX content.
            Used for the initial add and handed on to the paired source
            and sink. Defaults to True.
        version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
            None, use latest version.

    Attributes:
        _data (dict): the in-memory dict that holds STIX objects
        source (MemorySource): MemorySource
        sink (MemorySink): MemorySink

    """

    def __init__(self, stix_data=None, allow_custom=True, version=None):
        self._data = {}

        if stix_data:
            _add(self, stix_data, allow_custom, version=version)

        # Source and sink are built from identical arguments; _store=True
        # makes each of them keep a reference to self._data instead of
        # creating a dict of its own.
        shared_kwargs = dict(
            stix_data=self._data,
            allow_custom=allow_custom,
            version=version,
            _store=True,
        )
        paired_source = MemorySource(**shared_kwargs)
        paired_sink = MemorySink(**shared_kwargs)

        super(MemoryStore, self).__init__(
            source=paired_source,
            sink=paired_sink
        )

    def save_to_file(self, *args, **kwargs):
        """Write STIX objects from in-memory dictionary to JSON file, as a STIX
        Bundle.

        Args:
            file_path (str): file path to write STIX data to

        """
        # Pure delegation to the paired sink.
        result = self.sink.save_to_file(*args, **kwargs)
        return result

    def load_from_file(self, *args, **kwargs):
        """Load STIX data from JSON file.

        File format is expected to be a single JSON
        STIX object or JSON STIX bundle.

        Args:
            file_path (str): file path to load STIX data from
            version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
                None, use latest version.

        """
        # Pure delegation to the paired source.
        result = self.source.load_from_file(*args, **kwargs)
        return result
class MemorySink(DataSink):
    """Interface for adding/pushing STIX objects to an in-memory dictionary.

    Designed to be paired with a MemorySource, together as the two
    components of a MemoryStore.

    Args:
        stix_data (dict OR list): valid STIX 2.0 content in
            bundle or a list.
        _store (bool): whether the MemorySink is a part of a MemoryStore,
            in which case "stix_data" is a direct reference to
            shared memory with DataSource. Not user supplied
        allow_custom (bool): whether to allow custom objects/properties
            when exporting STIX content to file.
            Default: True.
        version (str): STIX2 version handed to the initial add of
            "stix_data"; unused when "_store" is set.

    Attributes:
        _data (dict): the in-memory dict that holds STIX objects.
            If part of a MemoryStore, the dict is shared with a MemorySource

    """

    def __init__(self, stix_data=None, allow_custom=True, version=None, _store=False):
        super(MemorySink, self).__init__()
        self.allow_custom = allow_custom

        if _store:
            # Part of a MemoryStore: adopt the dict that was handed in.
            self._data = stix_data
            return

        # Stand-alone sink: own a fresh dict and seed it if content was given.
        self._data = {}
        if stix_data:
            _add(self, stix_data, allow_custom, version=version)

    def add(self, stix_data, version=None):
        _add(self, stix_data, self.allow_custom, version)
    add.__doc__ = _add.__doc__

    def save_to_file(self, file_path):
        file_path = os.path.abspath(file_path)

        # Create the target directory when it does not exist yet.
        target_dir = os.path.dirname(file_path)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

        with open(file_path, "w") as f:
            # Flatten the store: an _ObjectFamily contributes every version
            # it holds, any other value is stored as a single object.
            flattened = []
            for entry in self._data.values():
                if isinstance(entry, _ObjectFamily):
                    flattened.extend(entry.all_versions.values())
                else:
                    flattened.append(entry)

            bundle = Bundle(flattened, allow_custom=self.allow_custom)
            f.write(str(bundle))
    save_to_file.__doc__ = MemoryStore.save_to_file.__doc__
class MemorySource(DataSource):
    """Interface for searching/retrieving STIX objects from an in-memory
    dictionary.

    Designed to be paired with a MemorySink, together as the two
    components of a MemoryStore.

    Args:
        stix_data (dict OR list OR STIX object): valid STIX 2.0 content in
            bundle or list.
        _store (bool): if the MemorySource is a part of a MemoryStore,
            in which case "stix_data" is a direct reference to shared
            memory with DataSink. Not user supplied
        allow_custom (bool): whether to allow custom objects/properties
            when importing STIX content from file.
            Default: True.
        version (str): STIX2 version handed to the initial add of
            "stix_data"; unused when "_store" is set.

    Attributes:
        _data (dict): the in-memory dict that holds STIX objects.
            If part of a MemoryStore, the dict is shared with a MemorySink

    """

    def __init__(self, stix_data=None, allow_custom=True, version=None, _store=False):
        super(MemorySource, self).__init__()
        self.allow_custom = allow_custom
        if _store:
            # Part of a MemoryStore: adopt the dict that was handed in.
            self._data = stix_data
        else:
            self._data = {}
            if stix_data:
                _add(self, stix_data, allow_custom, version=version)

    def _combined_filters(self, _composite_filters):
        """Return the composite filters (if any) followed by this source's
        own filters, as one list."""
        return list(
            itertools.chain(
                _composite_filters or [],
                self.filters
            )
        )

    def get(self, stix_id, _composite_filters=None):
        """Retrieve STIX object from in-memory dict via STIX ID.

        For IDs that are not markings, the latest stored version is the
        candidate that gets filtered.

        Args:
            stix_id (str): The STIX ID of the STIX object to be retrieved.
            _composite_filters (FilterSet): collection of filters passed from the parent
                CompositeDataSource, not user supplied

        Returns:
            (STIX object): STIX object that has the supplied ID, or None if
                the ID is unknown or the object does not pass the filters.

        """
        stix_obj = None

        if is_marking(stix_id):
            # Markings are stored directly under their ID.
            stix_obj = self._data.get(stix_id)
        else:
            # Everything else is stored as a family of versions.
            object_family = self._data.get(stix_id)
            if object_family:
                stix_obj = object_family.latest_version

        if stix_obj:
            all_filters = self._combined_filters(_composite_filters)

            # Yields the object itself if it passes, otherwise nothing.
            stix_obj = next(apply_common_filters([stix_obj], all_filters), None)

        return stix_obj

    def all_versions(self, stix_id, _composite_filters=None):
        """Retrieve STIX objects from in-memory dict via STIX ID, all versions of it

        Objects that are not markings are stored per ID as a family holding
        every version that was added; each of those versions is a candidate.
        A marking is stored as a single object, so at most one object is
        returned for a marking ID.

        Args:
            stix_id (str): The STIX ID of the STIX 2 object to retrieve.
            _composite_filters (FilterSet): collection of filters passed from the parent
                CompositeDataSource, not user supplied

        Returns:
            (list): list of STIX objects that have the supplied ID and pass
                the filters; empty if there are none.

        """
        results = []
        stix_objs_to_filter = None

        if is_marking(stix_id):
            stix_obj = self._data.get(stix_id)
            if stix_obj:
                stix_objs_to_filter = [stix_obj]
        else:
            object_family = self._data.get(stix_id)
            if object_family:
                stix_objs_to_filter = object_family.all_versions.values()

        if stix_objs_to_filter:
            all_filters = self._combined_filters(_composite_filters)

            results.extend(
                apply_common_filters(stix_objs_to_filter, all_filters)
            )

        return results

    def query(self, query=None, _composite_filters=None):
        """Search and retrieve STIX objects based on the complete query.

        A "complete query" includes the filters from the query, the filters
        attached to this MemorySource, and any filters passed from a
        CompositeDataSource (i.e. _composite_filters).

        Args:
            query (list): list of filters to search on
            _composite_filters (FilterSet): collection of filters passed from the
                CompositeDataSource, not user supplied

        Returns:
            (list): list of STIX objects that match the supplied query.

        """
        query = FilterSet(query)

        # combine all query filters
        if self.filters:
            query.add(self.filters)
        if _composite_filters:
            query.add(_composite_filters)

        # Flatten the store lazily: a family contributes every version it
        # holds, any other value is a single object.
        all_objs = itertools.chain.from_iterable(
            value.all_versions.values() if isinstance(value, _ObjectFamily)
            else [value]
            for value in self._data.values()
        )

        # Apply STIX common property filters.
        all_data = list(apply_common_filters(all_objs, query))

        return all_data

    def load_from_file(self, file_path, version=None):
        with open(os.path.abspath(file_path), "r") as f:
            stix_data = json.load(f)

        # A bundle that declares its own spec_version overrides the user's
        # version selection.  A bundle without that property keeps the
        # caller's choice instead of raising KeyError.
        if stix_data["type"] == "bundle":
            version = stix_data.get("spec_version", version)

        _add(self, stix_data, self.allow_custom, version)
    load_from_file.__doc__ = MemoryStore.load_from_file.__doc__