pre-commit formatting changes

master
Emmanuelle Vargas-Gonzalez 2018-12-06 15:19:50 -05:00
parent 01df0ccc57
commit 96b81fc489
4 changed files with 274 additions and 188 deletions

View File

@ -91,7 +91,7 @@ class AuthSet(object):
def __repr__(self):
return "{}list: {}".format(
"white" if self.auth_type == AuthSet.WHITE else "black",
self.values
self.values,
)
@ -167,15 +167,19 @@ def _find_search_optimizations(filters):
# An "allow" ID filter implies a type filter too, since IDs
# contain types within them.
allowed_ids = _update_allow(allowed_ids, filter_.value)
allowed_types = _update_allow(allowed_types,
get_type_from_id(filter_.value))
allowed_types = _update_allow(
allowed_types,
get_type_from_id(filter_.value),
)
elif filter_.op == "!=":
prohibited_ids.add(filter_.value)
elif filter_.op == "in":
allowed_ids = _update_allow(allowed_ids, filter_.value)
allowed_types = _update_allow(allowed_types, (
get_type_from_id(id_) for id_ in filter_.value
))
allowed_types = _update_allow(
allowed_types, (
get_type_from_id(id_) for id_ in filter_.value
),
)
opt_types = AuthSet(allowed_types, prohibited_types)
opt_ids = AuthSet(allowed_ids, prohibited_ids)
@ -311,7 +315,7 @@ def _check_object_from_file(query, filepath, allow_custom, version):
except ValueError: # not a JSON file
raise TypeError(
"STIX JSON object at '{0}' could either not be parsed "
"to JSON or was not valid STIX JSON".format(filepath)
"to JSON or was not valid STIX JSON".format(filepath),
)
stix_obj = parse(stix_json, allow_custom, version)
@ -352,22 +356,28 @@ def _search_versioned(query, type_path, auth_ids, allow_custom, version):
"""
results = []
id_dirs = _get_matching_dir_entries(type_path, auth_ids,
stat.S_ISDIR)
id_dirs = _get_matching_dir_entries(
type_path, auth_ids,
stat.S_ISDIR,
)
for id_dir in id_dirs:
id_path = os.path.join(type_path, id_dir)
# This leverages a more sophisticated function to do a simple thing:
# get all the JSON files from a directory. I guess it does give us
# file type checking, ensuring we only get regular files.
version_files = _get_matching_dir_entries(id_path, _AUTHSET_ANY,
stat.S_ISREG, ".json")
version_files = _get_matching_dir_entries(
id_path, _AUTHSET_ANY,
stat.S_ISREG, ".json",
)
for version_file in version_files:
version_path = os.path.join(id_path, version_file)
try:
stix_obj = _check_object_from_file(query, version_path,
allow_custom, version)
stix_obj = _check_object_from_file(
query, version_path,
allow_custom, version,
)
if stix_obj:
results.append(stix_obj)
except IOError as e:
@ -377,14 +387,18 @@ def _search_versioned(query, type_path, auth_ids, allow_custom, version):
# For backward-compatibility, also search for plain files named after
# object IDs, in the type directory.
id_files = _get_matching_dir_entries(type_path, auth_ids, stat.S_ISREG,
".json")
id_files = _get_matching_dir_entries(
type_path, auth_ids, stat.S_ISREG,
".json",
)
for id_file in id_files:
id_path = os.path.join(type_path, id_file)
try:
stix_obj = _check_object_from_file(query, id_path, allow_custom,
version)
stix_obj = _check_object_from_file(
query, id_path, allow_custom,
version,
)
if stix_obj:
results.append(stix_obj)
except IOError as e:
@ -421,14 +435,18 @@ def _search_markings(query, markings_path, auth_ids, allow_custom, version):
"""
results = []
id_files = _get_matching_dir_entries(markings_path, auth_ids, stat.S_ISREG,
".json")
id_files = _get_matching_dir_entries(
markings_path, auth_ids, stat.S_ISREG,
".json",
)
for id_file in id_files:
id_path = os.path.join(markings_path, id_file)
try:
stix_obj = _check_object_from_file(query, id_path, allow_custom,
version)
stix_obj = _check_object_from_file(
query, id_path, allow_custom,
version,
)
if stix_obj:
results.append(stix_obj)
except IOError as e:
@ -569,7 +587,7 @@ class FileSystemSink(DataSink):
raise TypeError(
"stix_data must be a STIX object (or list of), "
"JSON formatted STIX (or list of), "
"or a JSON formatted STIX bundle"
"or a JSON formatted STIX bundle",
)
@ -683,16 +701,22 @@ class FileSystemSource(DataSource):
query.add(_composite_filters)
auth_types, auth_ids = _find_search_optimizations(query)
type_dirs = _get_matching_dir_entries(self._stix_dir, auth_types,
stat.S_ISDIR)
type_dirs = _get_matching_dir_entries(
self._stix_dir, auth_types,
stat.S_ISDIR,
)
for type_dir in type_dirs:
type_path = os.path.join(self._stix_dir, type_dir)
if type_dir == "marking-definition":
type_results = _search_markings(query, type_path, auth_ids,
self.allow_custom, version)
type_results = _search_markings(
query, type_path, auth_ids,
self.allow_custom, version,
)
else:
type_results = _search_versioned(query, type_path, auth_ids,
self.allow_custom, version)
type_results = _search_versioned(
query, type_path, auth_ids,
self.allow_custom, version,
)
all_data.extend(type_results)
return all_data

View File

@ -11,8 +11,10 @@ import stix2.utils
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=', 'contains']
"""Supported filter value types"""
FILTER_VALUE_TYPES = (bool, dict, float, int, list, tuple, six.string_types,
datetime)
FILTER_VALUE_TYPES = (
bool, dict, float, int, list, tuple, six.string_types,
datetime,
)
def _check_filter_components(prop, op, value):

View File

@ -9,9 +9,10 @@ import pytest
import pytz
import stix2
from stix2.datastore.filesystem import (AuthSet, _find_search_optimizations,
_get_matching_dir_entries,
_timestamp2filename)
from stix2.datastore.filesystem import (
AuthSet, _find_search_optimizations, _get_matching_dir_entries,
_timestamp2filename,
)
from stix2.exceptions import STIXError
from .constants import (
@ -104,8 +105,10 @@ def rel_fs_store():
yield fs
for o in stix_objs:
filepath = os.path.join(FS_PATH, o.type, o.id,
_timestamp2filename(o.modified) + '.json')
filepath = os.path.join(
FS_PATH, o.type, o.id,
_timestamp2filename(o.modified) + '.json',
)
# Some test-scoped fixtures (e.g. fs_store) delete all campaigns, so by
# the time this module-scoped fixture tears itself down, it may find
@ -156,8 +159,10 @@ def test_filesystem_source_get_object(fs_source):
mal = fs_source.get("malware--6b616fc1-1505-48e3-8b2c-0d19337bff38")
assert mal.id == "malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"
assert mal.name == "Rover"
assert mal.modified == datetime.datetime(2018, 11, 16, 22, 54, 20, 390000,
pytz.utc)
assert mal.modified == datetime.datetime(
2018, 11, 16, 22, 54, 20, 390000,
pytz.utc,
)
def test_filesystem_source_get_nonexistent_object(fs_source):
@ -167,11 +172,13 @@ def test_filesystem_source_get_nonexistent_object(fs_source):
def test_filesystem_source_all_versions(fs_source):
ids = fs_source.all_versions(
"identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5"
"identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5",
)
assert len(ids) == 2
assert all(id_.id == "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5"
for id_ in ids)
assert all(
id_.id == "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5"
for id_ in ids
)
assert all(id_.name == "The MITRE Corporation" for id_ in ids)
assert all(id_.type == "identity" for id_ in ids)
@ -205,7 +212,7 @@ def test_filesystem_source_backward_compatible(fs_source):
results = fs_source.query([
stix2.Filter("type", "=", "malware"),
stix2.Filter("id", "=", "malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"),
stix2.Filter("modified", "=", modified)
stix2.Filter("modified", "=", modified),
])
assert len(results) == 1
@ -226,8 +233,10 @@ def test_filesystem_sink_add_python_stix_object(fs_sink, fs_source):
fs_sink.add(camp1)
filepath = os.path.join(FS_PATH, "campaign", camp1.id,
_timestamp2filename(camp1.modified) + ".json")
filepath = os.path.join(
FS_PATH, "campaign", camp1.id,
_timestamp2filename(camp1.modified) + ".json",
)
assert os.path.exists(filepath)
camp1_r = fs_source.get(camp1.id)
@ -247,7 +256,7 @@ def test_filesystem_sink_add_stix_object_dict(fs_sink, fs_source):
"aliases": ["Purple Robes"],
"id": "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"created": "2017-05-31T21:31:53.197755Z",
"modified": "2017-05-31T21:31:53.197755Z"
"modified": "2017-05-31T21:31:53.197755Z",
}
fs_sink.add(camp2)
@ -259,8 +268,10 @@ def test_filesystem_sink_add_stix_object_dict(fs_sink, fs_source):
# constraint (e.g. truncate to milliseconds), which results in a slightly
# different name.
camp2obj = stix2.parse(camp2)
filepath = os.path.join(FS_PATH, "campaign", camp2obj["id"],
_timestamp2filename(camp2obj["modified"]) + ".json")
filepath = os.path.join(
FS_PATH, "campaign", camp2obj["id"],
_timestamp2filename(camp2obj["modified"]) + ".json",
)
assert os.path.exists(filepath)
@ -286,16 +297,18 @@ def test_filesystem_sink_add_stix_bundle_dict(fs_sink, fs_source):
"aliases": ["Huns"],
"id": "campaign--b8f86161-ccae-49de-973a-4ca320c62478",
"created": "2017-05-31T21:31:53.197755Z",
"modified": "2017-05-31T21:31:53.197755Z"
}
]
"modified": "2017-05-31T21:31:53.197755Z",
},
],
}
fs_sink.add(bund)
camp_obj = stix2.parse(bund["objects"][0])
filepath = os.path.join(FS_PATH, "campaign", camp_obj["id"],
_timestamp2filename(camp_obj["modified"]) + ".json")
filepath = os.path.join(
FS_PATH, "campaign", camp_obj["id"],
_timestamp2filename(camp_obj["modified"]) + ".json",
)
assert os.path.exists(filepath)
@ -317,9 +330,11 @@ def test_filesystem_sink_add_json_stix_object(fs_sink, fs_source):
fs_sink.add(camp4)
camp4obj = stix2.parse(camp4)
filepath = os.path.join(FS_PATH, "campaign",
"campaign--6a6ca372-ba07-42cc-81ef-9840fc1f963d",
_timestamp2filename(camp4obj["modified"]) + ".json")
filepath = os.path.join(
FS_PATH, "campaign",
"campaign--6a6ca372-ba07-42cc-81ef-9840fc1f963d",
_timestamp2filename(camp4obj["modified"]) + ".json",
)
assert os.path.exists(filepath)
@ -342,9 +357,11 @@ def test_filesystem_sink_json_stix_bundle(fs_sink, fs_source):
bund2obj = stix2.parse(bund2)
camp_obj = bund2obj["objects"][0]
filepath = os.path.join(FS_PATH, "campaign",
"campaign--2c03b8bf-82ee-433e-9918-ca2cb6e9534b",
_timestamp2filename(camp_obj["modified"]) + ".json")
filepath = os.path.join(
FS_PATH, "campaign",
"campaign--2c03b8bf-82ee-433e-9918-ca2cb6e9534b",
_timestamp2filename(camp_obj["modified"]) + ".json",
)
assert os.path.exists(filepath)
@ -370,19 +387,22 @@ def test_filesystem_sink_add_objects_list(fs_sink, fs_source):
"aliases": ["The Frenchmen"],
"id": "campaign--122818b6-1112-4fb0-b11b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z",
"modified": "2017-05-31T21:31:53.197755Z"
"modified": "2017-05-31T21:31:53.197755Z",
}
fs_sink.add([camp6, camp7])
camp7obj = stix2.parse(camp7)
camp6filepath = os.path.join(FS_PATH, "campaign", camp6.id,
_timestamp2filename(camp6["modified"]) +
".json")
camp6filepath = os.path.join(
FS_PATH, "campaign", camp6.id,
_timestamp2filename(camp6["modified"]) +
".json",
)
camp7filepath = os.path.join(
FS_PATH, "campaign", "campaign--122818b6-1112-4fb0-b11b-b111107ca70a",
_timestamp2filename(camp7obj["modified"]) + ".json")
_timestamp2filename(camp7obj["modified"]) + ".json",
)
assert os.path.exists(camp6filepath)
assert os.path.exists(camp7filepath)
@ -403,12 +423,12 @@ def test_filesystem_sink_add_objects_list(fs_sink, fs_source):
def test_filesystem_sink_marking(fs_sink):
marking = stix2.v20.MarkingDefinition(
definition_type="tlp",
definition=stix2.v20.TLPMarking(tlp="green")
definition=stix2.v20.TLPMarking(tlp="green"),
)
fs_sink.add(marking)
marking_filepath = os.path.join(
FS_PATH, "marking-definition", marking["id"] + ".json"
FS_PATH, "marking-definition", marking["id"] + ".json",
)
assert os.path.exists(marking_filepath)
@ -484,8 +504,10 @@ def test_filesystem_store_add(fs_store):
assert camp1_r.id == camp1.id
assert camp1_r.name == camp1.name
filepath = os.path.join(FS_PATH, "campaign", camp1_r.id,
_timestamp2filename(camp1_r.modified) + ".json")
filepath = os.path.join(
FS_PATH, "campaign", camp1_r.id,
_timestamp2filename(camp1_r.modified) + ".json",
)
# remove
os.remove(filepath)
@ -501,8 +523,10 @@ def test_filesystem_store_add_as_bundle():
)
fs_store.add(camp1)
filepath = os.path.join(FS_PATH, "campaign", camp1.id,
_timestamp2filename(camp1.modified) + ".json")
filepath = os.path.join(
FS_PATH, "campaign", camp1.id,
_timestamp2filename(camp1.modified) + ".json",
)
with open(filepath) as bundle_file:
assert '"type": "bundle"' in bundle_file.read()
@ -532,12 +556,12 @@ def test_filesystem_store_add_invalid_object(fs_store):
def test_filesystem_store_add_marking(fs_store):
marking = stix2.v20.MarkingDefinition(
definition_type="tlp",
definition=stix2.v20.TLPMarking(tlp="green")
definition=stix2.v20.TLPMarking(tlp="green"),
)
fs_store.add(marking)
marking_filepath = os.path.join(
FS_PATH, "marking-definition", marking["id"] + ".json"
FS_PATH, "marking-definition", marking["id"] + ".json",
)
assert os.path.exists(marking_filepath)
@ -702,7 +726,7 @@ def test_auth_set_black1():
def test_optimize_types1():
filters = [
stix2.Filter("type", "=", "foo")
stix2.Filter("type", "=", "foo"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -716,7 +740,7 @@ def test_optimize_types1():
def test_optimize_types2():
filters = [
stix2.Filter("type", "=", "foo"),
stix2.Filter("type", "=", "bar")
stix2.Filter("type", "=", "bar"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -730,7 +754,7 @@ def test_optimize_types2():
def test_optimize_types3():
filters = [
stix2.Filter("type", "in", ["A", "B", "C"]),
stix2.Filter("type", "in", ["B", "C", "D"])
stix2.Filter("type", "in", ["B", "C", "D"]),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -744,7 +768,7 @@ def test_optimize_types3():
def test_optimize_types4():
filters = [
stix2.Filter("type", "in", ["A", "B", "C"]),
stix2.Filter("type", "in", ["D", "E", "F"])
stix2.Filter("type", "in", ["D", "E", "F"]),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -758,7 +782,7 @@ def test_optimize_types4():
def test_optimize_types5():
filters = [
stix2.Filter("type", "in", ["foo", "bar"]),
stix2.Filter("type", "!=", "bar")
stix2.Filter("type", "!=", "bar"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -772,7 +796,7 @@ def test_optimize_types5():
def test_optimize_types6():
filters = [
stix2.Filter("type", "!=", "foo"),
stix2.Filter("type", "!=", "bar")
stix2.Filter("type", "!=", "bar"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -786,7 +810,7 @@ def test_optimize_types6():
def test_optimize_types7():
filters = [
stix2.Filter("type", "=", "foo"),
stix2.Filter("type", "!=", "foo")
stix2.Filter("type", "!=", "foo"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -811,7 +835,7 @@ def test_optimize_types8():
def test_optimize_types_ids1():
filters = [
stix2.Filter("type", "in", ["foo", "bar"]),
stix2.Filter("id", "=", "foo--00000000-0000-0000-0000-000000000000")
stix2.Filter("id", "=", "foo--00000000-0000-0000-0000-000000000000"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -825,7 +849,7 @@ def test_optimize_types_ids1():
def test_optimize_types_ids2():
filters = [
stix2.Filter("type", "=", "foo"),
stix2.Filter("id", "=", "bar--00000000-0000-0000-0000-000000000000")
stix2.Filter("id", "=", "bar--00000000-0000-0000-0000-000000000000"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -839,7 +863,7 @@ def test_optimize_types_ids2():
def test_optimize_types_ids3():
filters = [
stix2.Filter("type", "in", ["foo", "bar"]),
stix2.Filter("id", "!=", "bar--00000000-0000-0000-0000-000000000000")
stix2.Filter("id", "!=", "bar--00000000-0000-0000-0000-000000000000"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -853,11 +877,13 @@ def test_optimize_types_ids3():
def test_optimize_types_ids4():
filters = [
stix2.Filter("type", "in", ["A", "B", "C"]),
stix2.Filter("id", "in", [
"B--00000000-0000-0000-0000-000000000000",
"C--00000000-0000-0000-0000-000000000000",
"D--00000000-0000-0000-0000-000000000000",
])
stix2.Filter(
"id", "in", [
"B--00000000-0000-0000-0000-000000000000",
"C--00000000-0000-0000-0000-000000000000",
"D--00000000-0000-0000-0000-000000000000",
],
),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -867,7 +893,7 @@ def test_optimize_types_ids4():
assert auth_ids.auth_type == AuthSet.WHITE
assert auth_ids.values == {
"B--00000000-0000-0000-0000-000000000000",
"C--00000000-0000-0000-0000-000000000000"
"C--00000000-0000-0000-0000-000000000000",
}
@ -875,12 +901,14 @@ def test_optimize_types_ids5():
filters = [
stix2.Filter("type", "in", ["A", "B", "C"]),
stix2.Filter("type", "!=", "C"),
stix2.Filter("id", "in", [
"B--00000000-0000-0000-0000-000000000000",
"C--00000000-0000-0000-0000-000000000000",
"D--00000000-0000-0000-0000-000000000000"
]),
stix2.Filter("id", "!=", "D--00000000-0000-0000-0000-000000000000")
stix2.Filter(
"id", "in", [
"B--00000000-0000-0000-0000-000000000000",
"C--00000000-0000-0000-0000-000000000000",
"D--00000000-0000-0000-0000-000000000000",
],
),
stix2.Filter("id", "!=", "D--00000000-0000-0000-0000-000000000000"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -893,7 +921,7 @@ def test_optimize_types_ids5():
def test_optimize_types_ids6():
filters = [
stix2.Filter("id", "=", "A--00000000-0000-0000-0000-000000000000")
stix2.Filter("id", "=", "A--00000000-0000-0000-0000-000000000000"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -907,7 +935,7 @@ def test_optimize_types_ids6():
def test_search_auth_set_white1():
auth_set = AuthSet(
{"attack-pattern", "doesntexist"},
set()
set(),
)
results = _get_matching_dir_entries(FS_PATH, auth_set, stat.S_ISDIR)
@ -921,19 +949,19 @@ def test_search_auth_set_white2():
auth_set = AuthSet(
{
"malware--6b616fc1-1505-48e3-8b2c-0d19337bff38",
"malware--92ec0cbd-2c30-44a2-b270-73f4ec949841"
"malware--92ec0cbd-2c30-44a2-b270-73f4ec949841",
},
{
"malware--92ec0cbd-2c30-44a2-b270-73f4ec949841",
"malware--96b08451-b27a-4ff6-893f-790e26393a8e",
"doesntexist"
}
"doesntexist",
},
)
results = _get_matching_dir_entries(
os.path.join(FS_PATH, "malware"),
auth_set, stat.S_ISDIR
auth_set, stat.S_ISDIR,
)
assert results == ["malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"]
@ -943,9 +971,11 @@ def test_search_auth_set_white3():
auth_set = AuthSet({"20170531213258226477", "doesntexist"}, set())
results = _get_matching_dir_entries(
os.path.join(FS_PATH, "malware",
"malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"),
auth_set, stat.S_ISREG, ".json"
os.path.join(
FS_PATH, "malware",
"malware--6b616fc1-1505-48e3-8b2c-0d19337bff38",
),
auth_set, stat.S_ISREG, ".json",
)
assert results == ["20170531213258226477.json"]
@ -954,23 +984,23 @@ def test_search_auth_set_white3():
def test_search_auth_set_black1():
auth_set = AuthSet(
None,
{"tool--242f3da3-4425-4d11-8f5c-b842886da966", "doesntexist"}
{"tool--242f3da3-4425-4d11-8f5c-b842886da966", "doesntexist"},
)
results = _get_matching_dir_entries(
os.path.join(FS_PATH, "tool"),
auth_set, stat.S_ISDIR
auth_set, stat.S_ISDIR,
)
assert set(results) == {
"tool--03342581-f790-4f03-ba41-e82e67392e23"
"tool--03342581-f790-4f03-ba41-e82e67392e23",
}
def test_search_auth_set_white_empty():
auth_set = AuthSet(
set(),
set()
set(),
)
results = _get_matching_dir_entries(FS_PATH, auth_set, stat.S_ISDIR)
@ -983,7 +1013,7 @@ def test_search_auth_set_black_empty(rel_fs_store):
# predictable (it adds "campaign").
auth_set = AuthSet(
None,
set()
set(),
)
results = _get_matching_dir_entries(FS_PATH, auth_set, stat.S_ISDIR)
@ -999,14 +1029,14 @@ def test_search_auth_set_black_empty(rel_fs_store):
"malware",
"marking-definition",
"relationship",
"tool"
"tool",
}
def test_timestamp2filename_naive():
dt = datetime.datetime(
2010, 6, 15,
8, 30, 10, 1234
8, 30, 10, 1234,
)
filename = _timestamp2filename(dt)
@ -1019,7 +1049,7 @@ def test_timestamp2filename_tz():
dt = datetime.datetime(
2010, 6, 15,
7, 30, 10, 1234,
tz
tz,
)
filename = _timestamp2filename(dt)

View File

@ -9,9 +9,10 @@ import pytest
import pytz
import stix2
from stix2.datastore.filesystem import (AuthSet, _find_search_optimizations,
_get_matching_dir_entries,
_timestamp2filename)
from stix2.datastore.filesystem import (
AuthSet, _find_search_optimizations, _get_matching_dir_entries,
_timestamp2filename,
)
from stix2.exceptions import STIXError
from .constants import (
@ -104,8 +105,10 @@ def rel_fs_store():
yield fs
for o in stix_objs:
filepath = os.path.join(FS_PATH, o.type, o.id,
_timestamp2filename(o.modified) + '.json')
filepath = os.path.join(
FS_PATH, o.type, o.id,
_timestamp2filename(o.modified) + '.json',
)
# Some test-scoped fixtures (e.g. fs_store) delete all campaigns, so by
# the time this module-scoped fixture tears itself down, it may find
@ -156,8 +159,10 @@ def test_filesystem_source_get_object(fs_source):
mal = fs_source.get("malware--6b616fc1-1505-48e3-8b2c-0d19337bff38")
assert mal.id == "malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"
assert mal.name == "Rover"
assert mal.modified == datetime.datetime(2018, 11, 16, 22, 54, 20, 390000,
pytz.utc)
assert mal.modified == datetime.datetime(
2018, 11, 16, 22, 54, 20, 390000,
pytz.utc,
)
def test_filesystem_source_get_nonexistent_object(fs_source):
@ -167,11 +172,13 @@ def test_filesystem_source_get_nonexistent_object(fs_source):
def test_filesystem_source_all_versions(fs_source):
ids = fs_source.all_versions(
"identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5"
"identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5",
)
assert len(ids) == 2
assert all(id_.id == "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5"
for id_ in ids)
assert all(
id_.id == "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5"
for id_ in ids
)
assert all(id_.name == "The MITRE Corporation" for id_ in ids)
assert all(id_.type == "identity" for id_ in ids)
@ -205,7 +212,7 @@ def test_filesystem_source_backward_compatible(fs_source):
results = fs_source.query([
stix2.Filter("type", "=", "malware"),
stix2.Filter("id", "=", "malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"),
stix2.Filter("modified", "=", modified)
stix2.Filter("modified", "=", modified),
])
assert len(results) == 1
@ -226,8 +233,10 @@ def test_filesystem_sink_add_python_stix_object(fs_sink, fs_source):
fs_sink.add(camp1)
filepath = os.path.join(FS_PATH, "campaign", camp1.id,
_timestamp2filename(camp1.modified) + ".json")
filepath = os.path.join(
FS_PATH, "campaign", camp1.id,
_timestamp2filename(camp1.modified) + ".json",
)
assert os.path.exists(filepath)
camp1_r = fs_source.get(camp1.id)
@ -247,7 +256,7 @@ def test_filesystem_sink_add_stix_object_dict(fs_sink, fs_source):
"aliases": ["Purple Robes"],
"id": "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"created": "2017-05-31T21:31:53.197755Z",
"modified": "2017-05-31T21:31:53.197755Z"
"modified": "2017-05-31T21:31:53.197755Z",
}
fs_sink.add(camp2)
@ -259,8 +268,10 @@ def test_filesystem_sink_add_stix_object_dict(fs_sink, fs_source):
# constraint (e.g. truncate to milliseconds), which results in a slightly
# different name.
camp2obj = stix2.parse(camp2)
filepath = os.path.join(FS_PATH, "campaign", camp2obj["id"],
_timestamp2filename(camp2obj["modified"]) + ".json")
filepath = os.path.join(
FS_PATH, "campaign", camp2obj["id"],
_timestamp2filename(camp2obj["modified"]) + ".json",
)
assert os.path.exists(filepath)
@ -285,16 +296,18 @@ def test_filesystem_sink_add_stix_bundle_dict(fs_sink, fs_source):
"aliases": ["Huns"],
"id": "campaign--b8f86161-ccae-49de-973a-4ca320c62478",
"created": "2017-05-31T21:31:53.197755Z",
"modified": "2017-05-31T21:31:53.197755Z"
}
]
"modified": "2017-05-31T21:31:53.197755Z",
},
],
}
fs_sink.add(bund)
camp_obj = stix2.parse(bund["objects"][0])
filepath = os.path.join(FS_PATH, "campaign", camp_obj["id"],
_timestamp2filename(camp_obj["modified"]) + ".json")
filepath = os.path.join(
FS_PATH, "campaign", camp_obj["id"],
_timestamp2filename(camp_obj["modified"]) + ".json",
)
assert os.path.exists(filepath)
@ -316,9 +329,11 @@ def test_filesystem_sink_add_json_stix_object(fs_sink, fs_source):
fs_sink.add(camp4)
camp4obj = stix2.parse(camp4)
filepath = os.path.join(FS_PATH, "campaign",
"campaign--6a6ca372-ba07-42cc-81ef-9840fc1f963d",
_timestamp2filename(camp4obj["modified"]) + ".json")
filepath = os.path.join(
FS_PATH, "campaign",
"campaign--6a6ca372-ba07-42cc-81ef-9840fc1f963d",
_timestamp2filename(camp4obj["modified"]) + ".json",
)
assert os.path.exists(filepath)
@ -341,9 +356,11 @@ def test_filesystem_sink_json_stix_bundle(fs_sink, fs_source):
bund2obj = stix2.parse(bund2)
camp_obj = bund2obj["objects"][0]
filepath = os.path.join(FS_PATH, "campaign",
"campaign--2c03b8bf-82ee-433e-9918-ca2cb6e9534b",
_timestamp2filename(camp_obj["modified"]) + ".json")
filepath = os.path.join(
FS_PATH, "campaign",
"campaign--2c03b8bf-82ee-433e-9918-ca2cb6e9534b",
_timestamp2filename(camp_obj["modified"]) + ".json",
)
assert os.path.exists(filepath)
@ -369,19 +386,22 @@ def test_filesystem_sink_add_objects_list(fs_sink, fs_source):
"aliases": ["The Frenchmen"],
"id": "campaign--122818b6-1112-4fb0-b11b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z",
"modified": "2017-05-31T21:31:53.197755Z"
"modified": "2017-05-31T21:31:53.197755Z",
}
fs_sink.add([camp6, camp7])
camp7obj = stix2.parse(camp7)
camp6filepath = os.path.join(FS_PATH, "campaign", camp6.id,
_timestamp2filename(camp6["modified"]) +
".json")
camp6filepath = os.path.join(
FS_PATH, "campaign", camp6.id,
_timestamp2filename(camp6["modified"]) +
".json",
)
camp7filepath = os.path.join(
FS_PATH, "campaign", "campaign--122818b6-1112-4fb0-b11b-b111107ca70a",
_timestamp2filename(camp7obj["modified"]) + ".json")
_timestamp2filename(camp7obj["modified"]) + ".json",
)
assert os.path.exists(camp6filepath)
assert os.path.exists(camp7filepath)
@ -402,12 +422,12 @@ def test_filesystem_sink_add_objects_list(fs_sink, fs_source):
def test_filesystem_sink_marking(fs_sink):
marking = stix2.v21.MarkingDefinition(
definition_type="tlp",
definition=stix2.v21.TLPMarking(tlp="green")
definition=stix2.v21.TLPMarking(tlp="green"),
)
fs_sink.add(marking)
marking_filepath = os.path.join(
FS_PATH, "marking-definition", marking["id"] + ".json"
FS_PATH, "marking-definition", marking["id"] + ".json",
)
assert os.path.exists(marking_filepath)
@ -483,8 +503,10 @@ def test_filesystem_store_add(fs_store):
assert camp1_r.id == camp1.id
assert camp1_r.name == camp1.name
filepath = os.path.join(FS_PATH, "campaign", camp1_r.id,
_timestamp2filename(camp1_r.modified) + ".json")
filepath = os.path.join(
FS_PATH, "campaign", camp1_r.id,
_timestamp2filename(camp1_r.modified) + ".json",
)
# remove
os.remove(filepath)
@ -500,8 +522,10 @@ def test_filesystem_store_add_as_bundle():
)
fs_store.add(camp1)
filepath = os.path.join(FS_PATH, "campaign", camp1.id,
_timestamp2filename(camp1.modified) + ".json")
filepath = os.path.join(
FS_PATH, "campaign", camp1.id,
_timestamp2filename(camp1.modified) + ".json",
)
with open(filepath) as bundle_file:
assert '"type": "bundle"' in bundle_file.read()
@ -531,12 +555,12 @@ def test_filesystem_store_add_invalid_object(fs_store):
def test_filesystem_store_add_marking(fs_store):
marking = stix2.v21.MarkingDefinition(
definition_type="tlp",
definition=stix2.v21.TLPMarking(tlp="green")
definition=stix2.v21.TLPMarking(tlp="green"),
)
fs_store.add(marking)
marking_filepath = os.path.join(
FS_PATH, "marking-definition", marking["id"] + ".json"
FS_PATH, "marking-definition", marking["id"] + ".json",
)
assert os.path.exists(marking_filepath)
@ -701,7 +725,7 @@ def test_auth_set_black1():
def test_optimize_types1():
filters = [
stix2.Filter("type", "=", "foo")
stix2.Filter("type", "=", "foo"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -715,7 +739,7 @@ def test_optimize_types1():
def test_optimize_types2():
filters = [
stix2.Filter("type", "=", "foo"),
stix2.Filter("type", "=", "bar")
stix2.Filter("type", "=", "bar"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -729,7 +753,7 @@ def test_optimize_types2():
def test_optimize_types3():
filters = [
stix2.Filter("type", "in", ["A", "B", "C"]),
stix2.Filter("type", "in", ["B", "C", "D"])
stix2.Filter("type", "in", ["B", "C", "D"]),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -743,7 +767,7 @@ def test_optimize_types3():
def test_optimize_types4():
filters = [
stix2.Filter("type", "in", ["A", "B", "C"]),
stix2.Filter("type", "in", ["D", "E", "F"])
stix2.Filter("type", "in", ["D", "E", "F"]),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -757,7 +781,7 @@ def test_optimize_types4():
def test_optimize_types5():
filters = [
stix2.Filter("type", "in", ["foo", "bar"]),
stix2.Filter("type", "!=", "bar")
stix2.Filter("type", "!=", "bar"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -771,7 +795,7 @@ def test_optimize_types5():
def test_optimize_types6():
filters = [
stix2.Filter("type", "!=", "foo"),
stix2.Filter("type", "!=", "bar")
stix2.Filter("type", "!=", "bar"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -785,7 +809,7 @@ def test_optimize_types6():
def test_optimize_types7():
filters = [
stix2.Filter("type", "=", "foo"),
stix2.Filter("type", "!=", "foo")
stix2.Filter("type", "!=", "foo"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -810,7 +834,7 @@ def test_optimize_types8():
def test_optimize_types_ids1():
filters = [
stix2.Filter("type", "in", ["foo", "bar"]),
stix2.Filter("id", "=", "foo--00000000-0000-0000-0000-000000000000")
stix2.Filter("id", "=", "foo--00000000-0000-0000-0000-000000000000"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -824,7 +848,7 @@ def test_optimize_types_ids1():
def test_optimize_types_ids2():
filters = [
stix2.Filter("type", "=", "foo"),
stix2.Filter("id", "=", "bar--00000000-0000-0000-0000-000000000000")
stix2.Filter("id", "=", "bar--00000000-0000-0000-0000-000000000000"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -838,7 +862,7 @@ def test_optimize_types_ids2():
def test_optimize_types_ids3():
filters = [
stix2.Filter("type", "in", ["foo", "bar"]),
stix2.Filter("id", "!=", "bar--00000000-0000-0000-0000-000000000000")
stix2.Filter("id", "!=", "bar--00000000-0000-0000-0000-000000000000"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -852,11 +876,13 @@ def test_optimize_types_ids3():
def test_optimize_types_ids4():
filters = [
stix2.Filter("type", "in", ["A", "B", "C"]),
stix2.Filter("id", "in", [
"B--00000000-0000-0000-0000-000000000000",
"C--00000000-0000-0000-0000-000000000000",
"D--00000000-0000-0000-0000-000000000000",
])
stix2.Filter(
"id", "in", [
"B--00000000-0000-0000-0000-000000000000",
"C--00000000-0000-0000-0000-000000000000",
"D--00000000-0000-0000-0000-000000000000",
],
),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -866,7 +892,7 @@ def test_optimize_types_ids4():
assert auth_ids.auth_type == AuthSet.WHITE
assert auth_ids.values == {
"B--00000000-0000-0000-0000-000000000000",
"C--00000000-0000-0000-0000-000000000000"
"C--00000000-0000-0000-0000-000000000000",
}
@ -874,12 +900,14 @@ def test_optimize_types_ids5():
filters = [
stix2.Filter("type", "in", ["A", "B", "C"]),
stix2.Filter("type", "!=", "C"),
stix2.Filter("id", "in", [
"B--00000000-0000-0000-0000-000000000000",
"C--00000000-0000-0000-0000-000000000000",
"D--00000000-0000-0000-0000-000000000000"
]),
stix2.Filter("id", "!=", "D--00000000-0000-0000-0000-000000000000")
stix2.Filter(
"id", "in", [
"B--00000000-0000-0000-0000-000000000000",
"C--00000000-0000-0000-0000-000000000000",
"D--00000000-0000-0000-0000-000000000000",
],
),
stix2.Filter("id", "!=", "D--00000000-0000-0000-0000-000000000000"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -892,7 +920,7 @@ def test_optimize_types_ids5():
def test_optimize_types_ids6():
filters = [
stix2.Filter("id", "=", "A--00000000-0000-0000-0000-000000000000")
stix2.Filter("id", "=", "A--00000000-0000-0000-0000-000000000000"),
]
auth_types, auth_ids = _find_search_optimizations(filters)
@ -906,7 +934,7 @@ def test_optimize_types_ids6():
def test_search_auth_set_white1():
auth_set = AuthSet(
{"attack-pattern", "doesntexist"},
set()
set(),
)
results = _get_matching_dir_entries(FS_PATH, auth_set, stat.S_ISDIR)
@ -920,19 +948,19 @@ def test_search_auth_set_white2():
auth_set = AuthSet(
{
"malware--6b616fc1-1505-48e3-8b2c-0d19337bff38",
"malware--92ec0cbd-2c30-44a2-b270-73f4ec949841"
"malware--92ec0cbd-2c30-44a2-b270-73f4ec949841",
},
{
"malware--92ec0cbd-2c30-44a2-b270-73f4ec949841",
"malware--96b08451-b27a-4ff6-893f-790e26393a8e",
"doesntexist"
}
"doesntexist",
},
)
results = _get_matching_dir_entries(
os.path.join(FS_PATH, "malware"),
auth_set, stat.S_ISDIR
auth_set, stat.S_ISDIR,
)
assert results == ["malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"]
@ -942,9 +970,11 @@ def test_search_auth_set_white3():
auth_set = AuthSet({"20170531213258226477", "doesntexist"}, set())
results = _get_matching_dir_entries(
os.path.join(FS_PATH, "malware",
"malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"),
auth_set, stat.S_ISREG, ".json"
os.path.join(
FS_PATH, "malware",
"malware--6b616fc1-1505-48e3-8b2c-0d19337bff38",
),
auth_set, stat.S_ISREG, ".json",
)
assert results == ["20170531213258226477.json"]
@ -953,23 +983,23 @@ def test_search_auth_set_white3():
def test_search_auth_set_black1():
auth_set = AuthSet(
None,
{"tool--242f3da3-4425-4d11-8f5c-b842886da966", "doesntexist"}
{"tool--242f3da3-4425-4d11-8f5c-b842886da966", "doesntexist"},
)
results = _get_matching_dir_entries(
os.path.join(FS_PATH, "tool"),
auth_set, stat.S_ISDIR
auth_set, stat.S_ISDIR,
)
assert set(results) == {
"tool--03342581-f790-4f03-ba41-e82e67392e23"
"tool--03342581-f790-4f03-ba41-e82e67392e23",
}
def test_search_auth_set_white_empty():
auth_set = AuthSet(
set(),
set()
set(),
)
results = _get_matching_dir_entries(FS_PATH, auth_set, stat.S_ISDIR)
@ -982,7 +1012,7 @@ def test_search_auth_set_black_empty(rel_fs_store):
# predictable (it adds "campaign").
auth_set = AuthSet(
None,
set()
set(),
)
results = _get_matching_dir_entries(FS_PATH, auth_set, stat.S_ISDIR)
@ -998,14 +1028,14 @@ def test_search_auth_set_black_empty(rel_fs_store):
"malware",
"marking-definition",
"relationship",
"tool"
"tool",
}
def test_timestamp2filename_naive():
dt = datetime.datetime(
2010, 6, 15,
8, 30, 10, 1234
8, 30, 10, 1234,
)
filename = _timestamp2filename(dt)
@ -1018,7 +1048,7 @@ def test_timestamp2filename_tz():
dt = datetime.datetime(
2010, 6, 15,
7, 30, 10, 1234,
tz
tz,
)
filename = _timestamp2filename(dt)