Merge pull request #61 from oasis-open/list_comp

frivolous list comprehensions converted to list()
stix2.1
Chris Lenk 2017-10-06 13:29:22 -04:00 committed by GitHub
commit 79902f361e
4 changed files with 24 additions and 26 deletions

View File

@ -181,7 +181,7 @@ class FileSystemSource(DataSource):
a python STIX objects and then returned a python STIX objects and then returned
""" """
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)] return list(self.get(stix_id=stix_id, _composite_filters=_composite_filters))
def query(self, query=None, _composite_filters=None): def query(self, query=None, _composite_filters=None):
"""search and retrieve STIX objects based on the complete query """search and retrieve STIX objects based on the complete query
@ -279,13 +279,11 @@ class FileSystemSource(DataSource):
# since ID is specified in one of filters, can evaluate against filename first without loading # since ID is specified in one of filters, can evaluate against filename first without loading
stix_obj = json.load(open(os.path.join(root, file_)))["objects"][0] stix_obj = json.load(open(os.path.join(root, file_)))["objects"][0]
# check against other filters, add if match # check against other filters, add if match
matches = [stix_obj_ for stix_obj_ in apply_common_filters([stix_obj], query)] all_data.extend(apply_common_filters([stix_obj], query))
all_data.extend(matches)
else: else:
# have to load into memory regardless to evaluate other filters # have to load into memory regardless to evaluate other filters
stix_obj = json.load(open(os.path.join(root, file_)))["objects"][0] stix_obj = json.load(open(os.path.join(root, file_)))["objects"][0]
matches = [stix_obj_ for stix_obj_ in apply_common_filters([stix_obj], query)] all_data.extend(apply_common_filters([stix_obj], query))
all_data.extend(matches)
all_data = deduplicate(all_data) all_data = deduplicate(all_data)

View File

@ -281,7 +281,7 @@ class MemorySource(DataSource):
query.update(_composite_filters) query.update(_composite_filters)
# Apply STIX common property filters. # Apply STIX common property filters.
all_data = [stix_obj for stix_obj in apply_common_filters(self._data.values(), query)] all_data = list(apply_common_filters(self._data.values(), query))
return all_data return all_data

View File

@ -126,7 +126,7 @@ class TAXIICollectionSource(DataSource):
stix_objs = self.collection.get_object(stix_id, taxii_filters)["objects"] stix_objs = self.collection.get_object(stix_id, taxii_filters)["objects"]
stix_obj = [stix_obj for stix_obj in apply_common_filters(stix_objs, query)] stix_obj = list(apply_common_filters(stix_objs, query))
if len(stix_obj): if len(stix_obj):
stix_obj = stix_obj[0] stix_obj = stix_obj[0]
@ -204,7 +204,7 @@ class TAXIICollectionSource(DataSource):
all_data = deduplicate(all_data) all_data = deduplicate(all_data)
# apply local (CompositeDataSource, TAXIICollectionSource and query filters) # apply local (CompositeDataSource, TAXIICollectionSource and query filters)
all_data = [stix_obj for stix_obj in apply_common_filters(all_data, query)] all_data = list(apply_common_filters(all_data, query))
# parse python STIX objects from the STIX object dicts # parse python STIX objects from the STIX object dicts
stix_objs = [parse(stix_obj_dict) for stix_obj_dict in all_data] stix_objs = [parse(stix_obj_dict) for stix_obj_dict in all_data]

View File

@ -330,7 +330,7 @@ def test_apply_common_filters(ds):
] ]
# "Return any object whose type is not relationship" # "Return any object whose type is not relationship"
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[0]])] resp = list(apply_common_filters(stix_objs, [filters[0]]))
ids = [r['id'] for r in resp] ids = [r['id'] for r in resp]
assert stix_objs[0]['id'] in ids assert stix_objs[0]['id'] in ids
assert stix_objs[1]['id'] in ids assert stix_objs[1]['id'] in ids
@ -338,88 +338,88 @@ def test_apply_common_filters(ds):
assert len(ids) == 3 assert len(ids) == 3
# "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463" # "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[1]])] resp = list(apply_common_filters(stix_objs, [filters[1]]))
assert resp[0]['id'] == stix_objs[2]['id'] assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1 assert len(resp) == 1
# "Return any object that contains remote-access-trojan in labels" # "Return any object that contains remote-access-trojan in labels"
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[2]])] resp = list(apply_common_filters(stix_objs, [filters[2]]))
assert resp[0]['id'] == stix_objs[0]['id'] assert resp[0]['id'] == stix_objs[0]['id']
assert len(resp) == 1 assert len(resp) == 1
# "Return any object created after 2015-01-01T01:00:00.000Z" # "Return any object created after 2015-01-01T01:00:00.000Z"
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[3]])] resp = list(apply_common_filters(stix_objs, [filters[3]]))
assert resp[0]['id'] == stix_objs[0]['id'] assert resp[0]['id'] == stix_objs[0]['id']
assert len(resp) == 2 assert len(resp) == 2
# "Return any revoked object" # "Return any revoked object"
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[4]])] resp = list(apply_common_filters(stix_objs, [filters[4]]))
assert resp[0]['id'] == stix_objs[2]['id'] assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1 assert len(resp) == 1
# "Return any object whose not revoked" # "Return any object whose not revoked"
# Note that if 'revoked' property is not present in object. # Note that if 'revoked' property is not present in object.
# Currently we can't use such an expression to filter for... :( # Currently we can't use such an expression to filter for... :(
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[5]])] resp = list(apply_common_filters(stix_objs, [filters[5]]))
assert len(resp) == 0 assert len(resp) == 0
# "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs" # "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs"
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[6]])] resp = list(apply_common_filters(stix_objs, [filters[6]]))
assert resp[0]['id'] == stix_objs[2]['id'] assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1 assert len(resp) == 1
# "Return any object that contains relationship_type in their selectors AND # "Return any object that contains relationship_type in their selectors AND
# also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref" # also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref"
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[7], filters[8]])] resp = list(apply_common_filters(stix_objs, [filters[7], filters[8]]))
assert resp[0]['id'] == stix_objs[2]['id'] assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1 assert len(resp) == 1
# "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id" # "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id"
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[9]])] resp = list(apply_common_filters(stix_objs, [filters[9]]))
assert resp[0]['id'] == stix_objs[3]['id'] assert resp[0]['id'] == stix_objs[3]['id']
assert len(resp) == 1 assert len(resp) == 1
# "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9" # "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9"
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[10]])] resp = list(apply_common_filters(stix_objs, [filters[10]]))
assert len(resp) == 1 assert len(resp) == 1
# "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None) # "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None)
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[11]])] resp = list(apply_common_filters(stix_objs, [filters[11]]))
assert len(resp) == 0 assert len(resp) == 0
# "Return any object that contains description in its selectors" (None) # "Return any object that contains description in its selectors" (None)
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[12]])] resp = list(apply_common_filters(stix_objs, [filters[12]]))
assert len(resp) == 0 assert len(resp) == 0
# "Return any object that object that matches CVE in source_name" (None, case sensitive) # "Return any object that object that matches CVE in source_name" (None, case sensitive)
resp = [stix_obj for stix_obj in apply_common_filters(stix_objs, [filters[13]])] resp = list(apply_common_filters(stix_objs, [filters[13]]))
assert len(resp) == 0 assert len(resp) == 0
def test_filters0(ds): def test_filters0(ds):
# "Return any object modified before 2017-01-28T13:49:53.935Z" # "Return any object modified before 2017-01-28T13:49:53.935Z"
resp = [stix_obj for stix_obj in apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")])] resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")]))
assert resp[0]['id'] == STIX_OBJS2[1]['id'] assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2 assert len(resp) == 2
def test_filters1(ds): def test_filters1(ds):
# "Return any object modified after 2017-01-28T13:49:53.935Z" # "Return any object modified after 2017-01-28T13:49:53.935Z"
resp = [stix_obj for stix_obj in apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")])] resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")]))
assert resp[0]['id'] == STIX_OBJS2[0]['id'] assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1 assert len(resp) == 1
def test_filters2(ds): def test_filters2(ds):
# "Return any object modified after or on 2017-01-28T13:49:53.935Z" # "Return any object modified after or on 2017-01-28T13:49:53.935Z"
resp = [stix_obj for stix_obj in apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")])] resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")]))
assert resp[0]['id'] == STIX_OBJS2[0]['id'] assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 3 assert len(resp) == 3
def test_filters3(ds): def test_filters3(ds):
# "Return any object modified before or on 2017-01-28T13:49:53.935Z" # "Return any object modified before or on 2017-01-28T13:49:53.935Z"
resp = [stix_obj for stix_obj in apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")])] resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")]))
assert resp[0]['id'] == STIX_OBJS2[1]['id'] assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2 assert len(resp) == 2
@ -434,7 +434,7 @@ def test_filters4(ds):
def test_filters5(ds): def test_filters5(ds):
# "Return any object whose id is not indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f" # "Return any object whose id is not indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
resp = [stix_obj for stix_obj in apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])] resp = list(apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")]))
assert resp[0]['id'] == STIX_OBJS2[0]['id'] assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1 assert len(resp) == 1