Remove str() as a way to serialize objects. Add support for encodings and Bundle versions.

stix2.1
Emmanuelle Vargas-Gonzalez 2018-07-10 15:51:20 -04:00
parent 012eba4e9b
commit 8cf68054d4
1 changed files with 32 additions and 31 deletions

View File

@ -1,9 +1,10 @@
"""Python STIX 2.0 FileSystem Source/Sink"""
import io
import json
import os
from stix2 import Bundle
from stix2 import Bundle, v20
from stix2.core import parse
from stix2.datastore import DataSink, DataSource, DataStoreMixin
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
@ -72,7 +73,7 @@ class FileSystemSink(DataSink):
def stix_dir(self):
return self._stix_dir
def _check_path_and_write(self, stix_obj):
def _check_path_and_write(self, stix_obj, encoding='utf-8'):
"""Write the given STIX object to a file in the STIX file directory.
"""
path = os.path.join(self._stix_dir, stix_obj['type'], stix_obj['id'] + '.json')
@ -81,22 +82,27 @@ class FileSystemSink(DataSink):
os.makedirs(os.path.dirname(path))
if self.bundlify:
stix_obj = Bundle(stix_obj, allow_custom=self.allow_custom)
if 'spec_version' in stix_obj:
# Assuming future specs will allow multiple SDO/SROs
# versions in a single bundle we won't need to check this
# and just use the latest supported Bundle version.
stix_obj = Bundle(stix_obj, allow_custom=self.allow_custom)
else:
stix_obj = v20.Bundle(stix_obj, allow_custom=self.allow_custom)
with open(path, 'w') as f:
f.write(str(stix_obj))
with io.open(path, 'w', encoding=encoding) as f:
stix_obj = stix_obj.serialize(pretty=True, encoding=encoding, ensure_ascii=False)
f.write(stix_obj)
def add(self, stix_data=None, version=None):
def add(self, stix_data=None):
"""Add STIX objects to file directory.
Args:
stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content
in a STIX object (or list of), dict (or list of), or a STIX 2.0
json encoded string.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Note:
Notes:
``stix_data`` can be a Bundle object, but each object in it will be
saved separately; you will be able to retrieve any of the objects
the Bundle contained, but not the Bundle itself.
@ -108,24 +114,24 @@ class FileSystemSink(DataSink):
self._check_path_and_write(stix_data)
elif isinstance(stix_data, (str, dict)):
stix_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
stix_data = parse(stix_data, allow_custom=self.allow_custom)
if stix_data['type'] == 'bundle':
# extract STIX objects
for stix_obj in stix_data.get('objects', []):
self.add(stix_obj, version=version)
self.add(stix_obj)
else:
# adding json-formatted STIX
self._check_path_and_write(stix_data,)
self._check_path_and_write(stix_data)
elif isinstance(stix_data, Bundle):
elif 'Bundle' in get_class_hierarchy_names(stix_data):
# recursively add individual STIX objects
for stix_obj in stix_data.get('objects', []):
self.add(stix_obj, version=version)
self.add(stix_obj)
elif isinstance(stix_data, list):
# recursively add individual STIX objects
for stix_obj in stix_data:
self.add(stix_obj, version=version)
self.add(stix_obj)
else:
raise TypeError("stix_data must be a STIX object (or list of), "
@ -158,7 +164,7 @@ class FileSystemSource(DataSource):
def stix_dir(self):
return self._stix_dir
def get(self, stix_id, version=None, _composite_filters=None):
def get(self, stix_id, _composite_filters=None):
"""Retrieve STIX object from file directory via STIX ID.
Args:
@ -176,7 +182,7 @@ class FileSystemSource(DataSource):
"""
query = [Filter('id', '=', stix_id)]
all_data = self.query(query=query, version=version, _composite_filters=_composite_filters)
all_data = self.query(query=query, _composite_filters=_composite_filters)
if all_data:
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
@ -185,7 +191,7 @@ class FileSystemSource(DataSource):
return stix_obj
def all_versions(self, stix_id, version=None, _composite_filters=None):
def all_versions(self, stix_id, _composite_filters=None):
"""Retrieve STIX object from file directory via STIX ID, all versions.
Note: Since FileSystem sources/sinks don't handle multiple versions
@ -193,10 +199,8 @@ class FileSystemSource(DataSource):
Args:
stix_id (str): The STIX ID of the STIX objects to be retrieved.
_composite_filters (FilterSet): collection of filters passed from the parent
CompositeDataSource, not user supplied
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
_composite_filters (FilterSet): collection of filters passed from
the parent CompositeDataSource, not user supplied.
Returns:
(list): of STIX objects that has the supplied STIX ID.
@ -204,9 +208,9 @@ class FileSystemSource(DataSource):
a python STIX objects and then returned
"""
return [self.get(stix_id=stix_id, version=version, _composite_filters=_composite_filters)]
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
def query(self, query=None, version=None, _composite_filters=None):
def query(self, query=None, _composite_filters=None):
"""Search and retrieve STIX objects based on the complete query.
A "complete query" includes the filters from the query, the filters
@ -215,18 +219,15 @@ class FileSystemSource(DataSource):
Args:
query (list): list of filters to search on
_composite_filters (FilterSet): collection of filters passed from the
CompositeDataSource, not user supplied
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
_composite_filters (FilterSet): collection of filters passed from
the CompositeDataSource, not user supplied
Returns:
(list): list of STIX objects that matches the supplied
stix_objs (list): list of STIX objects that matches the supplied
query. The STIX objects are loaded from their json files,
parsed into a python STIX objects and then returned.
"""
all_data = []
query = FilterSet(query)
@ -318,7 +319,7 @@ class FileSystemSource(DataSource):
all_data = deduplicate(all_data)
# parse python STIX objects from the STIX object dicts
stix_objs = [parse(stix_obj_dict, allow_custom=self.allow_custom, version=version) for stix_obj_dict in all_data]
stix_objs = [parse(stix_obj_dict, allow_custom=self.allow_custom) for stix_obj_dict in all_data]
return stix_objs