diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 452fe9ef70..842dd8d720 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -99,15 +99,21 @@ class ReplicationCommandHandler: # missing RDATA. with await self._position_linearizer.queue(cmd.stream_name): if stream_name not in self._streams_connected: + # If the stream isn't marked as connected then we haven't seen a + # `POSITION` command yet, and so we may have missed some rows. + # Let's drop the row for now, on the assumption we'll receive a + # `POSITION` soon and we'll catch up correctly then. logger.warning( - "Discarding RDATA for unconnected stream %s", stream_name + "Discarding RDATA for unconnected stream %s -> ", + stream_name, + cmd.token, ) return if cmd.token is None: - # I.e. either this is part of a batch of updates for this stream (in + # I.e. this is part of a batch of updates for this stream (in # which case batch until we get an update for the stream with a non - # None token) or we're currently connecting so we queue up rows. + # None token). self._pending_batches.setdefault(stream_name, []).append(row) else: # Check if this is the last of a batch of updates