From 612f2fbab8e607abd44efb5610184aaadf6ef23f Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Fri, 27 Oct 2017 15:50:43 -0400 Subject: [PATCH] Increase code coverage for filesystem datastore Found a couple bugs in the process and fixed them, too. --- stix2/sources/filesystem.py | 16 +++++------ stix2/sources/memory.py | 4 +-- stix2/test/test_filesystem.py | 54 +++++++++++++++++++++++++++++++++++ stix2/utils.py | 9 ++++-- 4 files changed, 70 insertions(+), 13 deletions(-) diff --git a/stix2/sources/filesystem.py b/stix2/sources/filesystem.py index 9fd658d..34dbcf0 100644 --- a/stix2/sources/filesystem.py +++ b/stix2/sources/filesystem.py @@ -119,9 +119,9 @@ class FileSystemSink(DataSink): self.add(stix_obj) else: - raise ValueError("stix_data must be a STIX object (or list of), " - "json formatted STIX (or list of), " - "or a json formatted STIX bundle") + raise TypeError("stix_data must be a STIX object (or list of), " + "JSON formatted STIX (or list of), " + "or a JSON formatted STIX bundle") class FileSystemSource(DataSource): @@ -198,8 +198,8 @@ class FileSystemSource(DataSource): """Search and retrieve STIX objects based on the complete query. A "complete query" includes the filters from the query, the filters - attached to MemorySource, and any filters passed from a - CompositeDataSource (i.e. _composite_filters) + attached to this FileSystemSource, and any filters passed from a + CompositeDataSource (i.e. _composite_filters). Args: query (list): list of filters to search on @@ -222,7 +222,7 @@ class FileSystemSource(DataSource): if not isinstance(query, list): # make sure dont make set from a Filter object, # need to make a set from a list of Filter objects (even if just one Filter) - query = list(query) + query = [query] query = set(query) # combine all query filters @@ -267,8 +267,8 @@ class FileSystemSource(DataSource): # so query will look in all STIX directories that are not # the specified type. Compile correct dir paths for dir in os.listdir(self._stix_dir): - if os.path.abspath(dir) not in declude_paths: - include_paths.append(os.path.abspath(dir)) + if os.path.abspath(os.path.join(self._stix_dir, dir)) not in declude_paths: + include_paths.append(os.path.abspath(os.path.join(self._stix_dir, dir))) # grab stix object ID as well - if present in filters, as # may forgo the loading of STIX content into memory diff --git a/stix2/sources/memory.py b/stix2/sources/memory.py index 0846f9b..ec44dba 100644 --- a/stix2/sources/memory.py +++ b/stix2/sources/memory.py @@ -260,8 +260,8 @@ class MemorySource(DataSource): """Search and retrieve STIX objects based on the complete query. A "complete query" includes the filters from the query, the filters - attached to MemorySource, and any filters passed from a - CompositeDataSource (i.e. _composite_filters) + attached to this MemorySource, and any filters passed from a + CompositeDataSource (i.e. _composite_filters). Args: query (list): list of filters to search on diff --git a/stix2/test/test_filesystem.py b/stix2/test/test_filesystem.py index 1e79d05..7aaa3f5 100644 --- a/stix2/test/test_filesystem.py +++ b/stix2/test/test_filesystem.py @@ -40,6 +40,18 @@ def fs_sink(): shutil.rmtree(os.path.join(FS_PATH, "campaign"), True) +def test_filesystem_source_nonexistent_folder(): + with pytest.raises(ValueError) as excinfo: + FileSystemSource('nonexistent-folder') + assert "for STIX data does not exist" in str(excinfo) + + +def test_filesystem_sink_nonexistent_folder(): + with pytest.raises(ValueError) as excinfo: + FileSystemSink('nonexistent-folder') + assert "for STIX data does not exist" in str(excinfo) + + def test_filesytem_source_get_object(fs_source): # get object mal = fs_source.get("malware--6b616fc1-1505-48e3-8b2c-0d19337bff38") @@ -47,6 +59,11 @@ def test_filesytem_source_get_object(fs_source): assert mal.name == "Rover" +def test_filesytem_source_get_nonexistent_object(fs_source): + ind = fs_source.get("indicator--6b616fc1-1505-48e3-8b2c-0d19337bff38") + assert ind is None + + def test_filesytem_source_all_versions(fs_source): # all versions - (currently not a true all versions call as FileSystem cant have multiple versions) id_ = fs_source.get("identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5") @@ -240,6 +257,33 @@ def test_filesystem_store_query(fs_store): assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools] +def test_filesystem_store_query_single_filter(fs_store): + query = Filter("labels", "in", "tool") + tools = fs_store.query(query) + assert len(tools) == 2 + assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [tool.id for tool in tools] + assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools] + + +def test_filesystem_store_empty_query(fs_store): + results = fs_store.query() # returns all + assert len(results) == 26 + assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [obj.id for obj in results] + assert "marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168" in [obj.id for obj in results] + + +def test_filesystem_store_query_multiple_filters(fs_store): + fs_store.source.filters.add(Filter("labels", "in", "tool")) + tools = fs_store.query(Filter("id", "=", "tool--242f3da3-4425-4d11-8f5c-b842886da966")) + assert len(tools) == 1 + assert tools[0].id == "tool--242f3da3-4425-4d11-8f5c-b842886da966" + + +def test_filesystem_store_query_dont_include_type_folder(fs_store): + results = fs_store.query(Filter("type", "!=", "tool")) + assert len(results) == 24 + + def test_filesystem_store_add(fs_store): # add() camp1 = Campaign(name="Great Heathen Army", @@ -278,6 +322,16 @@ def test_filesystem_add_bundle_object(fs_store): fs_store.add(bundle) +def test_filesystem_store_add_invalid_object(fs_store): + ind = ('campaign', 'campaign--111111b6-1112-4fb0-111b-b111107ca70a') # tuple isn't valid + with pytest.raises(TypeError) as excinfo: + fs_store.add(ind) + assert 'stix_data must be' in str(excinfo.value) + assert 'a STIX object' in str(excinfo.value) + assert 'JSON formatted STIX' in str(excinfo.value) + assert 'JSON formatted STIX bundle' in str(excinfo.value) + + def test_filesystem_object_with_custom_property(fs_store): camp = Campaign(name="Scipio Africanus", objective="Defeat the Carthaginians", diff --git a/stix2/utils.py b/stix2/utils.py index 8df4323..4623f28 100644 --- a/stix2/utils.py +++ b/stix2/utils.py @@ -34,7 +34,7 @@ class STIXdatetime(dt.datetime): def deduplicate(stix_obj_list): - """Deduplicate a list of STIX objects to a unique set + """Deduplicate a list of STIX objects to a unique set. Reduces a set of STIX objects to unique set by looking at 'id' and 'modified' fields - as a unique object version @@ -44,7 +44,6 @@ def deduplicate(stix_obj_list): of deduplicate(),that if the "stix_obj_list" argument has multiple STIX objects of the same version, the last object version found in the list will be the one that is returned. - () Args: stix_obj_list (list): list of STIX objects (dicts) @@ -56,7 +55,11 @@ def deduplicate(stix_obj_list): unique_objs = {} for obj in stix_obj_list: - unique_objs[(obj['id'], obj['modified'])] = obj + try: + unique_objs[(obj['id'], obj['modified'])] = obj + except KeyError: + # Handle objects with no `modified` property, e.g. marking-definition + unique_objs[(obj['id'], obj['created'])] = obj return list(unique_objs.values())