Updates to opentracing hackery
parent
99e7fb1d52
commit
5cc41f1b05
|
@ -41,6 +41,7 @@ from prometheus_client import Histogram
|
|||
from typing_extensions import Literal
|
||||
|
||||
from twisted.enterprise import adbapi
|
||||
from twisted.python import reflect
|
||||
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.config.database import DatabaseConnectionConfig
|
||||
|
@ -50,7 +51,6 @@ from synapse.logging.context import (
|
|||
current_context,
|
||||
make_deferred_yieldable,
|
||||
)
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.metrics import register_threadpool
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.background_updates import BackgroundUpdater
|
||||
|
@ -91,6 +91,20 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = {
|
|||
}
|
||||
|
||||
|
||||
class NastyConnectionWrapper:
|
||||
def __init__(self, connection):
|
||||
self._connection = connection
|
||||
self._synapse_parent_context = None
|
||||
|
||||
def commit(self, *args, **kwargs):
|
||||
with LoggingContext("db_commit", parent_context = self._synapse_parent_context):
|
||||
with opentracing.start_active_span("db.conn.commit"):
|
||||
self._connection.commit(*args, **kwargs)
|
||||
|
||||
def __getattr__(self, item):
|
||||
return getattr(self._connection, item)
|
||||
|
||||
|
||||
def make_pool(
|
||||
reactor, db_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine
|
||||
) -> adbapi.ConnectionPool:
|
||||
|
@ -105,22 +119,29 @@ def make_pool(
|
|||
# Ensure we have a logging context so we can correctly track queries,
|
||||
# etc.
|
||||
with LoggingContext("db.on_new_connection"):
|
||||
# HACK Patch the connection's commit function so that we can see
|
||||
# how long it's taking from Jaeger.
|
||||
class NastyConnectionWrapper:
|
||||
def __init__(self, connection):
|
||||
self._connection = connection
|
||||
self.commit = trace(connection.commit, "db.conn.commit")
|
||||
|
||||
def __getattr__(self, item):
|
||||
return getattr(self._connection, item)
|
||||
|
||||
engine.on_new_connection(
|
||||
LoggingDatabaseConnection(
|
||||
NastyConnectionWrapper(conn), engine, "on_new_connection"
|
||||
conn, engine, "on_new_connection"
|
||||
)
|
||||
)
|
||||
|
||||
# HACK Patch the connection's commit function so that we can see
|
||||
# how long it's taking from Jaeger. To do that, we need to patch the
|
||||
# dbapi module's 'connect' method so that it returns a wrapped 'Connection'
|
||||
# object to the connection pool. (psycopg2's Connection class is a C thing
|
||||
# which we can't monkey-patch directly).
|
||||
dbapiname = db_config.config["name"]
|
||||
dbapi = reflect.namedModule(dbapiname)
|
||||
if not getattr(dbapi, "_synapse_wrapped_dbapi", False):
|
||||
real_connect = dbapi.connect
|
||||
|
||||
def wrapped_connect(*args, **kwargs):
|
||||
conn = real_connect(*args, **kwargs)
|
||||
return NastyConnectionWrapper(conn)
|
||||
|
||||
dbapi.connect = wrapped_connect
|
||||
dbapi._synapse_wrapped_dbapi = True
|
||||
|
||||
connection_pool = adbapi.ConnectionPool(
|
||||
db_config.config["name"],
|
||||
cp_reactor=reactor,
|
||||
|
@ -813,6 +834,10 @@ class DatabasePool:
|
|||
# pool).
|
||||
assert not self.engine.in_transaction(conn)
|
||||
|
||||
# HACK: record the parent context in 'conn' so that we can tie later commits
|
||||
# back to it
|
||||
conn._connection._synapse_parent_context = parent_context
|
||||
|
||||
with LoggingContext(
|
||||
str(curr_context), parent_context=parent_context
|
||||
) as context:
|
||||
|
|
Loading…
Reference in New Issue