diff --git a/synapse/storage/database.py b/synapse/storage/database.py index fc4b85b9b6..94570d1f10 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -461,11 +461,16 @@ class DatabasePool: exception_callbacks: List[_CallbackListEntry], func: "Callable[..., R]", *args: Any, - db_retry: bool = True, **kwargs: Any ) -> R: """Start a new database transaction with the given connection. + Note: The given func may be called multiple times under certain + failure modes. This is normally fine when in a standard transaction, + but care must be taken if the connection is in `autocommit` mode that + the function will correctly handle being aborted and retried half way + through its execution. + Args: conn desc @@ -473,8 +478,6 @@ class DatabasePool: exception_callbacks func *args - db_retry: Whether to retry the transaction by calling `func` again. - This should be disabled if connection is in autocommit mode. **kwargs """ @@ -508,7 +511,7 @@ class DatabasePool: transaction_logger.warning( "[TXN OPERROR] {%s} %s %d/%d", name, e, i, N, ) - if db_retry and i < N: + if i < N: i += 1 try: conn.rollback() @@ -521,7 +524,7 @@ class DatabasePool: transaction_logger.warning( "[TXN DEADLOCK] {%s} %d/%d", name, i, N ) - if db_retry and i < N: + if i < N: i += 1 try: conn.rollback() @@ -600,7 +603,9 @@ class DatabasePool: i.e. outside of a transaction. This is useful for transaction that are only a single query. Currently only affects postgres. WARNING: This means that if func fails half way through then - the changes will *not* be rolled back. + the changes will *not* be rolled back. `func` may also get + called multiple times if the transaction is retried, so must + correctly handle that case. args: positional args to pass to `func` kwargs: named args to pass to `func` @@ -623,7 +628,6 @@ class DatabasePool: func, *args, db_autocommit=db_autocommit, - db_retry=not db_autocommit, # Don't retry in auto commit mode. **kwargs ) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index a8563faefd..e30e19d203 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -108,7 +108,6 @@ class PostgresEngine(BaseDatabaseEngine): db_conn.set_isolation_level( self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ ) - db_conn.set_session(self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ) # Set the bytea output to escape, vs the default of hex cursor = db_conn.cursor() diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index bd32eea9d4..d7e40aaa8b 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -24,6 +24,7 @@ from typing_extensions import Deque from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.database import DatabasePool, LoggingTransaction +from synapse.storage.types import Cursor from synapse.storage.util.sequence import PostgresSequenceGenerator logger = logging.getLogger(__name__) @@ -548,7 +549,7 @@ class MultiWriterIdGenerator: # do. break - def _update_stream_positions_table_txn(self, txn: LoggingTransaction): + def _update_stream_positions_table_txn(self, txn: Cursor): """Update the `stream_positions` table with newly persisted position. """