commit
						3c2d877bc3
					
				|  | @ -301,11 +301,27 @@ class FileSystemSource(DataSource): | |||
|         for path in include_paths: | ||||
|             for root, dirs, files in os.walk(path): | ||||
|                 for file_ in files: | ||||
|                     if not file_.endswith(".json"): | ||||
|                         # skip non '.json' files as more likely to be random non-STIX files | ||||
|                         continue | ||||
| 
 | ||||
|                     if not id_ or id_ == file_.split(".")[0]: | ||||
|                         # have to load into memory regardless to evaluate other filters | ||||
|                         stix_obj = json.load(open(os.path.join(root, file_))) | ||||
|                         if stix_obj.get('type', '') == 'bundle': | ||||
|                             stix_obj = stix_obj['objects'][0] | ||||
|                         try: | ||||
|                             stix_obj = json.load(open(os.path.join(root, file_))) | ||||
| 
 | ||||
|                             if stix_obj["type"] == "bundle": | ||||
|                                 stix_obj = stix_obj["objects"][0] | ||||
| 
 | ||||
|                             # naive STIX type checking | ||||
|                             stix_obj["type"] | ||||
|                             stix_obj["id"] | ||||
| 
 | ||||
|                         except (ValueError, KeyError):  # likely not a JSON file | ||||
|                             print("filesytem TypeError raised") | ||||
|                             raise TypeError("STIX JSON object at '{0}' could either not be parsed to " | ||||
|                                             "JSON or was not valid STIX JSON".format(os.path.join(root, file_))) | ||||
| 
 | ||||
|                         # check against other filters, add if match | ||||
|                         all_data.extend(apply_common_filters([stix_obj], query)) | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,3 +1,4 @@ | |||
| import json | ||||
| import os | ||||
| import shutil | ||||
| 
 | ||||
|  | @ -45,6 +46,41 @@ def fs_sink(): | |||
|     shutil.rmtree(os.path.join(FS_PATH, "campaign"), True) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture | ||||
| def bad_json_files(): | ||||
|     # create erroneous JSON files for tests to make sure handled gracefully | ||||
| 
 | ||||
|     with open(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-non-json.txt"), "w+") as f: | ||||
|         f.write("Im not a JSON file") | ||||
| 
 | ||||
|     with open(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-bad-json.json"), "w+") as f: | ||||
|         f.write("Im not a JSON formatted file") | ||||
| 
 | ||||
|     yield True  # dummy yield so can have teardown | ||||
| 
 | ||||
|     os.remove(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-non-json.txt")) | ||||
|     os.remove(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-bad-json.json")) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture | ||||
| def bad_stix_files(): | ||||
|     # create erroneous STIX JSON files for tests to make sure handled correctly | ||||
| 
 | ||||
|     # bad STIX object | ||||
|     stix_obj = { | ||||
|         "id": "intrusion-set--test-bad-stix", | ||||
|         "spec_version": "2.0" | ||||
|         # no "type" field | ||||
|     } | ||||
| 
 | ||||
|     with open(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-non-stix.json"), "w+") as f: | ||||
|         f.write(json.dumps(stix_obj)) | ||||
| 
 | ||||
|     yield True  # dummy yield so can have teardown | ||||
| 
 | ||||
|     os.remove(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-non-stix.json")) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture(scope='module') | ||||
| def rel_fs_store(): | ||||
|     cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS) | ||||
|  | @ -76,6 +112,26 @@ def test_filesystem_sink_nonexistent_folder(): | |||
|     assert "for STIX data does not exist" in str(excinfo) | ||||
| 
 | ||||
| 
 | ||||
| def test_filesystem_source_bad_json_file(fs_source, bad_json_files): | ||||
|     # this tests the handling of two bad json files | ||||
|     #  - one file should just be skipped (silently) as its a ".txt" extension | ||||
|     #  - one file should be parsed and raise Exception bc its not JSON | ||||
|     try: | ||||
|         fs_source.get("intrusion-set--test-bad-json") | ||||
|     except TypeError as e: | ||||
|         assert "intrusion-set--test-bad-json" in str(e) | ||||
|         assert "could either not be parsed to JSON or was not valid STIX JSON" in str(e) | ||||
| 
 | ||||
| 
 | ||||
| def test_filesystem_source_bad_stix_file(fs_source, bad_stix_files): | ||||
|     # this tests handling of bad STIX json object | ||||
|     try: | ||||
|         fs_source.get("intrusion-set--test-non-stix") | ||||
|     except TypeError as e: | ||||
|         assert "intrusion-set--test-non-stix" in str(e) | ||||
|         assert "could either not be parsed to JSON or was not valid STIX JSON" in str(e) | ||||
| 
 | ||||
| 
 | ||||
| def test_filesytem_source_get_object(fs_source): | ||||
|     # get object | ||||
|     mal = fs_source.get("malware--6b616fc1-1505-48e3-8b2c-0d19337bff38") | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Greg Back
						Greg Back