Merge pull request #343 from chisholm/sco_tlo_filesystemstore

Fix the filesystem store to support the new top-level 2.1 SCOs.
master
Chris Lenk 2020-03-05 17:08:20 -05:00 committed by GitHub
commit 3803e4bdd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 133 additions and 40 deletions

View File

@ -15,7 +15,7 @@ from stix2.datastore import (
DataSink, DataSource, DataSourceError, DataStoreMixin,
)
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
from stix2.utils import format_datetime, get_type_from_id, is_marking
from stix2.utils import format_datetime, get_type_from_id
def _timestamp2filename(timestamp):
@ -329,11 +329,50 @@ def _check_object_from_file(query, filepath, allow_custom, version, encoding):
return result
def _is_versioned_type_dir(type_path, type_name):
"""
Try to detect whether the given directory is for a versioned type of STIX
object. This is done by looking for a directory whose name is a STIX ID
of the appropriate type. If found, treat this type as versioned. This
doesn't work when a versioned type directory is empty (it will be
mis-classified as unversioned), but this detection is only necessary when
reading/querying data. If a directory is empty, you'll get no results
either way.
Args:
type_path: A path to a directory containing one type of STIX object.
type_name: The STIX type name.
Returns:
True if the directory looks like it contains versioned objects; False
if not.
Raises:
OSError: If there are errors accessing directory contents or stat()'ing
files
"""
id_regex = re.compile(
r"^" + re.escape(type_name) +
r"--[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}"
r"-[0-9a-f]{12}$",
re.I,
)
for entry in os.listdir(type_path):
s = os.stat(os.path.join(type_path, entry))
if stat.S_ISDIR(s.st_mode) and id_regex.match(entry):
is_versioned = True
break
else:
is_versioned = False
return is_versioned
def _search_versioned(query, type_path, auth_ids, allow_custom, version, encoding):
"""
Searches the given directory, which contains data for STIX objects of a
particular versioned type (i.e. not markings), and return any which match
the query.
particular versioned type, and return any which match the query.
Args:
query: The query to match against
@ -390,36 +429,24 @@ def _search_versioned(query, type_path, auth_ids, allow_custom, version, encodin
# For backward-compatibility, also search for plain files named after
# object IDs, in the type directory.
id_files = _get_matching_dir_entries(
type_path, auth_ids, stat.S_ISREG,
".json",
backcompat_results = _search_unversioned(
query, type_path, auth_ids, allow_custom, version, encoding,
)
for id_file in id_files:
id_path = os.path.join(type_path, id_file)
try:
stix_obj = _check_object_from_file(
query, id_path, allow_custom,
version, encoding,
)
if stix_obj:
results.append(stix_obj)
except IOError as e:
if e.errno != errno.ENOENT:
raise
# else, file-not-found is ok, just skip
results.extend(backcompat_results)
return results
def _search_markings(query, markings_path, auth_ids, allow_custom, version, encoding):
def _search_unversioned(
query, type_path, auth_ids, allow_custom, version, encoding,
):
"""
Searches the given directory, which contains markings data, and return any
which match the query.
Searches the given directory, which contains unversioned data, and return
any objects which match the query.
Args:
query: The query to match against
markings_path: The directory with STIX markings files
type_path: The directory with STIX files of unversioned type
auth_ids: Search optimization based on object ID
allow_custom (bool): Whether to allow custom properties as well unknown
custom objects.
@ -441,11 +468,11 @@ def _search_markings(query, markings_path, auth_ids, allow_custom, version, enco
"""
results = []
id_files = _get_matching_dir_entries(
markings_path, auth_ids, stat.S_ISREG,
type_path, auth_ids, stat.S_ISREG,
".json",
)
for id_file in id_files:
id_path = os.path.join(markings_path, id_file)
id_path = os.path.join(type_path, id_file)
try:
stix_obj = _check_object_from_file(
@ -530,12 +557,14 @@ class FileSystemSink(DataSink):
"""Write the given STIX object to a file in the STIX file directory.
"""
type_dir = os.path.join(self._stix_dir, stix_obj["type"])
if is_marking(stix_obj):
filename = stix_obj["id"]
obj_dir = type_dir
else:
# All versioned objects should have a "modified" property.
if "modified" in stix_obj:
filename = _timestamp2filename(stix_obj["modified"])
obj_dir = os.path.join(type_dir, stix_obj["id"])
else:
filename = stix_obj["id"]
obj_dir = type_dir
file_path = os.path.join(obj_dir, filename + ".json")
@ -649,12 +678,14 @@ class FileSystemSource(DataSource):
all_data = self.all_versions(stix_id, version=version, _composite_filters=_composite_filters)
if all_data:
if is_marking(stix_id):
# Markings are unversioned; there shouldn't be more than one
# result.
stix_obj = all_data[0]
else:
# Simple check for a versioned STIX type: see if the objects have a
# "modified" property. (Need only check one, since they are all of
# the same type.)
is_versioned = "modified" in all_data[0]
if is_versioned:
stix_obj = sorted(all_data, key=lambda k: k['modified'])[-1]
else:
stix_obj = all_data[0]
else:
stix_obj = None
@ -720,14 +751,15 @@ class FileSystemSource(DataSource):
)
for type_dir in type_dirs:
type_path = os.path.join(self._stix_dir, type_dir)
if type_dir == "marking-definition":
type_results = _search_markings(
type_is_versioned = _is_versioned_type_dir(type_path, type_dir)
if type_is_versioned:
type_results = _search_versioned(
query, type_path, auth_ids,
self.allow_custom, version,
self.encoding,
)
else:
type_results = _search_versioned(
type_results = _search_unversioned(
query, type_path, auth_ids,
self.allow_custom, version,
self.encoding,

View File

@ -0,0 +1,11 @@
{
"ctime": "2020-10-06T01:54:32.000Z",
"contains_refs": [
"directory--80539e31-85f3-4304-bd14-e2e8c10859a5",
"file--e9e03175-0357-41b5-a2aa-eb99b455cd0c",
"directory--f6c54233-027b-4464-8126-da1324d8f66c"
],
"path": "/performance/Democrat.gif",
"type": "directory",
"id": "directory--572827aa-e0cd-44fd-afd5-a717a7585f39"
}

View File

@ -221,6 +221,16 @@ def test_filesystem_source_backward_compatible(fs_source):
assert result.malware_types == ["version four"]
def test_filesystem_source_sco(fs_source):
results = fs_source.query([stix2.Filter("type", "=", "directory")])
assert len(results) == 1
result = results[0]
assert result["type"] == "directory"
assert result["id"] == "directory--572827aa-e0cd-44fd-afd5-a717a7585f39"
assert result["path"] == "/performance/Democrat.gif"
def test_filesystem_sink_add_python_stix_object(fs_sink, fs_source):
# add python stix object
camp1 = stix2.v21.Campaign(
@ -435,6 +445,24 @@ def test_filesystem_sink_marking(fs_sink):
os.remove(marking_filepath)
def test_filesystem_sink_sco(fs_sink):
file_sco = {
"type": "file",
"id": "file--decfcc48-31b3-45f5-87c8-1b3a5d71a307",
"name": "cats.png",
}
fs_sink.add(file_sco)
sco_filepath = os.path.join(
FS_PATH, "file", file_sco["id"] + ".json",
)
assert os.path.exists(sco_filepath)
os.remove(sco_filepath)
os.rmdir(os.path.dirname(sco_filepath))
def test_filesystem_store_get_stored_as_bundle(fs_store):
coa = fs_store.get("course-of-action--95ddb356-7ba0-4bd9-a889-247262b8946f")
assert coa.id == "course-of-action--95ddb356-7ba0-4bd9-a889-247262b8946f"
@ -473,9 +501,10 @@ def test_filesystem_store_query_single_filter(fs_store):
def test_filesystem_store_empty_query(fs_store):
results = fs_store.query() # returns all
assert len(results) == 30
assert len(results) == 31
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [obj.id for obj in results]
assert "marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168" in [obj.id for obj in results]
assert "directory--572827aa-e0cd-44fd-afd5-a717a7585f39" in [obj.id for obj in results]
def test_filesystem_store_query_multiple_filters(fs_store):
@ -487,7 +516,7 @@ def test_filesystem_store_query_multiple_filters(fs_store):
def test_filesystem_store_query_dont_include_type_folder(fs_store):
results = fs_store.query(stix2.Filter("type", "!=", "tool"))
assert len(results) == 28
assert len(results) == 29
def test_filesystem_store_add(fs_store):
@ -574,6 +603,26 @@ def test_filesystem_store_add_marking(fs_store):
os.remove(marking_filepath)
def test_filesystem_store_add_sco(fs_store):
sco = stix2.v21.EmailAddress(
value="jdoe@example.com",
)
fs_store.add(sco)
sco_filepath = os.path.join(
FS_PATH, "email-addr", sco["id"] + ".json",
)
assert os.path.exists(sco_filepath)
sco_r = fs_store.get(sco["id"])
assert sco_r["id"] == sco["id"]
assert sco_r["value"] == sco["value"]
os.remove(sco_filepath)
os.rmdir(os.path.dirname(sco_filepath))
def test_filesystem_object_with_custom_property(fs_store):
camp = stix2.v21.Campaign(
name="Scipio Africanus",
@ -1024,6 +1073,7 @@ def test_search_auth_set_black_empty(rel_fs_store):
"attack-pattern",
"campaign",
"course-of-action",
"directory",
"identity",
"indicator",
"intrusion-set",