Increase code coverage for filesystem datastore
Found a couple bugs in the process and fixed them, too.stix2.0
parent
f103084439
commit
612f2fbab8
|
@ -119,9 +119,9 @@ class FileSystemSink(DataSink):
|
|||
self.add(stix_obj)
|
||||
|
||||
else:
|
||||
raise ValueError("stix_data must be a STIX object (or list of), "
|
||||
"json formatted STIX (or list of), "
|
||||
"or a json formatted STIX bundle")
|
||||
raise TypeError("stix_data must be a STIX object (or list of), "
|
||||
"JSON formatted STIX (or list of), "
|
||||
"or a JSON formatted STIX bundle")
|
||||
|
||||
|
||||
class FileSystemSource(DataSource):
|
||||
|
@ -198,8 +198,8 @@ class FileSystemSource(DataSource):
|
|||
"""Search and retrieve STIX objects based on the complete query.
|
||||
|
||||
A "complete query" includes the filters from the query, the filters
|
||||
attached to MemorySource, and any filters passed from a
|
||||
CompositeDataSource (i.e. _composite_filters)
|
||||
attached to this FileSystemSource, and any filters passed from a
|
||||
CompositeDataSource (i.e. _composite_filters).
|
||||
|
||||
Args:
|
||||
query (list): list of filters to search on
|
||||
|
@ -222,7 +222,7 @@ class FileSystemSource(DataSource):
|
|||
if not isinstance(query, list):
|
||||
# make sure dont make set from a Filter object,
|
||||
# need to make a set from a list of Filter objects (even if just one Filter)
|
||||
query = list(query)
|
||||
query = [query]
|
||||
query = set(query)
|
||||
|
||||
# combine all query filters
|
||||
|
@ -267,8 +267,8 @@ class FileSystemSource(DataSource):
|
|||
# so query will look in all STIX directories that are not
|
||||
# the specified type. Compile correct dir paths
|
||||
for dir in os.listdir(self._stix_dir):
|
||||
if os.path.abspath(dir) not in declude_paths:
|
||||
include_paths.append(os.path.abspath(dir))
|
||||
if os.path.abspath(os.path.join(self._stix_dir, dir)) not in declude_paths:
|
||||
include_paths.append(os.path.abspath(os.path.join(self._stix_dir, dir)))
|
||||
|
||||
# grab stix object ID as well - if present in filters, as
|
||||
# may forgo the loading of STIX content into memory
|
||||
|
|
|
@ -260,8 +260,8 @@ class MemorySource(DataSource):
|
|||
"""Search and retrieve STIX objects based on the complete query.
|
||||
|
||||
A "complete query" includes the filters from the query, the filters
|
||||
attached to MemorySource, and any filters passed from a
|
||||
CompositeDataSource (i.e. _composite_filters)
|
||||
attached to this MemorySource, and any filters passed from a
|
||||
CompositeDataSource (i.e. _composite_filters).
|
||||
|
||||
Args:
|
||||
query (list): list of filters to search on
|
||||
|
|
|
@ -40,6 +40,18 @@ def fs_sink():
|
|||
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
|
||||
|
||||
|
||||
def test_filesystem_source_nonexistent_folder():
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
FileSystemSource('nonexistent-folder')
|
||||
assert "for STIX data does not exist" in str(excinfo)
|
||||
|
||||
|
||||
def test_filesystem_sink_nonexistent_folder():
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
FileSystemSink('nonexistent-folder')
|
||||
assert "for STIX data does not exist" in str(excinfo)
|
||||
|
||||
|
||||
def test_filesytem_source_get_object(fs_source):
|
||||
# get object
|
||||
mal = fs_source.get("malware--6b616fc1-1505-48e3-8b2c-0d19337bff38")
|
||||
|
@ -47,6 +59,11 @@ def test_filesytem_source_get_object(fs_source):
|
|||
assert mal.name == "Rover"
|
||||
|
||||
|
||||
def test_filesytem_source_get_nonexistent_object(fs_source):
|
||||
ind = fs_source.get("indicator--6b616fc1-1505-48e3-8b2c-0d19337bff38")
|
||||
assert ind is None
|
||||
|
||||
|
||||
def test_filesytem_source_all_versions(fs_source):
|
||||
# all versions - (currently not a true all versions call as FileSystem cant have multiple versions)
|
||||
id_ = fs_source.get("identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5")
|
||||
|
@ -240,6 +257,33 @@ def test_filesystem_store_query(fs_store):
|
|||
assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools]
|
||||
|
||||
|
||||
def test_filesystem_store_query_single_filter(fs_store):
|
||||
query = Filter("labels", "in", "tool")
|
||||
tools = fs_store.query(query)
|
||||
assert len(tools) == 2
|
||||
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [tool.id for tool in tools]
|
||||
assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools]
|
||||
|
||||
|
||||
def test_filesystem_store_empty_query(fs_store):
|
||||
results = fs_store.query() # returns all
|
||||
assert len(results) == 26
|
||||
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [obj.id for obj in results]
|
||||
assert "marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168" in [obj.id for obj in results]
|
||||
|
||||
|
||||
def test_filesystem_store_query_multiple_filters(fs_store):
|
||||
fs_store.source.filters.add(Filter("labels", "in", "tool"))
|
||||
tools = fs_store.query(Filter("id", "=", "tool--242f3da3-4425-4d11-8f5c-b842886da966"))
|
||||
assert len(tools) == 1
|
||||
assert tools[0].id == "tool--242f3da3-4425-4d11-8f5c-b842886da966"
|
||||
|
||||
|
||||
def test_filesystem_store_query_dont_include_type_folder(fs_store):
|
||||
results = fs_store.query(Filter("type", "!=", "tool"))
|
||||
assert len(results) == 24
|
||||
|
||||
|
||||
def test_filesystem_store_add(fs_store):
|
||||
# add()
|
||||
camp1 = Campaign(name="Great Heathen Army",
|
||||
|
@ -278,6 +322,16 @@ def test_filesystem_add_bundle_object(fs_store):
|
|||
fs_store.add(bundle)
|
||||
|
||||
|
||||
def test_filesystem_store_add_invalid_object(fs_store):
|
||||
ind = ('campaign', 'campaign--111111b6-1112-4fb0-111b-b111107ca70a') # tuple isn't valid
|
||||
with pytest.raises(TypeError) as excinfo:
|
||||
fs_store.add(ind)
|
||||
assert 'stix_data must be' in str(excinfo.value)
|
||||
assert 'a STIX object' in str(excinfo.value)
|
||||
assert 'JSON formatted STIX' in str(excinfo.value)
|
||||
assert 'JSON formatted STIX bundle' in str(excinfo.value)
|
||||
|
||||
|
||||
def test_filesystem_object_with_custom_property(fs_store):
|
||||
camp = Campaign(name="Scipio Africanus",
|
||||
objective="Defeat the Carthaginians",
|
||||
|
|
|
@ -34,7 +34,7 @@ class STIXdatetime(dt.datetime):
|
|||
|
||||
|
||||
def deduplicate(stix_obj_list):
|
||||
"""Deduplicate a list of STIX objects to a unique set
|
||||
"""Deduplicate a list of STIX objects to a unique set.
|
||||
|
||||
Reduces a set of STIX objects to unique set by looking
|
||||
at 'id' and 'modified' fields - as a unique object version
|
||||
|
@ -44,7 +44,6 @@ def deduplicate(stix_obj_list):
|
|||
of deduplicate(),that if the "stix_obj_list" argument has
|
||||
multiple STIX objects of the same version, the last object
|
||||
version found in the list will be the one that is returned.
|
||||
()
|
||||
|
||||
Args:
|
||||
stix_obj_list (list): list of STIX objects (dicts)
|
||||
|
@ -56,7 +55,11 @@ def deduplicate(stix_obj_list):
|
|||
unique_objs = {}
|
||||
|
||||
for obj in stix_obj_list:
|
||||
unique_objs[(obj['id'], obj['modified'])] = obj
|
||||
try:
|
||||
unique_objs[(obj['id'], obj['modified'])] = obj
|
||||
except KeyError:
|
||||
# Handle objects with no `modified` property, e.g. marking-definition
|
||||
unique_objs[(obj['id'], obj['created'])] = obj
|
||||
|
||||
return list(unique_objs.values())
|
||||
|
||||
|
|
Loading…
Reference in New Issue