Fix uncommitted connections leaking to pool when first connected
parent
e3debf9682
commit
d8c2c96dfb
|
@ -633,6 +633,13 @@ class DatabasePool:
|
|||
start_time = monotonic_time()
|
||||
|
||||
def inner_func(conn, *args, **kwargs):
|
||||
# We shouldn't be in a transaction. If we are then something
|
||||
# somewhere hasn't committed after doing work. (This is likely only
|
||||
# possible during startup, as `run*` will ensure changes are
|
||||
# committed/rolled back before putting the connection back in the
|
||||
# pool).
|
||||
assert not self.engine.in_transaction(conn)
|
||||
|
||||
with LoggingContext("runWithConnection", parent_context) as context:
|
||||
sched_duration_sec = monotonic_time() - start_time
|
||||
sql_scheduling_timer.observe(sched_duration_sec)
|
||||
|
|
|
@ -97,3 +97,9 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
|
|||
"""Gets a string giving the server version. For example: '3.22.0'
|
||||
"""
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
def in_transaction(self, conn: Connection) -> bool:
|
||||
"""Whether the connection is currently in a transaction.
|
||||
"""
|
||||
...
|
||||
|
|
|
@ -15,7 +15,8 @@
|
|||
|
||||
import logging
|
||||
|
||||
from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup
|
||||
from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup
|
||||
from synapse.storage.types import Connection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -119,6 +120,7 @@ class PostgresEngine(BaseDatabaseEngine):
|
|||
cursor.execute("SET synchronous_commit TO OFF")
|
||||
|
||||
cursor.close()
|
||||
db_conn.commit()
|
||||
|
||||
@property
|
||||
def can_native_upsert(self):
|
||||
|
@ -171,3 +173,6 @@ class PostgresEngine(BaseDatabaseEngine):
|
|||
return "%i.%i" % (numver / 10000, numver % 10000)
|
||||
else:
|
||||
return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100)
|
||||
|
||||
def in_transaction(self, conn: Connection) -> bool:
|
||||
return conn.status != self.module.extensions.STATUS_READY # type: ignore
|
||||
|
|
|
@ -17,6 +17,7 @@ import threading
|
|||
import typing
|
||||
|
||||
from synapse.storage.engines import BaseDatabaseEngine
|
||||
from synapse.storage.types import Connection
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
import sqlite3 # noqa: F401
|
||||
|
@ -86,6 +87,7 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
|
|||
|
||||
db_conn.create_function("rank", 1, _rank)
|
||||
db_conn.execute("PRAGMA foreign_keys = ON;")
|
||||
db_conn.commit()
|
||||
|
||||
def is_deadlock(self, error):
|
||||
return False
|
||||
|
@ -105,6 +107,9 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
|
|||
"""
|
||||
return "%i.%i.%i" % self.module.sqlite_version_info
|
||||
|
||||
def in_transaction(self, conn: Connection) -> bool:
|
||||
return conn.in_transaction # type: ignore
|
||||
|
||||
|
||||
# Following functions taken from: https://github.com/coleifer/peewee
|
||||
|
||||
|
|
Loading…
Reference in New Issue