pre-commit errors
parent
6f762e7ea0
commit
f4558c0958
|
@ -313,13 +313,14 @@ class FileSystemSource(DataSource):
|
||||||
if stix_obj["type"] == "bundle":
|
if stix_obj["type"] == "bundle":
|
||||||
stix_obj = stix_obj["objects"][0]
|
stix_obj = stix_obj["objects"][0]
|
||||||
|
|
||||||
# naive STIX check
|
# naive STIX type checking
|
||||||
stix_obj["type"]
|
stix_obj["type"]
|
||||||
stix_obj["id"]
|
stix_obj["id"]
|
||||||
|
|
||||||
except (ValueError, KeyError) as e: # likely not a JSON file
|
except (ValueError, KeyError): # likely not a JSON file
|
||||||
print("filesytem TypeError raised")
|
print("filesytem TypeError raised")
|
||||||
raise TypeError("STIX JSON object at '{0}' could either not be parsed to JSON or was not valid STIX JSON".format(os.path.join(root, file_)))
|
raise TypeError("STIX JSON object at '{0}' could either not be parsed to "
|
||||||
|
"JSON or was not valid STIX JSON".format(os.path.join(root, file_)))
|
||||||
|
|
||||||
# check against other filters, add if match
|
# check against other filters, add if match
|
||||||
all_data.extend(apply_common_filters([stix_obj], query))
|
all_data.extend(apply_common_filters([stix_obj], query))
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import os
|
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
@ -45,21 +45,23 @@ def fs_sink():
|
||||||
# remove campaign dir
|
# remove campaign dir
|
||||||
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
|
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def bad_json_files():
|
def bad_json_files():
|
||||||
# create erroneous JSON files for tests to make sure handled gracefully
|
# create erroneous JSON files for tests to make sure handled gracefully
|
||||||
|
|
||||||
with open(os.path.join(FS_PATH, "indicator", "indicator--test-non-json.txt"), "w") as f:
|
with open(os.path.join(FS_PATH, "indicator", "indicator--test-non-json.txt"), "w+") as f:
|
||||||
f.write("Im not a JSON file")
|
f.write("Im not a JSON file")
|
||||||
|
|
||||||
with open(os.path.join(FS_PATH, "indicator", "indicator--test-bad-json.json"), "w") as f:
|
with open(os.path.join(FS_PATH, "indicator", "indicator--test-bad-json.json"), "w+") as f:
|
||||||
f.write("Im not a JSON formatted file")
|
f.write("Im not a JSON formatted file")
|
||||||
|
|
||||||
yield True # dummy yield so can have teardown
|
yield True # dummy yield so can have teardown
|
||||||
|
|
||||||
os.remove(os.path.join(FS_PATH, "indicator", "indicator--test-non-json.txt"))
|
os.remove(os.path.join(FS_PATH, "indicator", "indicator--test-non-json.txt"))
|
||||||
os.remove(os.path.join(FS_PATH, "indicator", "indicator--test-bad-json.json"))
|
os.remove(os.path.join(FS_PATH, "indicator", "indicator--test-bad-json.json"))
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def bad_stix_files():
|
def bad_stix_files():
|
||||||
# create erroneous STIX JSON files for tests to make sure handled correctly
|
# create erroneous STIX JSON files for tests to make sure handled correctly
|
||||||
|
@ -71,10 +73,10 @@ def bad_stix_files():
|
||||||
# no "type" field
|
# no "type" field
|
||||||
}
|
}
|
||||||
|
|
||||||
with open(os.path.join(FS_PATH, "indicator", "indicator--test-non-stix.json"), "w") as f:
|
with open(os.path.join(FS_PATH, "indicator", "indicator--test-non-stix.json"), "w+") as f:
|
||||||
f.write(json.dumps(stix_obj))
|
f.write(json.dumps(stix_obj))
|
||||||
|
|
||||||
yield True # dummy yield so can have teardown
|
yield True # dummy yield so can have teardown
|
||||||
|
|
||||||
os.remove(os.path.join(FS_PATH, "indicator", "indicator--test-non-stix.json"))
|
os.remove(os.path.join(FS_PATH, "indicator", "indicator--test-non-stix.json"))
|
||||||
|
|
||||||
|
@ -115,7 +117,7 @@ def test_filesystem_source_bad_json_file(fs_source, bad_json_files):
|
||||||
# - one file should just be skipped (silently) as its a ".txt" extension
|
# - one file should just be skipped (silently) as its a ".txt" extension
|
||||||
# - one file should be parsed and raise Exception bc its not JSON
|
# - one file should be parsed and raise Exception bc its not JSON
|
||||||
try:
|
try:
|
||||||
bad_json_indicator = fs_source.get("indicator--test-bad-json")
|
fs_source.get("indicator--test-bad-json")
|
||||||
except TypeError as e:
|
except TypeError as e:
|
||||||
assert "indicator--test-bad-json" in str(e)
|
assert "indicator--test-bad-json" in str(e)
|
||||||
assert "could either not be parsed to JSON or was not valid STIX JSON" in str(e)
|
assert "could either not be parsed to JSON or was not valid STIX JSON" in str(e)
|
||||||
|
@ -124,7 +126,7 @@ def test_filesystem_source_bad_json_file(fs_source, bad_json_files):
|
||||||
def test_filesystem_source_bad_stix_file(fs_source, bad_stix_files):
|
def test_filesystem_source_bad_stix_file(fs_source, bad_stix_files):
|
||||||
# this tests handling of bad STIX json object
|
# this tests handling of bad STIX json object
|
||||||
try:
|
try:
|
||||||
bad_stix_indicator = fs_source.get("indicator--test-non-stix")
|
fs_source.get("indicator--test-non-stix")
|
||||||
except TypeError as e:
|
except TypeError as e:
|
||||||
assert "indicator--test-non-stix" in str(e)
|
assert "indicator--test-non-stix" in str(e)
|
||||||
assert "could either not be parsed to JSON or was not valid STIX JSON" in str(e)
|
assert "could either not be parsed to JSON or was not valid STIX JSON" in str(e)
|
||||||
|
|
Loading…
Reference in New Issue