From d8c2c96dfbb730736f5f549ea2bdf2ad051db820 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Oct 2020 11:06:18 +0100 Subject: [PATCH] Fix uncommitted connections leaking to pool when first connected --- synapse/storage/database.py | 7 +++++++ synapse/storage/engines/_base.py | 6 ++++++ synapse/storage/engines/postgres.py | 7 ++++++- synapse/storage/engines/sqlite.py | 5 +++++ 4 files changed, 24 insertions(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 0d9d9b7cc0..b36ea7ccdf 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -633,6 +633,13 @@ class DatabasePool: start_time = monotonic_time() def inner_func(conn, *args, **kwargs): + # We shouldn't be in a transaction. If we are then something + # somewhere hasn't committed after doing work. (This is likely only + # possible during startup, as `run*` will ensure changes are + # committed/rolled back before putting the connection back in the + # pool). + assert not self.engine.in_transaction(conn) + with LoggingContext("runWithConnection", parent_context) as context: sched_duration_sec = monotonic_time() - start_time sql_scheduling_timer.observe(sched_duration_sec) diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 908cbc79e3..314d2e8d80 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -97,3 +97,9 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta): """Gets a string giving the server version. For example: '3.22.0' """ ... + + @abc.abstractmethod + def in_transaction(self, conn: Connection) -> bool: + """Whether the connection is currently in a transaction. + """ + ... diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index ff39281f85..655f2a8b5f 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -15,7 +15,8 @@ import logging -from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup +from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup +from synapse.storage.types import Connection logger = logging.getLogger(__name__) @@ -119,6 +120,7 @@ class PostgresEngine(BaseDatabaseEngine): cursor.execute("SET synchronous_commit TO OFF") cursor.close() + db_conn.commit() @property def can_native_upsert(self): @@ -171,3 +173,6 @@ class PostgresEngine(BaseDatabaseEngine): return "%i.%i" % (numver / 10000, numver % 10000) else: return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100) + + def in_transaction(self, conn: Connection) -> bool: + return conn.status != self.module.extensions.STATUS_READY # type: ignore diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 8a0f8c89d1..00bced3618 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -17,6 +17,7 @@ import threading import typing from synapse.storage.engines import BaseDatabaseEngine +from synapse.storage.types import Connection if typing.TYPE_CHECKING: import sqlite3 # noqa: F401 @@ -86,6 +87,7 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]): db_conn.create_function("rank", 1, _rank) db_conn.execute("PRAGMA foreign_keys = ON;") + db_conn.commit() def is_deadlock(self, error): return False @@ -105,6 +107,9 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]): """ return "%i.%i.%i" % self.module.sqlite_version_info + def in_transaction(self, conn: Connection) -> bool: + return conn.in_transaction # type: ignore + # Following functions taken from: https://github.com/coleifer/peewee