Correctly account for cpu usage by background threads (#4074)
Wrap calls to deferToThread() in a wrapper that uses a child logcontext to attribute CPU usage to the right request. While we're in the area, remove the logcontext_tracer stuff, which is never used and, as far as I know, doesn't work. Fixes #4064
parent 1fe6bbb555
commit 5c445114d3
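At each call site the change follows the same pattern: instead of wrapping Twisted's deferToThread()/deferToThreadPool() in make_deferred_yieldable(), the code calls the new logcontext.defer_to_thread() helper, which runs the function in a child logcontext. A minimal before/after sketch of that pattern, taken from the thumbnail-generation call in the diff below (shown as it appears inside an inlineCallbacks generator, not a complete excerpt of the surrounding method):

# Before: the work ran on a thread with no logcontext, so the CPU it used
# was not attributed to the request that triggered it.
t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
    self._generate_thumbnail,
    thumbnailer, t_width, t_height, t_method, t_type,
))

# After: defer_to_thread runs the function in a child of the current
# logcontext, so its CPU usage is accounted to the current request.
t_byte_source = yield logcontext.defer_to_thread(
    self.hs.get_reactor(),
    self._generate_thumbnail,
    thumbnailer, t_width, t_height, t_method, t_type,
)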
changelog.d/4074.bugfix (new file)
@@ -0,0 +1 @@
+Correctly account for cpu usage by background threads
synapse/handlers/auth.py
@@ -22,7 +22,7 @@ import bcrypt
 import pymacaroons
 from canonicaljson import json
 
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.web.client import PartialDownloadError
 
 import synapse.util.stringutils as stringutils
@@ -37,8 +37,8 @@ from synapse.api.errors import (
 )
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
+from synapse.util import logcontext
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable
 
 from ._base import BaseHandler
 
@@ -884,11 +884,7 @@ class AuthHandler(BaseHandler):
                 bcrypt.gensalt(self.bcrypt_rounds),
             ).decode('ascii')
 
-        return make_deferred_yieldable(
-            threads.deferToThreadPool(
-                self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
-            ),
-        )
+        return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
 
     def validate_hash(self, password, stored_hash):
         """Validates that self.hash(password) == stored_hash.
@@ -913,13 +909,7 @@ class AuthHandler(BaseHandler):
             if not isinstance(stored_hash, bytes):
                 stored_hash = stored_hash.encode('ascii')
 
-            return make_deferred_yieldable(
-                threads.deferToThreadPool(
-                    self.hs.get_reactor(),
-                    self.hs.get_reactor().getThreadPool(),
-                    _do_validate_hash,
-                ),
-            )
+            return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
         else:
             return defer.succeed(False)
 
synapse/rest/media/v1/media_repository.py
@@ -25,7 +25,7 @@ from six.moves.urllib import parse as urlparse
 
 import twisted.internet.error
 import twisted.web.http
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.web.resource import Resource
 
 from synapse.api.errors import (
@@ -36,8 +36,8 @@ from synapse.api.errors import (
 )
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
 from synapse.util.async_helpers import Linearizer
-from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import is_ascii, random_string
 
@@ -492,10 +492,11 @@ class MediaRepository(object):
         ))
 
         thumbnailer = Thumbnailer(input_path)
-        t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+        t_byte_source = yield logcontext.defer_to_thread(
+            self.hs.get_reactor(),
             self._generate_thumbnail,
             thumbnailer, t_width, t_height, t_method, t_type
-        ))
+        )
 
         if t_byte_source:
             try:
@@ -534,10 +535,11 @@ class MediaRepository(object):
         ))
 
         thumbnailer = Thumbnailer(input_path)
-        t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+        t_byte_source = yield logcontext.defer_to_thread(
+            self.hs.get_reactor(),
             self._generate_thumbnail,
             thumbnailer, t_width, t_height, t_method, t_type
-        ))
+        )
 
         if t_byte_source:
             try:
@@ -620,15 +622,17 @@ class MediaRepository(object):
         for (t_width, t_height, t_type), t_method in iteritems(thumbnails):
             # Generate the thumbnail
             if t_method == "crop":
-                t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+                t_byte_source = yield logcontext.defer_to_thread(
+                    self.hs.get_reactor(),
                     thumbnailer.crop,
                     t_width, t_height, t_type,
-                ))
+                )
             elif t_method == "scale":
-                t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+                t_byte_source = yield logcontext.defer_to_thread(
+                    self.hs.get_reactor(),
                     thumbnailer.scale,
                     t_width, t_height, t_type,
-                ))
+                )
             else:
                 logger.error("Unrecognized method: %r", t_method)
                 continue
synapse/rest/media/v1/media_storage.py
@@ -21,9 +21,10 @@ import sys
 
 import six
 
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.protocols.basic import FileSender
 
+from synapse.util import logcontext
 from synapse.util.file_consumer import BackgroundFileConsumer
 from synapse.util.logcontext import make_deferred_yieldable
 
@@ -64,9 +65,10 @@ class MediaStorage(object):
 
         with self.store_into_file(file_info) as (f, fname, finish_cb):
             # Write to the main repository
-            yield make_deferred_yieldable(threads.deferToThread(
+            yield logcontext.defer_to_thread(
+                self.hs.get_reactor(),
                 _write_file_synchronously, source, f,
-            ))
+            )
             yield finish_cb()
 
         defer.returnValue(fname)
synapse/rest/media/v1/storage_provider.py
@@ -17,9 +17,10 @@ import logging
 import os
 import shutil
 
-from twisted.internet import defer, threads
+from twisted.internet import defer
 
 from synapse.config._base import Config
+from synapse.util import logcontext
 from synapse.util.logcontext import run_in_background
 
 from .media_storage import FileResponder
@@ -120,7 +121,8 @@ class FileStorageProviderBackend(StorageProvider):
         if not os.path.exists(dirname):
             os.makedirs(dirname)
 
-        return threads.deferToThread(
+        return logcontext.defer_to_thread(
+            self.hs.get_reactor(),
             shutil.copyfile, primary_fname, backup_fname,
         )
 
synapse/util/logcontext.py
@@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works.
 import logging
 import threading
 
-from twisted.internet import defer
+from twisted.internet import defer, threads
 
 logger = logging.getLogger(__name__)
 
@@ -562,58 +562,76 @@ def _set_context_cb(result, context):
     return result
 
 
-# modules to ignore in `logcontext_tracer`
-_to_ignore = [
-    "synapse.util.logcontext",
-    "synapse.http.server",
-    "synapse.storage._base",
-    "synapse.util.async_helpers",
-]
-
-
-def logcontext_tracer(frame, event, arg):
-    """A tracer that logs whenever a logcontext "unexpectedly" changes within
-    a function. Probably inaccurate.
-
-    Use by calling `sys.settrace(logcontext_tracer)` in the main thread.
-    """
-    if event == 'call':
-        name = frame.f_globals["__name__"]
-        if name.startswith("synapse"):
-            if name == "synapse.util.logcontext":
-                if frame.f_code.co_name in ["__enter__", "__exit__"]:
-                    tracer = frame.f_back.f_trace
-                    if tracer:
-                        tracer.just_changed = True
-
-            tracer = frame.f_trace
-            if tracer:
-                return tracer
-            if not any(name.startswith(ig) for ig in _to_ignore):
-                return LineTracer()
-
-
-class LineTracer(object):
-    __slots__ = ["context", "just_changed"]
-
-    def __init__(self):
-        self.context = LoggingContext.current_context()
-        self.just_changed = False
-
-    def __call__(self, frame, event, arg):
-        if event in 'line':
-            if self.just_changed:
-                self.context = LoggingContext.current_context()
-                self.just_changed = False
-            else:
-                c = LoggingContext.current_context()
-                if c != self.context:
-                    logger.info(
-                        "Context changed! %s -> %s, %s, %s",
-                        self.context, c,
-                        frame.f_code.co_filename, frame.f_lineno
-                    )
-                    self.context = c
-
-        return self
+def defer_to_thread(reactor, f, *args, **kwargs):
+    """
+    Calls the function `f` using a thread from the reactor's default threadpool and
+    returns the result as a Deferred.
+
+    Creates a new logcontext for `f`, which is created as a child of the current
+    logcontext (so its CPU usage metrics will get attributed to the current
+    logcontext). `f` should preserve the logcontext it is given.
+
+    The result deferred follows the Synapse logcontext rules: you should `yield`
+    on it.
+
+    Args:
+        reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+            the Deferred will be invoked, and whose threadpool we should use for the
+            function.
+
+            Normally this will be hs.get_reactor().
+
+        f (callable): The function to call.
+
+        args: positional arguments to pass to f.
+
+        kwargs: keyword arguments to pass to f.
+
+    Returns:
+        Deferred: A Deferred which fires a callback with the result of `f`, or an
+            errback if `f` throws an exception.
+    """
+    return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
+
+
+def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
+    """
+    A wrapper for twisted.internet.threads.deferToThreadpool, which handles
+    logcontexts correctly.
+
+    Calls the function `f` using a thread from the given threadpool and returns
+    the result as a Deferred.
+
+    Creates a new logcontext for `f`, which is created as a child of the current
+    logcontext (so its CPU usage metrics will get attributed to the current
+    logcontext). `f` should preserve the logcontext it is given.
+
+    The result deferred follows the Synapse logcontext rules: you should `yield`
+    on it.
+
+    Args:
+        reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+            the Deferred will be invoked. Normally this will be hs.get_reactor().
+
+        threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
+            running `f`. Normally this will be hs.get_reactor().getThreadPool().
+
+        f (callable): The function to call.
+
+        args: positional arguments to pass to f.
+
+        kwargs: keyword arguments to pass to f.
+
+    Returns:
+        Deferred: A Deferred which fires a callback with the result of `f`, or an
+            errback if `f` throws an exception.
+    """
+    logcontext = LoggingContext.current_context()
+
+    def g():
+        with LoggingContext(parent_context=logcontext):
+            return f(*args, **kwargs)
+
+    return make_deferred_yieldable(
+        threads.deferToThreadPool(reactor, threadpool, g)
+    )
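Callers that want to push blocking work onto a dedicated pool, rather than the reactor's default threadpool, can call defer_to_threadpool directly. A minimal sketch of that usage; the hs, threadpool and copy_to_backup names are illustrative and not part of this commit:

import shutil

from twisted.internet import defer

from synapse.util import logcontext


@defer.inlineCallbacks
def copy_to_backup(hs, threadpool, primary_fname, backup_fname):
    # Illustrative only: run the blocking copy on the supplied threadpool while
    # still attributing its CPU usage to the caller's logcontext.
    yield logcontext.defer_to_threadpool(
        hs.get_reactor(), threadpool,
        shutil.copyfile, primary_fname, backup_fname,
    )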