Limit query_devices_for_destination to 10 concurrent invocations

pull/14716/head
Olivier Wilkinson (reivilibre) 2022-12-20 11:49:56 +00:00
parent 227c953d99
commit 7d2261f922
1 changed files with 20 additions and 19 deletions

View File

@ -36,8 +36,8 @@ from synapse.types import (
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
from synapse.util import json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, delay_cancellation
from synapse.util import json_decoder
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.cancellation import cancellable
from synapse.util.retryutils import NotRetryingDestination
@ -241,24 +241,25 @@ class E2eKeysHandler:
logger.debug(
"%d destinations to query devices for", len(remote_queries_not_in_cache)
)
await make_deferred_yieldable(
delay_cancellation(
defer.gatherResults(
[
run_in_background(
self._query_devices_for_destination,
results,
cross_signing_keys,
failures,
destination,
queries,
timeout,
)
for destination, queries in remote_queries_not_in_cache.items()
],
consumeErrors=True,
).addErrback(unwrapFirstError)
async def _query(
destination_queries: Tuple[str, Dict[str, Iterable[str]]]
) -> None:
destination, queries = destination_queries
return await self._query_devices_for_destination(
results,
cross_signing_keys,
failures,
destination,
queries,
timeout,
)
await concurrently_execute(
_query,
remote_queries_not_in_cache.items(),
10,
delay_cancellation=True,
)
ret = {"device_keys": results, "failures": failures}