Add/fixup comments
parent
dfb4d01ba6
commit
84ac795e04
|
@ -99,15 +99,21 @@ class ReplicationCommandHandler:
|
|||
# missing RDATA.
|
||||
with await self._position_linearizer.queue(cmd.stream_name):
|
||||
if stream_name not in self._streams_connected:
|
||||
# If the stream isn't marked as connected then we haven't seen a
|
||||
# `POSITION` command yet, and so we may have missed some rows.
|
||||
# Let's drop the row for now, on the assumption we'll receive a
|
||||
# `POSITION` soon and we'll catch up correctly then.
|
||||
logger.warning(
|
||||
"Discarding RDATA for unconnected stream %s", stream_name
|
||||
"Discarding RDATA for unconnected stream %s -> ",
|
||||
stream_name,
|
||||
cmd.token,
|
||||
)
|
||||
return
|
||||
|
||||
if cmd.token is None:
|
||||
# I.e. either this is part of a batch of updates for this stream (in
|
||||
# I.e. this is part of a batch of updates for this stream (in
|
||||
# which case batch until we get an update for the stream with a non
|
||||
# None token) or we're currently connecting so we queue up rows.
|
||||
# None token).
|
||||
self._pending_batches.setdefault(stream_name, []).append(row)
|
||||
else:
|
||||
# Check if this is the last of a batch of updates
|
||||
|
|
Loading…
Reference in New Issue