Update CompositeDataSource and deduplicate() to handle unversioned

objects (those with neither a "modified" nor a "created" property),
including 2.1 SCOs.  Update some unit tests to cover this.  Fix a typo
in the 2.0 deduplicate() unit test.
pull/1/head
Michael Chisholm 2020-08-17 18:38:29 -04:00
parent 1948b38eec
commit 7fa3c78dea
6 changed files with 38 additions and 19 deletions

View File

@ -481,14 +481,14 @@ class CompositeDataSource(DataSource):
if data:
all_data.append(data)
# remove duplicate versions
if len(all_data) > 0:
all_data = deduplicate(all_data)
else:
return None
# Search for latest version
stix_obj = latest_ver = None
for obj in all_data:
ver = obj.get("modified") or obj.get("created")
# reduce to most recent version
stix_obj = sorted(all_data, key=lambda k: k['modified'], reverse=True)[0]
if stix_obj is None or ver is None or ver > latest_ver:
stix_obj = obj
latest_ver = ver
return stix_obj

View File

@ -114,7 +114,7 @@ def test_deduplicate(stix_objs1):
mods = [obj['modified'] for obj in unique]
assert "indicator--00000000-0000-4000-8000-000000000001" in ids
assert "indicator--00000000-0000-4000-8000-000000000001" in ids
assert "indicator--00000000-0000-4000-8000-000000000002" in ids
assert "2017-01-27T13:49:53.935Z" in mods
assert "2017-01-27T13:49:53.936Z" in mods

View File

@ -132,7 +132,13 @@ def stix_objs1():
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
}
return [ind1, ind2, ind3, ind4, ind5]
sco = {
"type": "url",
"spec_version": "2.1",
"id": "url--cc1deced-d99b-4d72-9268-8182420cb2fd",
"value": "http://example.com/"
}
return [ind1, ind2, ind3, ind4, ind5, sco]
@pytest.fixture

View File

@ -59,6 +59,17 @@ def test_composite_datasource_operations(stix_objs1, stix_objs2):
assert indicator["modified"] == parse_into_datetime("2017-01-31T13:49:53.935Z")
assert indicator["type"] == "indicator"
sco = cds1.get("url--cc1deced-d99b-4d72-9268-8182420cb2fd")
assert sco["id"] == "url--cc1deced-d99b-4d72-9268-8182420cb2fd"
scos = cds1.all_versions("url--cc1deced-d99b-4d72-9268-8182420cb2fd")
assert len(scos) == 1
assert scos[0]["id"] == "url--cc1deced-d99b-4d72-9268-8182420cb2fd"
scos = cds1.query([Filter("value", "=", "http://example.com/")])
assert len(scos) == 1
assert scos[0]["id"] == "url--cc1deced-d99b-4d72-9268-8182420cb2fd"
query1 = [
Filter("type", "=", "indicator"),
]

View File

@ -104,17 +104,18 @@ def test_get_type_from_id(stix_id, type):
def test_deduplicate(stix_objs1):
unique = stix2.utils.deduplicate(stix_objs1)
# Only 3 objects are unique
# 2 id's vary
# Only 4 objects are unique
# 3 id's vary
# 2 modified times vary for a particular id
assert len(unique) == 3
assert len(unique) == 4
ids = [obj['id'] for obj in unique]
mods = [obj['modified'] for obj in unique]
mods = [obj.get('modified') for obj in unique]
assert "indicator--00000000-0000-4000-8000-000000000001" in ids
assert "indicator--00000000-0000-4000-8000-000000000001" in ids
assert "indicator--00000000-0000-4000-8000-000000000002" in ids
assert "url--cc1deced-d99b-4d72-9268-8182420cb2fd" in ids
assert "2017-01-27T13:49:53.935Z" in mods
assert "2017-01-27T13:49:53.936Z" in mods

View File

@ -132,11 +132,12 @@ def deduplicate(stix_obj_list):
unique_objs = {}
for obj in stix_obj_list:
try:
unique_objs[(obj['id'], obj['modified'])] = obj
except KeyError:
# Handle objects with no `modified` property, e.g. marking-definition
unique_objs[(obj['id'], obj['created'])] = obj
ver = obj.get("modified") or obj.get("created")
if ver is None:
unique_objs[obj["id"]] = obj
else:
unique_objs[(obj['id'], ver)] = obj
return list(unique_objs.values())