Convert run_as_background_process inner function to async. (#8032)

pull/8043/head
Patrick Cloke 2020-08-06 08:20:42 -04:00 committed by GitHub
parent 66f24449dd
commit c36228c403
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 16 additions and 26 deletions

1
changelog.d/8032.misc Normal file
View File

@ -0,0 +1 @@
Convert various parts of the codebase to async/await.

View File

@@ -101,7 +101,7 @@ class ApplicationServicesHandler(object):
         async def start_scheduler():
             try:
-                return self.scheduler.start()
+                return await self.scheduler.start()
             except Exception:
                 logger.error("Application Services Failure")

View File

@@ -146,10 +146,9 @@ class SynapseRequest(Request):
         Returns a context manager; the correct way to use this is:

-            @defer.inlineCallbacks
-            def handle_request(request):
+            async def handle_request(request):
                 with request.processing("FooServlet"):
-                    yield really_handle_the_request()
+                    await really_handle_the_request()

         Once the context manager is closed, the completion of the request will be logged,
         and the various metrics will be updated.

View File

@@ -13,16 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import inspect
 import logging
 import threading
-from asyncio import iscoroutine
 from functools import wraps
 from typing import TYPE_CHECKING, Dict, Optional, Set

 from prometheus_client.core import REGISTRY, Counter, Gauge

 from twisted.internet import defer
-from twisted.python.failure import Failure

 from synapse.logging.context import LoggingContext, PreserveLoggingContext
@@ -167,7 +166,7 @@ class _BackgroundProcess(object):
     )

-def run_as_background_process(desc, func, *args, **kwargs):
+def run_as_background_process(desc: str, func, *args, **kwargs):
     """Run the given function in its own logcontext, with resource metrics

     This should be used to wrap processes which are fired off to run in the
@@ -179,7 +178,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
     normal synapse inlineCallbacks function).

     Args:
-        desc (str): a description for this background process type
+        desc: a description for this background process type
         func: a function, which may return a Deferred or a coroutine
         args: positional args for func
         kwargs: keyword args for func
@@ -188,8 +187,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
     follow the synapse logcontext rules.
     """

-    @defer.inlineCallbacks
-    def run():
+    async def run():
         with _bg_metrics_lock:
             count = _background_process_counts.get(desc, 0)
             _background_process_counts[desc] = count + 1
@@ -203,29 +201,21 @@ def run_as_background_process(desc, func, *args, **kwargs):
             try:
                 result = func(*args, **kwargs)

-                # We probably don't have an ensureDeferred in our call stack to handle
-                # coroutine results, so we need to ensureDeferred here.
-                #
-                # But we need this check because ensureDeferred doesn't like being
-                # called on immediate values (as opposed to Deferreds or coroutines).
-                if iscoroutine(result):
-                    result = defer.ensureDeferred(result)
+                if inspect.isawaitable(result):
+                    result = await result

-                return (yield result)
+                return result
             except Exception:
-                # failure.Failure() fishes the original Failure out of our stack, and
-                # thus gives us a sensible stack trace.
-                f = Failure()
-                logger.error(
-                    "Background process '%s' threw an exception",
-                    desc,
-                    exc_info=(f.type, f.value, f.getTracebackObject()),
-                )
+                logger.exception(
+                    "Background process '%s' threw an exception", desc,
+                )
             finally:
                 _background_process_in_flight_count.labels(desc).dec()

     with PreserveLoggingContext():
-        return run()
+        # Note that we return a Deferred here so that it can be used in a
+        # looping_call and other places that expect a Deferred.
+        return defer.ensureDeferred(run())

 def wrap_as_background_process(desc):