From 6ac1ecad952f695ef8b40b8ae488e2d8f0c1ee06 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 Mar 2020 13:30:57 +0100 Subject: [PATCH] Add linearizer to protect stream catchup --- synapse/replication/tcp/handler.py | 49 ++++++++++++++---------- tests/replication/slave/storage/_base.py | 6 ++- 2 files changed, 32 insertions(+), 23 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 0b42339142..b3c33370a0 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -35,6 +35,7 @@ from synapse.replication.tcp.commands import ( UserSyncCommand, ) from synapse.replication.tcp.streams import STREAMS_MAP, Stream +from synapse.util.async_helpers import Linearizer logger = logging.getLogger(__name__) @@ -60,6 +61,8 @@ class ReplicationCommandHandler: stream.NAME: stream(hs) for stream in STREAMS_MAP.values() } # type: Dict[str, Stream] + self._position_linearizer = Linearizer("replication_position") + # Map of stream to batched updates. See RdataCommand for info on how # batching works. self.pending_batches = {} # type: Dict[str, List[Any]] @@ -122,31 +125,35 @@ class ReplicationCommandHandler: # to stop RDATA being handled at the same time. self.streams_connecting.add(cmd.stream_name) - # Find where we previously streamed up to. - current_token = self.replication_data_handler.get_streams_to_replicate().get( - cmd.stream_name - ) - if current_token is None: - logger.warning( - "Got POSITION for stream we're not subscribed to: %s", cmd.stream_name + # We protect catching up with a linearizer in case the replicaiton + # connection reconnects under us. + with await self._position_linearizer.queue(cmd.stream_name): + # Find where we previously streamed up to. + current_token = self.replication_data_handler.get_streams_to_replicate().get( + cmd.stream_name ) - return - - # Fetch all updates between then and now. - limited = True - while limited: - updates, current_token, limited = await stream.get_updates_since( - current_token, cmd.token - ) - if updates: - await self.on_rdata( + if current_token is None: + logger.warning( + "Got POSITION for stream we're not subscribed to: %s", cmd.stream_name, - current_token, - [stream.parse_row(update[1]) for update in updates], ) + return - # We've now caught up to position sent to us, notify handler. - await self.replication_data_handler.on_position(cmd.stream_name, cmd.token) + # Fetch all updates between then and now. + limited = True + while limited: + updates, current_token, limited = await stream.get_updates_since( + current_token, cmd.token + ) + if updates: + await self.on_rdata( + cmd.stream_name, + current_token, + [stream.parse_row(update[1]) for update in updates], + ) + + # We've now caught up to position sent to us, notify handler. + await self.replication_data_handler.on_position(cmd.stream_name, cmd.token) self.streams_connecting.discard(cmd.stream_name) diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 1deec91e07..14be64b3fd 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -15,9 +15,11 @@ from mock import Mock, NonCallableMock -from synapse.replication.tcp.client import ReplicationClientFactory +from synapse.replication.tcp.client import ( + ReplicationClientFactory, + ReplicationDataHandler, +) from synapse.replication.tcp.handler import ReplicationCommandHandler -from synapse.replication.tcp.client import ReplicationDataHandler from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.storage.database import make_conn