Fix federation stall on concurrent access errors (#9639)

pull/9674/head
Jonathan de Jong 2021-03-23 14:52:30 +01:00 committed by GitHub
parent 4ecba9bd5c
commit 0caf2a338e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 10 additions and 36 deletions

1
changelog.d/9639.bugfix Normal file
View File

@ -0,0 +1 @@
Fix bug where federation sending can stall due to `concurrent access` database exceptions when it falls behind.

View File

@ -22,7 +22,6 @@ from canonicaljson import encode_canonical_json
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.util.caches.expiringcache import ExpiringCache
@ -312,49 +311,23 @@ class TransactionStore(TransactionWorkerStore):
stream_ordering: the stream_ordering of the event
"""
return await self.db_pool.runInteraction(
"store_destination_rooms_entries",
self._store_destination_rooms_entries_txn,
destinations,
room_id,
stream_ordering,
await self.db_pool.simple_upsert_many(
table="destinations",
key_names=("destination",),
key_values=[(d,) for d in destinations],
value_names=[],
value_values=[],
desc="store_destination_rooms_entries_dests",
)
def _store_destination_rooms_entries_txn(
self,
txn: LoggingTransaction,
destinations: Iterable[str],
room_id: str,
stream_ordering: int,
) -> None:
# ensure we have a `destinations` row for this destination, as there is
# a foreign key constraint.
if isinstance(self.database_engine, PostgresEngine):
q = """
INSERT INTO destinations (destination)
VALUES (?)
ON CONFLICT DO NOTHING;
"""
elif isinstance(self.database_engine, Sqlite3Engine):
q = """
INSERT OR IGNORE INTO destinations (destination)
VALUES (?);
"""
else:
raise RuntimeError("Unknown database engine")
txn.execute_batch(q, ((destination,) for destination in destinations))
rows = [(destination, room_id) for destination in destinations]
self.db_pool.simple_upsert_many_txn(
txn,
await self.db_pool.simple_upsert_many(
table="destination_rooms",
key_names=("destination", "room_id"),
key_values=rows,
value_names=["stream_ordering"],
value_values=[(stream_ordering,)] * len(rows),
desc="store_destination_rooms_entries_rooms",
)
async def get_destination_last_successful_stream_ordering(