Code review
parent
9be577c470
commit
b16b7efb5e
|
@ -461,11 +461,16 @@ class DatabasePool:
|
||||||
exception_callbacks: List[_CallbackListEntry],
|
exception_callbacks: List[_CallbackListEntry],
|
||||||
func: "Callable[..., R]",
|
func: "Callable[..., R]",
|
||||||
*args: Any,
|
*args: Any,
|
||||||
db_retry: bool = True,
|
|
||||||
**kwargs: Any
|
**kwargs: Any
|
||||||
) -> R:
|
) -> R:
|
||||||
"""Start a new database transaction with the given connection.
|
"""Start a new database transaction with the given connection.
|
||||||
|
|
||||||
|
Note: The given func may be called multiple times under certain
|
||||||
|
failure modes. This is normally fine when in a standard transaction,
|
||||||
|
but care must be taken if the connection is in `autocommit` mode that
|
||||||
|
the function will correctly handle being aborted and retried half way
|
||||||
|
through its execution.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
conn
|
conn
|
||||||
desc
|
desc
|
||||||
|
@ -473,8 +478,6 @@ class DatabasePool:
|
||||||
exception_callbacks
|
exception_callbacks
|
||||||
func
|
func
|
||||||
*args
|
*args
|
||||||
db_retry: Whether to retry the transaction by calling `func` again.
|
|
||||||
This should be disabled if connection is in autocommit mode.
|
|
||||||
**kwargs
|
**kwargs
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@ -508,7 +511,7 @@ class DatabasePool:
|
||||||
transaction_logger.warning(
|
transaction_logger.warning(
|
||||||
"[TXN OPERROR] {%s} %s %d/%d", name, e, i, N,
|
"[TXN OPERROR] {%s} %s %d/%d", name, e, i, N,
|
||||||
)
|
)
|
||||||
if db_retry and i < N:
|
if i < N:
|
||||||
i += 1
|
i += 1
|
||||||
try:
|
try:
|
||||||
conn.rollback()
|
conn.rollback()
|
||||||
|
@ -521,7 +524,7 @@ class DatabasePool:
|
||||||
transaction_logger.warning(
|
transaction_logger.warning(
|
||||||
"[TXN DEADLOCK] {%s} %d/%d", name, i, N
|
"[TXN DEADLOCK] {%s} %d/%d", name, i, N
|
||||||
)
|
)
|
||||||
if db_retry and i < N:
|
if i < N:
|
||||||
i += 1
|
i += 1
|
||||||
try:
|
try:
|
||||||
conn.rollback()
|
conn.rollback()
|
||||||
|
@ -600,7 +603,9 @@ class DatabasePool:
|
||||||
i.e. outside of a transaction. This is useful for transaction
|
i.e. outside of a transaction. This is useful for transaction
|
||||||
that are only a single query. Currently only affects postgres.
|
that are only a single query. Currently only affects postgres.
|
||||||
WARNING: This means that if func fails half way through then
|
WARNING: This means that if func fails half way through then
|
||||||
the changes will *not* be rolled back.
|
the changes will *not* be rolled back. `func` may also get
|
||||||
|
called multiple times if the transaction is retried, so must
|
||||||
|
correctly handle that case.
|
||||||
|
|
||||||
args: positional args to pass to `func`
|
args: positional args to pass to `func`
|
||||||
kwargs: named args to pass to `func`
|
kwargs: named args to pass to `func`
|
||||||
|
@ -623,7 +628,6 @@ class DatabasePool:
|
||||||
func,
|
func,
|
||||||
*args,
|
*args,
|
||||||
db_autocommit=db_autocommit,
|
db_autocommit=db_autocommit,
|
||||||
db_retry=not db_autocommit, # Don't retry in auto commit mode.
|
|
||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -108,7 +108,6 @@ class PostgresEngine(BaseDatabaseEngine):
|
||||||
db_conn.set_isolation_level(
|
db_conn.set_isolation_level(
|
||||||
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
|
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
|
||||||
)
|
)
|
||||||
db_conn.set_session(self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ)
|
|
||||||
|
|
||||||
# Set the bytea output to escape, vs the default of hex
|
# Set the bytea output to escape, vs the default of hex
|
||||||
cursor = db_conn.cursor()
|
cursor = db_conn.cursor()
|
||||||
|
|
|
@ -24,6 +24,7 @@ from typing_extensions import Deque
|
||||||
|
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||||
|
from synapse.storage.types import Cursor
|
||||||
from synapse.storage.util.sequence import PostgresSequenceGenerator
|
from synapse.storage.util.sequence import PostgresSequenceGenerator
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -548,7 +549,7 @@ class MultiWriterIdGenerator:
|
||||||
# do.
|
# do.
|
||||||
break
|
break
|
||||||
|
|
||||||
def _update_stream_positions_table_txn(self, txn: LoggingTransaction):
|
def _update_stream_positions_table_txn(self, txn: Cursor):
|
||||||
"""Update the `stream_positions` table with newly persisted position.
|
"""Update the `stream_positions` table with newly persisted position.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue