diff --git a/stix2/datastore/filesystem.py b/stix2/datastore/filesystem.py index e8442e6..d5acc24 100644 --- a/stix2/datastore/filesystem.py +++ b/stix2/datastore/filesystem.py @@ -15,7 +15,7 @@ from stix2.datastore import ( DataSink, DataSource, DataSourceError, DataStoreMixin, ) from stix2.datastore.filters import Filter, FilterSet, apply_common_filters -from stix2.utils import format_datetime, get_type_from_id, is_marking +from stix2.utils import format_datetime, get_type_from_id def _timestamp2filename(timestamp): @@ -329,11 +329,50 @@ def _check_object_from_file(query, filepath, allow_custom, version, encoding): return result +def _is_versioned_type_dir(type_path, type_name): + """ + Try to detect whether the given directory is for a versioned type of STIX + object. This is done by looking for a directory whose name is a STIX ID + of the appropriate type. If found, treat this type as versioned. This + doesn't work when a versioned type directory is empty (it will be + mis-classified as unversioned), but this detection is only necessary when + reading/querying data. If a directory is empty, you'll get no results + either way. + + Args: + type_path: A path to a directory containing one type of STIX object. + type_name: The STIX type name. + + Returns: + True if the directory looks like it contains versioned objects; False + if not. + + Raises: + OSError: If there are errors accessing directory contents or stat()'ing + files + """ + id_regex = re.compile( + r"^" + re.escape(type_name) + + r"--[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}" + r"-[0-9a-f]{12}$", + re.I, + ) + + for entry in os.listdir(type_path): + s = os.stat(os.path.join(type_path, entry)) + if stat.S_ISDIR(s.st_mode) and id_regex.match(entry): + is_versioned = True + break + else: + is_versioned = False + + return is_versioned + + def _search_versioned(query, type_path, auth_ids, allow_custom, version, encoding): """ Searches the given directory, which contains data for STIX objects of a - particular versioned type (i.e. not markings), and return any which match - the query. + particular versioned type, and return any which match the query. Args: query: The query to match against @@ -390,36 +429,24 @@ def _search_versioned(query, type_path, auth_ids, allow_custom, version, encodin # For backward-compatibility, also search for plain files named after # object IDs, in the type directory. - id_files = _get_matching_dir_entries( - type_path, auth_ids, stat.S_ISREG, - ".json", + backcompat_results = _search_unversioned( + query, type_path, auth_ids, allow_custom, version, encoding, ) - for id_file in id_files: - id_path = os.path.join(type_path, id_file) - - try: - stix_obj = _check_object_from_file( - query, id_path, allow_custom, - version, encoding, - ) - if stix_obj: - results.append(stix_obj) - except IOError as e: - if e.errno != errno.ENOENT: - raise - # else, file-not-found is ok, just skip + results.extend(backcompat_results) return results -def _search_markings(query, markings_path, auth_ids, allow_custom, version, encoding): +def _search_unversioned( + query, type_path, auth_ids, allow_custom, version, encoding, +): """ - Searches the given directory, which contains markings data, and return any - which match the query. + Searches the given directory, which contains unversioned data, and return + any objects which match the query. Args: query: The query to match against - markings_path: The directory with STIX markings files + type_path: The directory with STIX files of unversioned type auth_ids: Search optimization based on object ID allow_custom (bool): Whether to allow custom properties as well unknown custom objects. @@ -441,11 +468,11 @@ def _search_markings(query, markings_path, auth_ids, allow_custom, version, enco """ results = [] id_files = _get_matching_dir_entries( - markings_path, auth_ids, stat.S_ISREG, + type_path, auth_ids, stat.S_ISREG, ".json", ) for id_file in id_files: - id_path = os.path.join(markings_path, id_file) + id_path = os.path.join(type_path, id_file) try: stix_obj = _check_object_from_file( @@ -530,12 +557,14 @@ class FileSystemSink(DataSink): """Write the given STIX object to a file in the STIX file directory. """ type_dir = os.path.join(self._stix_dir, stix_obj["type"]) - if is_marking(stix_obj): - filename = stix_obj["id"] - obj_dir = type_dir - else: + + # All versioned objects should have a "modified" property. + if "modified" in stix_obj: filename = _timestamp2filename(stix_obj["modified"]) obj_dir = os.path.join(type_dir, stix_obj["id"]) + else: + filename = stix_obj["id"] + obj_dir = type_dir file_path = os.path.join(obj_dir, filename + ".json") @@ -649,12 +678,14 @@ class FileSystemSource(DataSource): all_data = self.all_versions(stix_id, version=version, _composite_filters=_composite_filters) if all_data: - if is_marking(stix_id): - # Markings are unversioned; there shouldn't be more than one - # result. - stix_obj = all_data[0] - else: + # Simple check for a versioned STIX type: see if the objects have a + # "modified" property. (Need only check one, since they are all of + # the same type.) + is_versioned = "modified" in all_data[0] + if is_versioned: stix_obj = sorted(all_data, key=lambda k: k['modified'])[-1] + else: + stix_obj = all_data[0] else: stix_obj = None @@ -720,14 +751,15 @@ class FileSystemSource(DataSource): ) for type_dir in type_dirs: type_path = os.path.join(self._stix_dir, type_dir) - if type_dir == "marking-definition": - type_results = _search_markings( + type_is_versioned = _is_versioned_type_dir(type_path, type_dir) + if type_is_versioned: + type_results = _search_versioned( query, type_path, auth_ids, self.allow_custom, version, self.encoding, ) else: - type_results = _search_versioned( + type_results = _search_unversioned( query, type_path, auth_ids, self.allow_custom, version, self.encoding, diff --git a/stix2/test/v21/stix2_data/directory/directory--572827aa-e0cd-44fd-afd5-a717a7585f39.json b/stix2/test/v21/stix2_data/directory/directory--572827aa-e0cd-44fd-afd5-a717a7585f39.json new file mode 100644 index 0000000..3812ed4 --- /dev/null +++ b/stix2/test/v21/stix2_data/directory/directory--572827aa-e0cd-44fd-afd5-a717a7585f39.json @@ -0,0 +1,11 @@ +{ + "ctime": "2020-10-06T01:54:32.000Z", + "contains_refs": [ + "directory--80539e31-85f3-4304-bd14-e2e8c10859a5", + "file--e9e03175-0357-41b5-a2aa-eb99b455cd0c", + "directory--f6c54233-027b-4464-8126-da1324d8f66c" + ], + "path": "/performance/Democrat.gif", + "type": "directory", + "id": "directory--572827aa-e0cd-44fd-afd5-a717a7585f39" +} diff --git a/stix2/test/v21/test_datastore_filesystem.py b/stix2/test/v21/test_datastore_filesystem.py index 9917ccd..3eb8aaa 100644 --- a/stix2/test/v21/test_datastore_filesystem.py +++ b/stix2/test/v21/test_datastore_filesystem.py @@ -221,6 +221,16 @@ def test_filesystem_source_backward_compatible(fs_source): assert result.malware_types == ["version four"] +def test_filesystem_source_sco(fs_source): + results = fs_source.query([stix2.Filter("type", "=", "directory")]) + + assert len(results) == 1 + result = results[0] + assert result["type"] == "directory" + assert result["id"] == "directory--572827aa-e0cd-44fd-afd5-a717a7585f39" + assert result["path"] == "/performance/Democrat.gif" + + def test_filesystem_sink_add_python_stix_object(fs_sink, fs_source): # add python stix object camp1 = stix2.v21.Campaign( @@ -435,6 +445,24 @@ def test_filesystem_sink_marking(fs_sink): os.remove(marking_filepath) +def test_filesystem_sink_sco(fs_sink): + file_sco = { + "type": "file", + "id": "file--decfcc48-31b3-45f5-87c8-1b3a5d71a307", + "name": "cats.png", + } + + fs_sink.add(file_sco) + sco_filepath = os.path.join( + FS_PATH, "file", file_sco["id"] + ".json", + ) + + assert os.path.exists(sco_filepath) + + os.remove(sco_filepath) + os.rmdir(os.path.dirname(sco_filepath)) + + def test_filesystem_store_get_stored_as_bundle(fs_store): coa = fs_store.get("course-of-action--95ddb356-7ba0-4bd9-a889-247262b8946f") assert coa.id == "course-of-action--95ddb356-7ba0-4bd9-a889-247262b8946f" @@ -473,9 +501,10 @@ def test_filesystem_store_query_single_filter(fs_store): def test_filesystem_store_empty_query(fs_store): results = fs_store.query() # returns all - assert len(results) == 30 + assert len(results) == 31 assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [obj.id for obj in results] assert "marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168" in [obj.id for obj in results] + assert "directory--572827aa-e0cd-44fd-afd5-a717a7585f39" in [obj.id for obj in results] def test_filesystem_store_query_multiple_filters(fs_store): @@ -487,7 +516,7 @@ def test_filesystem_store_query_multiple_filters(fs_store): def test_filesystem_store_query_dont_include_type_folder(fs_store): results = fs_store.query(stix2.Filter("type", "!=", "tool")) - assert len(results) == 28 + assert len(results) == 29 def test_filesystem_store_add(fs_store): @@ -574,6 +603,26 @@ def test_filesystem_store_add_marking(fs_store): os.remove(marking_filepath) +def test_filesystem_store_add_sco(fs_store): + sco = stix2.v21.EmailAddress( + value="jdoe@example.com", + ) + + fs_store.add(sco) + sco_filepath = os.path.join( + FS_PATH, "email-addr", sco["id"] + ".json", + ) + + assert os.path.exists(sco_filepath) + + sco_r = fs_store.get(sco["id"]) + assert sco_r["id"] == sco["id"] + assert sco_r["value"] == sco["value"] + + os.remove(sco_filepath) + os.rmdir(os.path.dirname(sco_filepath)) + + def test_filesystem_object_with_custom_property(fs_store): camp = stix2.v21.Campaign( name="Scipio Africanus", @@ -1024,6 +1073,7 @@ def test_search_auth_set_black_empty(rel_fs_store): "attack-pattern", "campaign", "course-of-action", + "directory", "identity", "indicator", "intrusion-set",