diff --git a/stix2/datastore/filesystem.py b/stix2/datastore/filesystem.py index 26d0c58..b525932 100644 --- a/stix2/datastore/filesystem.py +++ b/stix2/datastore/filesystem.py @@ -301,11 +301,27 @@ class FileSystemSource(DataSource): for path in include_paths: for root, dirs, files in os.walk(path): for file_ in files: + if not file_.endswith(".json"): + # skip non '.json' files as more likely to be random non-STIX files + continue + if not id_ or id_ == file_.split(".")[0]: # have to load into memory regardless to evaluate other filters - stix_obj = json.load(open(os.path.join(root, file_))) - if stix_obj.get('type', '') == 'bundle': - stix_obj = stix_obj['objects'][0] + try: + stix_obj = json.load(open(os.path.join(root, file_))) + + if stix_obj["type"] == "bundle": + stix_obj = stix_obj["objects"][0] + + # naive STIX type checking + stix_obj["type"] + stix_obj["id"] + + except (ValueError, KeyError): # likely not a JSON file + print("filesytem TypeError raised") + raise TypeError("STIX JSON object at '{0}' could either not be parsed to " + "JSON or was not valid STIX JSON".format(os.path.join(root, file_))) + # check against other filters, add if match all_data.extend(apply_common_filters([stix_obj], query)) diff --git a/stix2/test/test_filesystem.py b/stix2/test/test_filesystem.py index 020fee5..f59136e 100644 --- a/stix2/test/test_filesystem.py +++ b/stix2/test/test_filesystem.py @@ -1,3 +1,4 @@ +import json import os import shutil @@ -45,6 +46,41 @@ def fs_sink(): shutil.rmtree(os.path.join(FS_PATH, "campaign"), True) +@pytest.fixture +def bad_json_files(): + # create erroneous JSON files for tests to make sure handled gracefully + + with open(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-non-json.txt"), "w+") as f: + f.write("Im not a JSON file") + + with open(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-bad-json.json"), "w+") as f: + f.write("Im not a JSON formatted file") + + yield True # dummy yield so can have teardown + + os.remove(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-non-json.txt")) + os.remove(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-bad-json.json")) + + +@pytest.fixture +def bad_stix_files(): + # create erroneous STIX JSON files for tests to make sure handled correctly + + # bad STIX object + stix_obj = { + "id": "intrusion-set--test-bad-stix", + "spec_version": "2.0" + # no "type" field + } + + with open(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-non-stix.json"), "w+") as f: + f.write(json.dumps(stix_obj)) + + yield True # dummy yield so can have teardown + + os.remove(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-non-stix.json")) + + @pytest.fixture(scope='module') def rel_fs_store(): cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS) @@ -76,6 +112,26 @@ def test_filesystem_sink_nonexistent_folder(): assert "for STIX data does not exist" in str(excinfo) +def test_filesystem_source_bad_json_file(fs_source, bad_json_files): + # this tests the handling of two bad json files + # - one file should just be skipped (silently) as its a ".txt" extension + # - one file should be parsed and raise Exception bc its not JSON + try: + fs_source.get("intrusion-set--test-bad-json") + except TypeError as e: + assert "intrusion-set--test-bad-json" in str(e) + assert "could either not be parsed to JSON or was not valid STIX JSON" in str(e) + + +def test_filesystem_source_bad_stix_file(fs_source, bad_stix_files): + # this tests handling of bad STIX json object + try: + fs_source.get("intrusion-set--test-non-stix") + except TypeError as e: + assert "intrusion-set--test-non-stix" in str(e) + assert "could either not be parsed to JSON or was not valid STIX JSON" in str(e) + + def test_filesytem_source_get_object(fs_source): # get object mal = fs_source.get("malware--6b616fc1-1505-48e3-8b2c-0d19337bff38")