Prevent federation user keys query from returning device names if disallowed (#14304)

pull/14328/head
Andrew Morgan 2022-10-28 18:06:02 +01:00 committed by GitHub
parent 730b13dbc9
commit 7911e2835d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 46 additions and 9 deletions

1
changelog.d/14304.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug introduced in 1.34.0 where device names would be returned via a federation user key query request when `allow_device_name_lookup_over_federation` was set to `false`.

View File

@ -49,6 +49,7 @@ logger = logging.getLogger(__name__)
class E2eKeysHandler: class E2eKeysHandler:
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
self.config = hs.config
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self.federation = hs.get_federation_client() self.federation = hs.get_federation_client()
self.device_handler = hs.get_device_handler() self.device_handler = hs.get_device_handler()
@ -431,13 +432,17 @@ class E2eKeysHandler:
@trace @trace
@cancellable @cancellable
async def query_local_devices( async def query_local_devices(
self, query: Mapping[str, Optional[List[str]]] self,
query: Mapping[str, Optional[List[str]]],
include_displaynames: bool = True,
) -> Dict[str, Dict[str, dict]]: ) -> Dict[str, Dict[str, dict]]:
"""Get E2E device keys for local users """Get E2E device keys for local users
Args: Args:
query: map from user_id to a list query: map from user_id to a list
of devices to query (None for all devices) of devices to query (None for all devices)
include_displaynames: Whether to include device displaynames in the returned
device details.
Returns: Returns:
A map from user_id -> device_id -> device details A map from user_id -> device_id -> device details
@ -469,7 +474,9 @@ class E2eKeysHandler:
# make sure that each queried user appears in the result dict # make sure that each queried user appears in the result dict
result_dict[user_id] = {} result_dict[user_id] = {}
results = await self.store.get_e2e_device_keys_for_cs_api(local_query) results = await self.store.get_e2e_device_keys_for_cs_api(
local_query, include_displaynames
)
# Build the result structure # Build the result structure
for user_id, device_keys in results.items(): for user_id, device_keys in results.items():
@ -482,11 +489,33 @@ class E2eKeysHandler:
async def on_federation_query_client_keys( async def on_federation_query_client_keys(
self, query_body: Dict[str, Dict[str, Optional[List[str]]]] self, query_body: Dict[str, Dict[str, Optional[List[str]]]]
) -> JsonDict: ) -> JsonDict:
"""Handle a device key query from a federated server""" """Handle a device key query from a federated server:
Handles the path: GET /_matrix/federation/v1/users/keys/query
Args:
query_body: The body of the query request. Should contain a key
"device_keys" that map to a dictionary of user ID's -> list of
device IDs. If the list of device IDs is empty, all devices of
that user will be queried.
Returns:
A json dictionary containing the following:
- device_keys: A dictionary containing the requested device information.
- master_keys: An optional dictionary of user ID -> master cross-signing
key info.
- self_signing_key: An optional dictionary of user ID -> self-signing
key info.
"""
device_keys_query: Dict[str, Optional[List[str]]] = query_body.get( device_keys_query: Dict[str, Optional[List[str]]] = query_body.get(
"device_keys", {} "device_keys", {}
) )
res = await self.query_local_devices(device_keys_query) res = await self.query_local_devices(
device_keys_query,
include_displaynames=(
self.config.federation.allow_device_name_lookup_over_federation
),
)
ret = {"device_keys": res} ret = {"device_keys": res}
# add in the cross-signing keys # add in the cross-signing keys

View File

@ -139,11 +139,15 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
@trace @trace
@cancellable @cancellable
async def get_e2e_device_keys_for_cs_api( async def get_e2e_device_keys_for_cs_api(
self, query_list: List[Tuple[str, Optional[str]]] self,
query_list: List[Tuple[str, Optional[str]]],
include_displaynames: bool = True,
) -> Dict[str, Dict[str, JsonDict]]: ) -> Dict[str, Dict[str, JsonDict]]:
"""Fetch a list of device keys, formatted suitably for the C/S API. """Fetch a list of device keys, formatted suitably for the C/S API.
Args: Args:
query_list(list): List of pairs of user_ids and device_ids. query_list: List of pairs of user_ids and device_ids.
include_displaynames: Whether to include the displayname of returned devices
(if one exists).
Returns: Returns:
Dict mapping from user-id to dict mapping from device_id to Dict mapping from user-id to dict mapping from device_id to
key data. The key data will be a dict in the same format as the key data. The key data will be a dict in the same format as the
@ -166,9 +170,12 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
continue continue
r["unsigned"] = {} r["unsigned"] = {}
display_name = device_info.display_name if include_displaynames:
if display_name is not None: # Include the device's display name in the "unsigned" dictionary
r["unsigned"]["device_display_name"] = display_name display_name = device_info.display_name
if display_name is not None:
r["unsigned"]["device_display_name"] = display_name
rv[user_id][device_id] = r rv[user_id][device_id] = r
return rv return rv