Merge pull request #444 from chisholm/comp_ds_unversioned_objs

Update CompositeDataSource and deduplicate() support for unversioned objects
pull/1/head
Chris Lenk 2020-10-15 08:57:55 -04:00 committed by GitHub
commit d17d01d165
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 38 additions and 19 deletions

View File

@ -481,14 +481,14 @@ class CompositeDataSource(DataSource):
if data:
all_data.append(data)
# remove duplicate versions
if len(all_data) > 0:
all_data = deduplicate(all_data)
else:
return None
# Search for latest version
stix_obj = latest_ver = None
for obj in all_data:
ver = obj.get("modified") or obj.get("created")
# reduce to most recent version
stix_obj = sorted(all_data, key=lambda k: k['modified'], reverse=True)[0]
if stix_obj is None or ver is None or ver > latest_ver:
stix_obj = obj
latest_ver = ver
return stix_obj

View File

@ -114,7 +114,7 @@ def test_deduplicate(stix_objs1):
mods = [obj['modified'] for obj in unique]
assert "indicator--00000000-0000-4000-8000-000000000001" in ids
assert "indicator--00000000-0000-4000-8000-000000000001" in ids
assert "indicator--00000000-0000-4000-8000-000000000002" in ids
assert "2017-01-27T13:49:53.935Z" in mods
assert "2017-01-27T13:49:53.936Z" in mods

View File

@ -132,7 +132,13 @@ def stix_objs1():
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
}
return [ind1, ind2, ind3, ind4, ind5]
sco = {
"type": "url",
"spec_version": "2.1",
"id": "url--cc1deced-d99b-4d72-9268-8182420cb2fd",
"value": "http://example.com/",
}
return [ind1, ind2, ind3, ind4, ind5, sco]
@pytest.fixture

View File

@ -59,6 +59,17 @@ def test_composite_datasource_operations(stix_objs1, stix_objs2):
assert indicator["modified"] == parse_into_datetime("2017-01-31T13:49:53.935Z")
assert indicator["type"] == "indicator"
sco = cds1.get("url--cc1deced-d99b-4d72-9268-8182420cb2fd")
assert sco["id"] == "url--cc1deced-d99b-4d72-9268-8182420cb2fd"
scos = cds1.all_versions("url--cc1deced-d99b-4d72-9268-8182420cb2fd")
assert len(scos) == 1
assert scos[0]["id"] == "url--cc1deced-d99b-4d72-9268-8182420cb2fd"
scos = cds1.query([Filter("value", "=", "http://example.com/")])
assert len(scos) == 1
assert scos[0]["id"] == "url--cc1deced-d99b-4d72-9268-8182420cb2fd"
query1 = [
Filter("type", "=", "indicator"),
]

View File

@ -104,17 +104,18 @@ def test_get_type_from_id(stix_id, type):
def test_deduplicate(stix_objs1):
unique = stix2.utils.deduplicate(stix_objs1)
# Only 3 objects are unique
# 2 id's vary
# Only 4 objects are unique
# 3 id's vary
# 2 modified times vary for a particular id
assert len(unique) == 3
assert len(unique) == 4
ids = [obj['id'] for obj in unique]
mods = [obj['modified'] for obj in unique]
mods = [obj.get('modified') for obj in unique]
assert "indicator--00000000-0000-4000-8000-000000000001" in ids
assert "indicator--00000000-0000-4000-8000-000000000001" in ids
assert "indicator--00000000-0000-4000-8000-000000000002" in ids
assert "url--cc1deced-d99b-4d72-9268-8182420cb2fd" in ids
assert "2017-01-27T13:49:53.935Z" in mods
assert "2017-01-27T13:49:53.936Z" in mods

View File

@ -132,11 +132,12 @@ def deduplicate(stix_obj_list):
unique_objs = {}
for obj in stix_obj_list:
try:
unique_objs[(obj['id'], obj['modified'])] = obj
except KeyError:
# Handle objects with no `modified` property, e.g. marking-definition
unique_objs[(obj['id'], obj['created'])] = obj
ver = obj.get("modified") or obj.get("created")
if ver is None:
unique_objs[obj["id"]] = obj
else:
unique_objs[(obj['id'], ver)] = obj
return list(unique_objs.values())