Add async helpers

pull/14716/head
Olivier Wilkinson (reivilibre) 2022-12-20 11:49:39 +00:00
parent 9c4562c74a
commit 227c953d99
1 changed files with 51 additions and 4 deletions

View File

@ -205,7 +205,10 @@ T = TypeVar("T")
async def concurrently_execute(
func: Callable[[T], Any], args: Iterable[T], limit: int
func: Callable[[T], Any],
args: Iterable[T],
limit: int,
delay_cancellation: bool = False,
) -> None:
"""Executes the function with each argument concurrently while limiting
the number of concurrent executions.
@ -215,6 +218,8 @@ async def concurrently_execute(
args: List of arguments to pass to func, each invocation of func
gets a single argument.
limit: Maximum number of conccurent executions.
delay_cancellation: Whether to delay cancellation until after the invocations
have finished.
Returns:
None, when all function invocations have finished. The return values
@ -233,8 +238,15 @@ async def concurrently_execute(
# We use `itertools.islice` to handle the case where the number of args is
# less than the limit, avoiding needlessly spawning unnecessary background
# tasks.
if delay_cancellation:
await yieldable_gather_results_delaying_cancellation(
_concurrently_execute_inner,
(value for value in itertools.islice(it, limit)),
)
else:
await yieldable_gather_results(
_concurrently_execute_inner, (value for value in itertools.islice(it, limit))
_concurrently_execute_inner,
(value for value in itertools.islice(it, limit)),
)
@ -292,6 +304,41 @@ async def yieldable_gather_results(
raise dfe.subFailure.value from None
async def yieldable_gather_results_delaying_cancellation(
func: Callable[Concatenate[T, P], Awaitable[R]],
iter: Iterable[T],
*args: P.args,
**kwargs: P.kwargs,
) -> List[R]:
"""Executes the function with each argument concurrently.
Cancellation is delayed until after all the results have been gathered.
See `yieldable_gather_results`.
Args:
func: Function to execute that returns a Deferred
iter: An iterable that yields items that get passed as the first
argument to the function
*args: Arguments to be passed to each call to func
**kwargs: Keyword arguments to be passed to each call to func
Returns
A list containing the results of the function
"""
try:
return await make_deferred_yieldable(
delay_cancellation(
defer.gatherResults(
[run_in_background(func, item, *args, **kwargs) for item in iter], # type: ignore[arg-type]
consumeErrors=True,
)
)
)
except defer.FirstError as dfe:
assert isinstance(dfe.subFailure.value, BaseException)
raise dfe.subFailure.value from None
T1 = TypeVar("T1")
T2 = TypeVar("T2")
T3 = TypeVar("T3")