Add multi-version support to the filesystem datastore.
Factored out the _is_marking() function from the memory datastore module to utils so it can be reused, and changed both the filesystem and memory datastore modules to import and use it.
Branch: master
parent
7cc7431cb7
commit
0096835cfc
|
@ -3,13 +3,367 @@ Python STIX 2.0 FileSystem Source/Sink
|
|||
|
||||
"""
|
||||
|
||||
import errno
|
||||
import json
|
||||
import os
|
||||
import stat
|
||||
|
||||
import six
|
||||
|
||||
from stix2.base import _STIXBase
|
||||
from stix2.core import Bundle, parse
|
||||
from stix2.datastore import DataSink, DataSource, DataStoreMixin
|
||||
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
|
||||
from stix2.utils import deduplicate, get_class_hierarchy_names
|
||||
from stix2.utils import deduplicate, is_marking
|
||||
|
||||
|
||||
def _timestamp2filename(timestamp):
|
||||
"""
|
||||
Encapsulates a way to create unique filenames based on an object's
|
||||
"modified" property value. This should not include an extension.
|
||||
|
||||
:param timestamp: A timestamp, as a datetime.datetime object.
|
||||
"""
|
||||
# Different times will only produce different file names if all timestamps
|
||||
# are in the same time zone! (Should I convert to UTC just to be safe?)
|
||||
return timestamp.strftime("%Y%m%d%H%M%S%f")
|
||||
|
||||
|
||||
def _type_from_id(id_):
|
||||
"""Extract the type from a STIX identifier"""
|
||||
dd_idx = id_.find("--")
|
||||
if dd_idx == -1:
|
||||
raise Exception(
|
||||
"Invalid ID: {}. Must have format <type>--<uuid>.".format(id_)
|
||||
)
|
||||
return id_[:dd_idx]
|
||||
|
||||
|
||||
class AuthSet(object):
    """
    A whitelist or blacklist of values, describing where/what we must (or
    must not) search to find objects matching a query.  (Determining
    authorization is a typical context in which black/white lists are used,
    hence the name.)

    The set may be empty.  An empty whitelist means there is nowhere to
    search — the query can never match, so searching can be skipped
    entirely.  An empty blacklist excludes nothing, so everything must be
    searched.
    """

    BLACK = 0
    WHITE = 1

    def __init__(self, allowed, prohibited):
        """
        Build this AuthSet from sets of allowed and/or prohibited values.
        Whether it acts as a blacklist or whitelist is inferred from the
        arguments.

        :param allowed: A set of allowed values (or None if no allow filters
            were found in the query)
        :param prohibited: A set of prohibited values (not None)
        """
        if allowed is None:
            # No allow filters at all: everything except the prohibited
            # values is fair game, i.e. a blacklist.
            self.__type = AuthSet.BLACK
            self.__values = prohibited
        else:
            # At least one allow filter existed, so this is a whitelist.
            # A value that is both allowed and prohibited can never match,
            # so such values are dropped.
            self.__type = AuthSet.WHITE
            self.__values = allowed - prohibited

    @property
    def values(self):
        """The set of white/blacklisted values."""
        return self.__values

    @property
    def auth_type(self):
        """The kind of set: AuthSet.WHITE or AuthSet.BLACK."""
        return self.__type

    def __repr__(self):
        kind = "white" if self.__type == AuthSet.WHITE else "black"
        return "{}list: {}".format(kind, self.__values)
|
||||
|
||||
|
||||
# A fixed, reusable AuthSet which accepts anything. It came in handy.
# (allowed=None with an empty prohibited set is a blacklist that excludes
# nothing, so every value passes.)
_AUTHSET_ANY = AuthSet(None, set())
|
||||
|
||||
|
||||
def _update_allow(allow_set, value):
|
||||
"""
|
||||
Updates the given set of "allow" values. The first time an update to the
|
||||
set occurs, the value(s) are added. Thereafter, since all filters are
|
||||
implicitly AND'd, the given values are intersected with the existing allow
|
||||
set, which may remove values. At the end, it may even wind up empty.
|
||||
|
||||
:param allow_set: The allow set, or None
|
||||
:param value: The value(s) to add (single value, or iterable of values)
|
||||
:return: The updated allow set (not None)
|
||||
"""
|
||||
adding_seq = hasattr(value, "__iter__") and \
|
||||
not isinstance(value, six.string_types)
|
||||
|
||||
if allow_set is None:
|
||||
allow_set = set()
|
||||
if adding_seq:
|
||||
allow_set.update(value)
|
||||
else:
|
||||
allow_set.add(value)
|
||||
|
||||
else:
|
||||
# strangely, the "&=" operator requires a set on the RHS
|
||||
# whereas the method allows any iterable.
|
||||
if adding_seq:
|
||||
allow_set.intersection_update(value)
|
||||
else:
|
||||
allow_set.intersection_update({value})
|
||||
|
||||
return allow_set
|
||||
|
||||
|
||||
def _find_search_optimizations(filters):
    """
    Searches through all the filters, and creates white/blacklists of types and
    IDs, which can be used to optimize the filesystem search.

    :param filters: An iterable of filter objects representing a query
    :return: A 2-tuple of AuthSet objects: the first is for object types, and
        the second is for object IDs.
    """

    # The basic approach to this is to determine what is allowed and
    # prohibited, independently, and then combine them to create the final
    # white/blacklists.

    allowed_types = allowed_ids = None
    prohibited_types = set()
    prohibited_ids = set()

    for filter_ in filters:
        if filter_.property == "type":
            if filter_.op in ("=", "in"):
                allowed_types = _update_allow(allowed_types, filter_.value)
            elif filter_.op == "!=":
                prohibited_types.add(filter_.value)

        elif filter_.property == "id":
            if filter_.op == "=":
                # An "allow" ID filter implies a type filter too, since IDs
                # contain types within them.
                allowed_ids = _update_allow(allowed_ids, filter_.value)
                allowed_types = _update_allow(allowed_types,
                                              _type_from_id(filter_.value))
            elif filter_.op == "!=":
                prohibited_ids.add(filter_.value)
            elif filter_.op == "in":
                # As with "=": allowed IDs imply their types are allowed too.
                allowed_ids = _update_allow(allowed_ids, filter_.value)
                allowed_types = _update_allow(allowed_types, (
                    _type_from_id(id_) for id_ in filter_.value
                ))

    opt_types = AuthSet(allowed_types, prohibited_types)
    opt_ids = AuthSet(allowed_ids, prohibited_ids)

    # If we have both type and ID whitelists, perform a type-based intersection
    # on them, to further optimize. (Some of the cross-property constraints
    # occur above; this is essentially a second pass which operates on the
    # final whitelists, which among other things, incorporates any of the
    # prohibitions found above.)
    if opt_types.auth_type == AuthSet.WHITE and \
            opt_ids.auth_type == AuthSet.WHITE:

        # Keep only types to which some whitelisted ID belongs.
        opt_types.values.intersection_update(
            _type_from_id(id_) for id_ in opt_ids.values
        )

        # Keep only IDs whose type survived the intersection above.
        opt_ids.values.intersection_update(
            id_ for id_ in opt_ids.values
            if _type_from_id(id_) in opt_types.values
        )

    return opt_types, opt_ids
|
||||
|
||||
|
||||
def _get_matching_dir_entries(parent_dir, auth_set, st_mode_test=None, ext=""):
    """
    Search a directory (non-recursively), and find entries which match the
    given criteria.

    :param parent_dir: The directory to search
    :param auth_set: an AuthSet instance, which represents a black/whitelist
        filter on filenames
    :param st_mode_test: A callable allowing filtering based on the type of
        directory entry.  It is passed the st_mode field of a stat()
        structure and should return True to include the entry, False to
        exclude it (e.g. pass stat.S_ISREG or stat.S_ISDIR).  If None,
        don't filter on entry type.
    :param ext: Determines how names from auth_set map to directory entries,
        and allows filtering by extension: the extension is appended to
        auth_set values to obtain entry names, and stripped from entry names
        to obtain auth_set values, so auth_set holds only "basenames".  If
        non-empty, the extension MUST include a leading ".".  The default
        empty string means direct comparisons and no extension filtering.
    :return: A list of matching directory entries — bare names, no path info.
    :raises OSError: If there are errors accessing directory contents or
        stat()'ing files
    """
    def _entry_passes(name):
        # stat() the entry; a vanished file is silently skipped rather than
        # treated as an error.
        try:
            info = os.stat(os.path.join(parent_dir, name))
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise e
            return False
        return not st_mode_test or st_mode_test(info.st_mode)

    results = []
    if auth_set.auth_type == AuthSet.WHITE:
        # Whitelist: probe exactly the named entries, nothing else.
        for base in auth_set.values:
            candidate = base + ext
            if _entry_passes(candidate):
                results.append(candidate)
    else:
        # Blacklist: scan the whole directory, skipping excluded names.
        for candidate in os.listdir(parent_dir):
            if ext:
                base, found_ext = os.path.splitext(candidate)
                if found_ext != ext:
                    continue
            else:
                base = candidate

            if base in auth_set.values:
                continue

            if _entry_passes(candidate):
                results.append(candidate)

    return results
|
||||
|
||||
|
||||
def _check_object_from_file(query, filepath):
|
||||
"""
|
||||
Read a STIX object from the given file, and check it against the given
|
||||
filters.
|
||||
|
||||
:param query: Iterable of filters
|
||||
:param filepath: Path to file to read
|
||||
:return: The STIX object, as a dict, if the object passes the filters. If
|
||||
not, None is returned.
|
||||
:raises TypeError: If the file had invalid content
|
||||
:raises IOError: If there are problems opening/reading the file
|
||||
"""
|
||||
try:
|
||||
with open(filepath, "r") as f:
|
||||
stix_obj = json.load(f)
|
||||
|
||||
if stix_obj["type"] == "bundle":
|
||||
stix_obj = stix_obj["objects"][0]
|
||||
|
||||
# naive STIX type checking
|
||||
stix_obj["type"]
|
||||
stix_obj["id"]
|
||||
|
||||
except (ValueError, KeyError): # likely not a JSON file
|
||||
raise TypeError(
|
||||
"STIX JSON object at '{0}' could either not be parsed "
|
||||
"to JSON or was not valid STIX JSON".format(
|
||||
filepath))
|
||||
|
||||
# check against other filters, add if match
|
||||
result = next(apply_common_filters([stix_obj], query), None)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _search_versioned(query, type_path, auth_ids):
    """
    Searches the given directory, which contains data for STIX objects of a
    particular versioned type (i.e. not markings), and return any which match
    the query.

    Layout assumed: type_path holds one subdirectory per object ID, each of
    which holds one ".json" file per object version.

    :param query: The query to match against
    :param type_path: The directory with type-specific STIX object files
    :param auth_ids: Search optimization based on object ID
    :return: A list of all matching objects
    :raises TypeError: If any objects had invalid content
    :raises IOError, OSError: If there were any problems opening/reading files
    """
    matches = []
    for id_dir in _get_matching_dir_entries(type_path, auth_ids,
                                            stat.S_ISDIR):
        id_path = os.path.join(type_path, id_dir)

        # Reuse the directory-matching helper for a simple job: grab every
        # regular ".json" file in the ID directory (it also gives us entry
        # type checking for free).
        for version_file in _get_matching_dir_entries(id_path, _AUTHSET_ANY,
                                                      stat.S_ISREG, ".json"):
            version_path = os.path.join(id_path, version_file)

            try:
                obj = _check_object_from_file(query, version_path)
            except IOError as e:
                if e.errno != errno.ENOENT:
                    raise e
                # A file vanishing mid-search is fine; just skip it.
                continue

            if obj:
                matches.append(obj)

    return matches
|
||||
|
||||
|
||||
def _search_markings(query, markings_path, auth_ids):
    """
    Searches the given directory, which contains markings data, and return any
    which match the query.  Markings are unversioned: one ".json" file per
    ID, directly inside markings_path.

    :param query: The query to match against
    :param markings_path: The directory with STIX markings files
    :param auth_ids: Search optimization based on object ID
    :return: A list of all matching objects
    :raises TypeError: If any objects had invalid content
    :raises IOError: If there were any problems opening/reading files
    """
    matches = []
    for id_file in _get_matching_dir_entries(markings_path, auth_ids,
                                             stat.S_ISREG, ".json"):
        id_path = os.path.join(markings_path, id_file)

        try:
            obj = _check_object_from_file(query, id_path)
        except IOError as e:
            if e.errno != errno.ENOENT:
                raise e
            # A file vanishing mid-search is fine; just skip it.
            continue

        if obj:
            matches.append(obj)

    return matches
|
||||
|
||||
|
||||
class FileSystemStore(DataStoreMixin):
|
||||
|
@ -77,15 +431,23 @@ class FileSystemSink(DataSink):
|
|||
def _check_path_and_write(self, stix_obj):
    """Write the given STIX object to a file in the STIX file directory.

    Markings are unversioned and are stored flat as <type>/<id>.json; all
    other objects are stored as <type>/<id>/<modified-timestamp>.json so
    multiple versions of one object can coexist.  (The source span mixed
    pre- and post-change diff lines — including references to an undefined
    ``path`` variable — which this resolves to the multi-version form.)
    """
    type_dir = os.path.join(self._stix_dir, stix_obj["type"])
    if is_marking(stix_obj):
        # Markings have no "modified" property; the ID alone names the file.
        filename = stix_obj.id
        obj_dir = type_dir
    else:
        filename = _timestamp2filename(stix_obj.modified)
        obj_dir = os.path.join(type_dir, stix_obj["id"])

    file_path = os.path.join(obj_dir, filename + ".json")

    if not os.path.exists(obj_dir):
        os.makedirs(obj_dir)

    # Optionally wrap the object in a one-object Bundle before writing.
    if self.bundlify:
        stix_obj = Bundle(stix_obj, allow_custom=self.allow_custom)

    with open(file_path, "w") as f:
        f.write(str(stix_obj))
|
||||
|
||||
def add(self, stix_data=None, version=None):
|
||||
|
@ -104,25 +466,18 @@ class FileSystemSink(DataSink):
|
|||
the Bundle contained, but not the Bundle itself.
|
||||
|
||||
"""
|
||||
if any(x in ('STIXDomainObject', 'STIXRelationshipObject', 'MarkingDefinition')
|
||||
for x in get_class_hierarchy_names(stix_data)):
|
||||
if isinstance(stix_data, Bundle):
|
||||
# recursively add individual STIX objects
|
||||
for stix_obj in stix_data.get("objects", []):
|
||||
self.add(stix_obj, version=version)
|
||||
|
||||
elif isinstance(stix_data, _STIXBase):
|
||||
# adding python STIX object
|
||||
self._check_path_and_write(stix_data)
|
||||
|
||||
elif isinstance(stix_data, (str, dict)):
|
||||
stix_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
|
||||
if stix_data["type"] == "bundle":
|
||||
# extract STIX objects
|
||||
for stix_obj in stix_data.get("objects", []):
|
||||
self.add(stix_obj, version=version)
|
||||
else:
|
||||
# adding json-formatted STIX
|
||||
self._check_path_and_write(stix_data,)
|
||||
|
||||
elif isinstance(stix_data, Bundle):
|
||||
# recursively add individual STIX objects
|
||||
for stix_obj in stix_data.get("objects", []):
|
||||
self.add(stix_obj, version=version)
|
||||
self.add(stix_data, version=version)
|
||||
|
||||
elif isinstance(stix_data, list):
|
||||
# recursively add individual STIX objects
|
||||
|
@ -176,9 +531,7 @@ class FileSystemSource(DataSource):
|
|||
a python STIX object and then returned
|
||||
|
||||
"""
|
||||
query = [Filter("id", "=", stix_id)]
|
||||
|
||||
all_data = self.query(query=query, version=version, _composite_filters=_composite_filters)
|
||||
all_data = self.all_versions(stix_id, version=version, _composite_filters=_composite_filters)
|
||||
|
||||
if all_data:
|
||||
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
|
||||
|
@ -206,9 +559,10 @@ class FileSystemSource(DataSource):
|
|||
a python STIX objects and then returned
|
||||
|
||||
"""
|
||||
return [self.get(stix_id=stix_id, version=version, _composite_filters=_composite_filters)]
|
||||
query = [Filter("id", "=", stix_id)]
|
||||
return self.query(query, version=version, _composite_filters=_composite_filters)
|
||||
|
||||
def query(self, query=None, version=None, _composite_filters=None):
|
||||
def query2(self, query=None, version=None, _composite_filters=None):
|
||||
"""Search and retrieve STIX objects based on the complete query.
|
||||
|
||||
A "complete query" includes the filters from the query, the filters
|
||||
|
@ -341,3 +695,57 @@ class FileSystemSource(DataSource):
|
|||
if filter_.property == "id" or filter_.property == "type":
|
||||
file_filters.append(filter_)
|
||||
return file_filters
|
||||
|
||||
def query(self, query=None, version=None, _composite_filters=None):
    """Search and retrieve STIX objects based on the complete query.

    A "complete query" combines the filters from the query, the filters
    attached to this FileSystemSource, and any filters passed from a
    CompositeDataSource (i.e. _composite_filters).

    Args:
        query (list): list of filters to search on
        _composite_filters (FilterSet): collection of filters passed from the
            CompositeDataSource, not user supplied
        version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
            None, use latest version.

    Returns:
        (list): list of STIX objects that matches the supplied
            query. The STIX objects are loaded from their json files,
            parsed into a python STIX objects and then returned.

    """
    # Assemble the complete filter set.
    full_query = FilterSet(query)
    if self.filters:
        full_query.add(self.filters)
    if _composite_filters:
        full_query.add(_composite_filters)

    # Build type/ID white- or blacklists so only relevant directories and
    # files are touched.
    auth_types, auth_ids = _find_search_optimizations(full_query)

    matching_dicts = []
    for type_dir in _get_matching_dir_entries(self._stix_dir, auth_types,
                                              stat.S_ISDIR):
        type_path = os.path.join(self._stix_dir, type_dir)

        # Markings use a flat, unversioned layout; every other type is
        # stored one-directory-per-ID with one file per version.
        if type_dir == "marking-definition":
            search = _search_markings
        else:
            search = _search_versioned

        matching_dicts.extend(search(full_query, type_path, auth_ids))

    # Parse the raw dicts into python STIX objects.
    return [
        parse(obj_dict, allow_custom=self.allow_custom, version=version)
        for obj_dict in matching_dicts
    ]
|
||||
|
|
|
@ -10,6 +10,7 @@ from stix2.base import _STIXBase
|
|||
from stix2.core import Bundle, parse
|
||||
from stix2.datastore import DataSink, DataSource, DataStoreMixin
|
||||
from stix2.datastore.filters import FilterSet, apply_common_filters
|
||||
from stix2.utils import is_marking
|
||||
|
||||
|
||||
def _add(store, stix_data=None, allow_custom=True, version=None):
|
||||
|
@ -43,7 +44,7 @@ def _add(store, stix_data=None, allow_custom=True, version=None):
|
|||
|
||||
# Map ID directly to the object, if it is a marking. Otherwise,
|
||||
# map to a family, so we can track multiple versions.
|
||||
if _is_marking(stix_obj):
|
||||
if is_marking(stix_obj):
|
||||
store._data[stix_obj["id"]] = stix_obj
|
||||
|
||||
else:
|
||||
|
@ -56,22 +57,6 @@ def _add(store, stix_data=None, allow_custom=True, version=None):
|
|||
obj_family.add(stix_obj)
|
||||
|
||||
|
||||
def _is_marking(obj_or_id):
    """Determines whether the given object or object ID is/is for a marking
    definition.

    :param obj_or_id: A STIX object or object ID as a string.
    :return: True if a marking definition, False otherwise.
    """
    # Accept either a full object (pull out its ID) or a bare ID string.
    if isinstance(obj_or_id, (_STIXBase, dict)):
        id_ = obj_or_id["id"]
    else:
        id_ = obj_or_id

    # STIX IDs always start with their type, so a prefix check suffices.
    return id_.startswith("marking-definition--")
|
||||
|
||||
|
||||
class _ObjectFamily(object):
|
||||
"""
|
||||
An internal implementation detail of memory sources/sinks/stores.
|
||||
|
@ -255,7 +240,7 @@ class MemorySource(DataSource):
|
|||
"""
|
||||
stix_obj = None
|
||||
|
||||
if _is_marking(stix_id):
|
||||
if is_marking(stix_id):
|
||||
stix_obj = self._data.get(stix_id)
|
||||
else:
|
||||
object_family = self._data.get(stix_id)
|
||||
|
@ -291,7 +276,7 @@ class MemorySource(DataSource):
|
|||
"""
|
||||
results = []
|
||||
stix_objs_to_filter = None
|
||||
if _is_marking(stix_id):
|
||||
if is_marking(stix_id):
|
||||
stix_obj = self._data.get(stix_id)
|
||||
if stix_obj:
|
||||
stix_objs_to_filter = [stix_obj]
|
||||
|
|
|
@ -1,15 +0,0 @@
|
|||
{
|
||||
"id": "bundle--81884287-2548-47fc-a997-39489ddd5462",
|
||||
"objects": [
|
||||
{
|
||||
"created": "2017-06-01T00:00:00Z",
|
||||
"id": "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5",
|
||||
"identity_class": "organization",
|
||||
"modified": "2017-06-01T00:00:00Z",
|
||||
"name": "The MITRE Corporation",
|
||||
"type": "identity"
|
||||
}
|
||||
],
|
||||
"spec_version": "2.0",
|
||||
"type": "bundle"
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
{
|
||||
"id": "bundle--81884287-2548-47fc-a997-39489ddd5462",
|
||||
"objects": [
|
||||
{
|
||||
"created": "2017-06-01T00:00:00Z",
|
||||
"id": "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5",
|
||||
"identity_class": "organization",
|
||||
"modified": "2017-06-01T00:00:00Z",
|
||||
"name": "The MITRE Corporation",
|
||||
"type": "identity"
|
||||
}
|
||||
],
|
||||
"spec_version": "2.0",
|
||||
"type": "bundle"
|
||||
}
|
|
@ -1,12 +1,17 @@
|
|||
import errno
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import stat
|
||||
|
||||
import pytest
|
||||
|
||||
from stix2 import (Bundle, Campaign, CustomObject, FileSystemSink,
|
||||
FileSystemSource, FileSystemStore, Filter, Identity,
|
||||
Indicator, Malware, Relationship, properties)
|
||||
Indicator, Malware, Relationship, parse, properties)
|
||||
from stix2.datastore.filesystem import (AuthSet, _find_search_optimizations,
|
||||
_get_matching_dir_entries,
|
||||
_timestamp2filename)
|
||||
from stix2.test.constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID,
|
||||
IDENTITY_KWARGS, INDICATOR_ID,
|
||||
INDICATOR_KWARGS, MALWARE_ID, MALWARE_KWARGS,
|
||||
|
@ -97,7 +102,20 @@ def rel_fs_store():
|
|||
yield fs
|
||||
|
||||
for o in stix_objs:
|
||||
os.remove(os.path.join(FS_PATH, o.type, o.id + '.json'))
|
||||
filepath = os.path.join(FS_PATH, o.type, o.id,
|
||||
_timestamp2filename(o.modified) + '.json')
|
||||
|
||||
# Some test-scoped fixtures (e.g. fs_store) delete all campaigns, so by
|
||||
# the time this module-scoped fixture tears itself down, it may find
|
||||
# its campaigns already gone, which causes not-found errors.
|
||||
try:
|
||||
os.remove(filepath)
|
||||
except OSError as e:
|
||||
# 3 is the ERROR_PATH_NOT_FOUND windows error code. Which has an
|
||||
# errno symbolic value, but not the windows meaning...
|
||||
if e.errno in (errno.ENOENT, 3):
|
||||
continue
|
||||
raise e
|
||||
|
||||
|
||||
def test_filesystem_source_nonexistent_folder():
|
||||
|
@ -182,14 +200,16 @@ def test_filesystem_sink_add_python_stix_object(fs_sink, fs_source):
|
|||
|
||||
fs_sink.add(camp1)
|
||||
|
||||
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp1.id + ".json"))
|
||||
filepath = os.path.join(FS_PATH, "campaign", camp1.id,
|
||||
_timestamp2filename(camp1.modified) + ".json")
|
||||
assert os.path.exists(filepath)
|
||||
|
||||
camp1_r = fs_source.get(camp1.id)
|
||||
assert camp1_r.id == camp1.id
|
||||
assert camp1_r.name == "Hannibal"
|
||||
assert "War Elephant" in camp1_r.aliases
|
||||
|
||||
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
|
||||
os.remove(filepath)
|
||||
|
||||
|
||||
def test_filesystem_sink_add_stix_object_dict(fs_sink, fs_source):
|
||||
|
@ -200,19 +220,30 @@ def test_filesystem_sink_add_stix_object_dict(fs_sink, fs_source):
|
|||
"objective": "German and French Intelligence Services",
|
||||
"aliases": ["Purple Robes"],
|
||||
"id": "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
|
||||
"created": "2017-05-31T21:31:53.197755Z"
|
||||
"created": "2017-05-31T21:31:53.197755Z",
|
||||
"modified": "2017-05-31T21:31:53.197755Z"
|
||||
}
|
||||
|
||||
fs_sink.add(camp2)
|
||||
|
||||
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp2["id"] + ".json"))
|
||||
# Need to get the exact "modified" timestamp which would have been
|
||||
# in effect at the time the object was saved to the sink, which determines
|
||||
# the filename it would have been saved as. It may not be exactly the same
|
||||
# as what's in the dict, since the parsing process can enforce a precision
|
||||
# constraint (e.g. truncate to milliseconds), which results in a slightly
|
||||
# different name.
|
||||
camp2obj = parse(camp2)
|
||||
filepath = os.path.join(FS_PATH, "campaign", camp2obj["id"],
|
||||
_timestamp2filename(camp2obj["modified"]) + ".json")
|
||||
|
||||
assert os.path.exists(filepath)
|
||||
|
||||
camp2_r = fs_source.get(camp2["id"])
|
||||
assert camp2_r.id == camp2["id"]
|
||||
assert camp2_r.name == camp2["name"]
|
||||
assert "Purple Robes" in camp2_r.aliases
|
||||
|
||||
os.remove(os.path.join(FS_PATH, "campaign", camp2_r.id + ".json"))
|
||||
os.remove(filepath)
|
||||
|
||||
|
||||
def test_filesystem_sink_add_stix_bundle_dict(fs_sink, fs_source):
|
||||
|
@ -228,53 +259,74 @@ def test_filesystem_sink_add_stix_bundle_dict(fs_sink, fs_source):
|
|||
"objective": "Bulgarian, Albanian and Romanian Intelligence Services",
|
||||
"aliases": ["Huns"],
|
||||
"id": "campaign--b8f86161-ccae-49de-973a-4ca320c62478",
|
||||
"created": "2017-05-31T21:31:53.197755Z"
|
||||
"created": "2017-05-31T21:31:53.197755Z",
|
||||
"modified": "2017-05-31T21:31:53.197755Z"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
fs_sink.add(bund)
|
||||
|
||||
assert os.path.exists(os.path.join(FS_PATH, "campaign", bund["objects"][0]["id"] + ".json"))
|
||||
camp_obj = parse(bund["objects"][0])
|
||||
filepath = os.path.join(FS_PATH, "campaign", camp_obj["id"],
|
||||
_timestamp2filename(camp_obj["modified"]) + ".json")
|
||||
|
||||
assert os.path.exists(filepath)
|
||||
|
||||
camp3_r = fs_source.get(bund["objects"][0]["id"])
|
||||
assert camp3_r.id == bund["objects"][0]["id"]
|
||||
assert camp3_r.name == bund["objects"][0]["name"]
|
||||
assert "Huns" in camp3_r.aliases
|
||||
|
||||
os.remove(os.path.join(FS_PATH, "campaign", camp3_r.id + ".json"))
|
||||
os.remove(filepath)
|
||||
|
||||
|
||||
def test_filesystem_sink_add_json_stix_object(fs_sink, fs_source):
    # add json-encoded stix obj.  (The source span interleaved the removed
    # pre-change lines with the added multi-version lines; this keeps the
    # coherent post-change version.)
    camp4 = '{"type": "campaign", "id":"campaign--6a6ca372-ba07-42cc-81ef-9840fc1f963d",'\
            ' "created":"2017-05-31T21:31:53.197755Z",'\
            ' "modified":"2017-05-31T21:31:53.197755Z",'\
            ' "name": "Ghengis Khan", "objective": "China and Russian infrastructure"}'

    fs_sink.add(camp4)

    # Parse to learn the exact "modified" timestamp the sink would have used
    # to name the file (parsing may adjust precision).
    camp4obj = parse(camp4)
    filepath = os.path.join(FS_PATH, "campaign",
                            "campaign--6a6ca372-ba07-42cc-81ef-9840fc1f963d",
                            _timestamp2filename(camp4obj["modified"]) + ".json")

    assert os.path.exists(filepath)

    camp4_r = fs_source.get("campaign--6a6ca372-ba07-42cc-81ef-9840fc1f963d")
    assert camp4_r.id == "campaign--6a6ca372-ba07-42cc-81ef-9840fc1f963d"
    assert camp4_r.name == "Ghengis Khan"

    os.remove(filepath)
|
||||
|
||||
|
||||
def test_filesystem_sink_json_stix_bundle(fs_sink, fs_source):
    # add json-encoded stix bundle.  (The source span interleaved the removed
    # pre-change lines with the added multi-version lines; this keeps the
    # coherent post-change version.)
    bund2 = '{"type": "bundle", "id": "bundle--3d267103-8475-4d8f-b321-35ec6eccfa37",' \
            ' "spec_version": "2.0", "objects": [{"type": "campaign", "id": "campaign--2c03b8bf-82ee-433e-9918-ca2cb6e9534b",' \
            ' "created":"2017-05-31T21:31:53.197755Z",'\
            ' "modified":"2017-05-31T21:31:53.197755Z",'\
            ' "name": "Spartacus", "objective": "Oppressive regimes of Africa and Middle East"}]}'
    fs_sink.add(bund2)

    # Parse to learn the exact "modified" timestamp used in the filename.
    bund2obj = parse(bund2)
    camp_obj = bund2obj["objects"][0]

    filepath = os.path.join(FS_PATH, "campaign",
                            "campaign--2c03b8bf-82ee-433e-9918-ca2cb6e9534b",
                            _timestamp2filename(camp_obj["modified"]) + ".json")

    assert os.path.exists(filepath)

    camp5_r = fs_source.get("campaign--2c03b8bf-82ee-433e-9918-ca2cb6e9534b")
    assert camp5_r.id == "campaign--2c03b8bf-82ee-433e-9918-ca2cb6e9534b"
    assert camp5_r.name == "Spartacus"

    os.remove(filepath)
|
||||
|
||||
|
||||
def test_filesystem_sink_add_objects_list(fs_sink, fs_source):
|
||||
|
@ -289,13 +341,23 @@ def test_filesystem_sink_add_objects_list(fs_sink, fs_source):
|
|||
"objective": "Central and Eastern Europe military commands and departments",
|
||||
"aliases": ["The Frenchmen"],
|
||||
"id": "campaign--122818b6-1112-4fb0-b11b-b111107ca70a",
|
||||
"created": "2017-05-31T21:31:53.197755Z"
|
||||
"created": "2017-05-31T21:31:53.197755Z",
|
||||
"modified": "2017-05-31T21:31:53.197755Z"
|
||||
}
|
||||
|
||||
fs_sink.add([camp6, camp7])
|
||||
|
||||
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp6.id + ".json"))
|
||||
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--122818b6-1112-4fb0-b11b-b111107ca70a" + ".json"))
|
||||
camp7obj = parse(camp7)
|
||||
|
||||
camp6filepath = os.path.join(FS_PATH, "campaign", camp6.id,
|
||||
_timestamp2filename(camp6["modified"]) +
|
||||
".json")
|
||||
camp7filepath = os.path.join(
|
||||
FS_PATH, "campaign", "campaign--122818b6-1112-4fb0-b11b-b111107ca70a",
|
||||
_timestamp2filename(camp7obj["modified"]) + ".json")
|
||||
|
||||
assert os.path.exists(camp6filepath)
|
||||
assert os.path.exists(camp7filepath)
|
||||
|
||||
camp6_r = fs_source.get(camp6.id)
|
||||
assert camp6_r.id == camp6.id
|
||||
|
@ -306,8 +368,8 @@ def test_filesystem_sink_add_objects_list(fs_sink, fs_source):
|
|||
assert "The Frenchmen" in camp7_r.aliases
|
||||
|
||||
# remove all added objects
|
||||
os.remove(os.path.join(FS_PATH, "campaign", camp6_r.id + ".json"))
|
||||
os.remove(os.path.join(FS_PATH, "campaign", camp7_r.id + ".json"))
|
||||
os.remove(camp6filepath)
|
||||
os.remove(camp7filepath)
|
||||
|
||||
|
||||
def test_filesystem_store_get_stored_as_bundle(fs_store):
|
||||
|
@ -375,8 +437,11 @@ def test_filesystem_store_add(fs_store):
|
|||
assert camp1_r.id == camp1.id
|
||||
assert camp1_r.name == camp1.name
|
||||
|
||||
filepath = os.path.join(FS_PATH, "campaign", camp1_r.id,
|
||||
_timestamp2filename(camp1_r.modified) + ".json")
|
||||
|
||||
# remove
|
||||
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
|
||||
os.remove(filepath)
|
||||
|
||||
|
||||
def test_filesystem_store_add_as_bundle():
|
||||
|
@ -387,7 +452,10 @@ def test_filesystem_store_add_as_bundle():
|
|||
aliases=["Ragnar"])
|
||||
fs_store.add(camp1)
|
||||
|
||||
with open(os.path.join(FS_PATH, "campaign", camp1.id + ".json")) as bundle_file:
|
||||
filepath = os.path.join(FS_PATH, "campaign", camp1.id,
|
||||
_timestamp2filename(camp1.modified) + ".json")
|
||||
|
||||
with open(filepath) as bundle_file:
|
||||
assert '"type": "bundle"' in bundle_file.read()
|
||||
|
||||
camp1_r = fs_store.get(camp1.id)
|
||||
|
@ -527,3 +595,334 @@ def test_related_to_by_target(rel_fs_store):
|
|||
assert len(resp) == 2
|
||||
assert any(x['id'] == CAMPAIGN_ID for x in resp)
|
||||
assert any(x['id'] == INDICATOR_ID for x in resp)
|
||||
|
||||
|
||||
def test_auth_set_white1():
    # A single include value produces a whitelist containing just it.
    aset = AuthSet({"A"}, set())

    assert aset.auth_type == AuthSet.WHITE
    assert aset.values == {"A"}
|
||||
|
||||
|
||||
def test_auth_set_white2():
    # No includes and no excludes: an empty whitelist (match nothing).
    aset = AuthSet(set(), set())

    assert aset.auth_type == AuthSet.WHITE
    assert not aset.values
|
||||
|
||||
|
||||
def test_auth_set_white3():
    # Excludes are subtracted from includes: only "A" survives.
    aset = AuthSet({"A", "B"}, {"B", "C"})

    assert aset.auth_type == AuthSet.WHITE
    assert aset.values == {"A"}
|
||||
|
||||
|
||||
def test_auth_set_black1():
    # include=None means "anything except the excludes": a blacklist.
    aset = AuthSet(None, {"B", "C"})

    assert aset.auth_type == AuthSet.BLACK
    assert aset.values == {"B", "C"}
|
||||
|
||||
|
||||
def test_optimize_types1():
    # One type equality filter: whitelist that single type; no ID constraints.
    flist = [Filter("type", "=", "foo")]

    type_set, id_set = _find_search_optimizations(flist)

    assert type_set.auth_type == AuthSet.WHITE
    assert type_set.values == {"foo"}
    assert id_set.auth_type == AuthSet.BLACK
    assert not id_set.values
|
||||
|
||||
|
||||
def test_optimize_types2():
    # Two contradictory type equalities: the whitelist intersection is empty.
    flist = [
        Filter("type", "=", "foo"),
        Filter("type", "=", "bar"),
    ]

    type_set, id_set = _find_search_optimizations(flist)

    assert type_set.auth_type == AuthSet.WHITE
    assert not type_set.values
    assert id_set.auth_type == AuthSet.BLACK
    assert not id_set.values
|
||||
|
||||
|
||||
def test_optimize_types3():
    # Overlapping "in" filters intersect to their common types.
    flist = [
        Filter("type", "in", ["A", "B", "C"]),
        Filter("type", "in", ["B", "C", "D"]),
    ]

    type_set, id_set = _find_search_optimizations(flist)

    assert type_set.auth_type == AuthSet.WHITE
    assert type_set.values == {"B", "C"}
    assert id_set.auth_type == AuthSet.BLACK
    assert not id_set.values
|
||||
|
||||
|
||||
def test_optimize_types4():
    # Disjoint "in" filters intersect to nothing.
    flist = [
        Filter("type", "in", ["A", "B", "C"]),
        Filter("type", "in", ["D", "E", "F"]),
    ]

    type_set, id_set = _find_search_optimizations(flist)

    assert type_set.auth_type == AuthSet.WHITE
    assert not type_set.values
    assert id_set.auth_type == AuthSet.BLACK
    assert not id_set.values
|
||||
|
||||
|
||||
def test_optimize_types5():
    # An inequality removes its value from the "in" whitelist.
    flist = [
        Filter("type", "in", ["foo", "bar"]),
        Filter("type", "!=", "bar"),
    ]

    type_set, id_set = _find_search_optimizations(flist)

    assert type_set.auth_type == AuthSet.WHITE
    assert type_set.values == {"foo"}
    assert id_set.auth_type == AuthSet.BLACK
    assert not id_set.values
|
||||
|
||||
|
||||
def test_optimize_types6():
    # Only inequalities: a blacklist accumulating both excluded types.
    flist = [
        Filter("type", "!=", "foo"),
        Filter("type", "!=", "bar"),
    ]

    type_set, id_set = _find_search_optimizations(flist)

    assert type_set.auth_type == AuthSet.BLACK
    assert type_set.values == {"foo", "bar"}
    assert id_set.auth_type == AuthSet.BLACK
    assert not id_set.values
|
||||
|
||||
|
||||
def test_optimize_types7():
    # "=" and "!=" on the same value cancel out: empty whitelist.
    flist = [
        Filter("type", "=", "foo"),
        Filter("type", "!=", "foo"),
    ]

    type_set, id_set = _find_search_optimizations(flist)

    assert type_set.auth_type == AuthSet.WHITE
    assert not type_set.values
    assert id_set.auth_type == AuthSet.BLACK
    assert not id_set.values
|
||||
|
||||
|
||||
def test_optimize_types8():
    # No filters at all: empty blacklists, i.e. search everything.
    type_set, id_set = _find_search_optimizations([])

    assert type_set.auth_type == AuthSet.BLACK
    assert not type_set.values
    assert id_set.auth_type == AuthSet.BLACK
    assert not id_set.values
|
||||
|
||||
|
||||
def test_optimize_types_ids1():
    # An ID equality also narrows the type whitelist to the ID's type.
    flist = [
        Filter("type", "in", ["foo", "bar"]),
        Filter("id", "=", "foo--00000000-0000-0000-0000-000000000000"),
    ]

    type_set, id_set = _find_search_optimizations(flist)

    assert type_set.auth_type == AuthSet.WHITE
    assert type_set.values == {"foo"}
    assert id_set.auth_type == AuthSet.WHITE
    assert id_set.values == {"foo--00000000-0000-0000-0000-000000000000"}
|
||||
|
||||
|
||||
def test_optimize_types_ids2():
    # The ID's type conflicts with the type filter: nothing can match.
    flist = [
        Filter("type", "=", "foo"),
        Filter("id", "=", "bar--00000000-0000-0000-0000-000000000000"),
    ]

    type_set, id_set = _find_search_optimizations(flist)

    assert type_set.auth_type == AuthSet.WHITE
    assert not type_set.values
    assert id_set.auth_type == AuthSet.WHITE
    assert not id_set.values
|
||||
|
||||
|
||||
def test_optimize_types_ids3():
    # An ID inequality becomes an ID blacklist; types are untouched by it.
    flist = [
        Filter("type", "in", ["foo", "bar"]),
        Filter("id", "!=", "bar--00000000-0000-0000-0000-000000000000"),
    ]

    type_set, id_set = _find_search_optimizations(flist)

    assert type_set.auth_type == AuthSet.WHITE
    assert type_set.values == {"foo", "bar"}
    assert id_set.auth_type == AuthSet.BLACK
    assert id_set.values == {"bar--00000000-0000-0000-0000-000000000000"}
|
||||
|
||||
|
||||
def test_optimize_types_ids4():
    # Type and ID whitelists constrain each other: only IDs whose type
    # survives the type intersection remain, and vice versa.
    flist = [
        Filter("type", "in", ["A", "B", "C"]),
        Filter("id", "in", [
            "B--00000000-0000-0000-0000-000000000000",
            "C--00000000-0000-0000-0000-000000000000",
            "D--00000000-0000-0000-0000-000000000000",
        ]),
    ]

    type_set, id_set = _find_search_optimizations(flist)

    assert type_set.auth_type == AuthSet.WHITE
    assert type_set.values == {"B", "C"}
    assert id_set.auth_type == AuthSet.WHITE
    assert id_set.values == {
        "B--00000000-0000-0000-0000-000000000000",
        "C--00000000-0000-0000-0000-000000000000"
    }
|
||||
|
||||
|
||||
def test_optimize_types_ids5():
    # Mixed in/!= constraints on both type and id narrow down to a
    # single surviving type and a single surviving ID.
    flist = [
        Filter("type", "in", ["A", "B", "C"]),
        Filter("type", "!=", "C"),
        Filter("id", "in", [
            "B--00000000-0000-0000-0000-000000000000",
            "C--00000000-0000-0000-0000-000000000000",
            "D--00000000-0000-0000-0000-000000000000"
        ]),
        Filter("id", "!=", "D--00000000-0000-0000-0000-000000000000"),
    ]

    type_set, id_set = _find_search_optimizations(flist)

    assert type_set.auth_type == AuthSet.WHITE
    assert type_set.values == {"B"}
    assert id_set.auth_type == AuthSet.WHITE
    assert id_set.values == {"B--00000000-0000-0000-0000-000000000000"}
|
||||
|
||||
|
||||
def test_optimize_types_ids6():
    # A lone ID equality implies both an ID whitelist and a type whitelist.
    flist = [
        Filter("id", "=", "A--00000000-0000-0000-0000-000000000000"),
    ]

    type_set, id_set = _find_search_optimizations(flist)

    assert type_set.auth_type == AuthSet.WHITE
    assert type_set.values == {"A"}
    assert id_set.auth_type == AuthSet.WHITE
    assert id_set.values == {"A--00000000-0000-0000-0000-000000000000"}
|
||||
|
||||
|
||||
def test_search_auth_set_white1():
    # Whitelist entries that don't exist on disk are simply not matched;
    # the predicate (dir vs. regular file) must also hold.
    aset = AuthSet(
        {"attack-pattern", "doesntexist"},
        set()
    )

    dirs = _get_matching_dir_entries(FS_PATH, aset, stat.S_ISDIR)
    assert dirs == ["attack-pattern"]

    files = _get_matching_dir_entries(FS_PATH, aset, stat.S_ISREG)
    assert not files
|
||||
|
||||
|
||||
def test_search_auth_set_white2():
    # Excludes (including nonexistent ones) are removed from the whitelist
    # before the directory is scanned.
    aset = AuthSet(
        {
            "malware--6b616fc1-1505-48e3-8b2c-0d19337bff38",
            "malware--92ec0cbd-2c30-44a2-b270-73f4ec949841"
        },
        {
            "malware--92ec0cbd-2c30-44a2-b270-73f4ec949841",
            "malware--96b08451-b27a-4ff6-893f-790e26393a8e",
            "doesntexist"
        }
    )

    results = _get_matching_dir_entries(
        os.path.join(FS_PATH, "malware"),
        aset, stat.S_ISDIR
    )

    assert results == ["malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"]
|
||||
|
||||
|
||||
def test_search_auth_set_white3():
    # When a suffix is given, whitelist values are matched against
    # filenames with that suffix appended.
    aset = AuthSet({"20170531213258226477", "doesntexist"}, set())

    results = _get_matching_dir_entries(
        os.path.join(FS_PATH, "malware",
                     "malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"),
        aset, stat.S_ISREG, ".json"
    )

    assert results == ["20170531213258226477.json"]
|
||||
|
||||
|
||||
def test_search_auth_set_black1():
    # A blacklist returns every on-disk entry except the excluded ones;
    # nonexistent excludes have no effect.
    aset = AuthSet(
        None,
        {"tool--242f3da3-4425-4d11-8f5c-b842886da966", "doesntexist"}
    )

    results = _get_matching_dir_entries(
        os.path.join(FS_PATH, "tool"),
        aset, stat.S_ISDIR
    )

    assert set(results) == {
        "tool--03342581-f790-4f03-ba41-e82e67392e23"
    }
|
||||
|
||||
|
||||
def test_search_auth_set_white_empty():
    # An empty whitelist matches nothing at all.
    aset = AuthSet(
        set(),
        set()
    )

    results = _get_matching_dir_entries(FS_PATH, aset, stat.S_ISDIR)

    assert not results
|
||||
|
||||
|
||||
def test_search_auth_set_black_empty(rel_fs_store):
    # Ensure rel_fs_store fixture has run so that the type directories are
    # predictable (it adds "campaign").  An empty blacklist matches
    # everything on disk.
    aset = AuthSet(
        None,
        set()
    )

    results = _get_matching_dir_entries(FS_PATH, aset, stat.S_ISDIR)

    # Should get all dirs
    assert set(results) == {
        "attack-pattern",
        "campaign",
        "course-of-action",
        "identity",
        "indicator",
        "intrusion-set",
        "malware",
        "marking-definition",
        "relationship",
        "tool"
    }
|
||||
|
|
|
@ -7,6 +7,8 @@ import json
|
|||
from dateutil import parser
|
||||
import pytz
|
||||
|
||||
import stix2.base
|
||||
|
||||
from .exceptions import (InvalidValueError, RevokeError,
|
||||
UnmodifiablePropertyError)
|
||||
|
||||
|
@ -364,3 +366,20 @@ def remove_custom_stix(stix_obj):
|
|||
|
||||
def get_type_from_id(stix_id):
    """Return the type portion of a STIX ID (everything before "--")."""
    return stix_id.partition('--')[0]
|
||||
|
||||
|
||||
def is_marking(obj_or_id):
    """Determines whether the given object or object ID is/is for a marking
    definition.

    :param obj_or_id: A STIX object or object ID as a string.
    :return: True if a marking definition, False otherwise.
    """

    if isinstance(obj_or_id, (stix2.base._STIXBase, dict)):
        # Mapping-like STIX object: inspect its "type" property directly.
        return obj_or_id["type"] == "marking-definition"

    # Otherwise treat it as a string ID; markings are recognizable by prefix.
    return obj_or_id.startswith("marking-definition--")
|
||||
|
|
Loading…
Reference in New Issue