Correctly handle RDATA that comes in while we catch up with a stream

pull/7185/head
Erik Johnston 2020-04-01 18:12:46 +01:00
parent ca9778cedf
commit 534bd868e5
1 changed files with 30 additions and 1 deletions

View File

@ -161,7 +161,36 @@ class ReplicationCommandHandler:
# Handle any RDATA that came in while we were catching up.
rows = self._pending_batches.pop(cmd.stream_name, [])
if rows:
await self.on_rdata(cmd.stream_name, rows[-1].token, rows)
# We need to make sure we filter out RDATA rows with a token less
# than what we've caught up to. This is slightly fiddly because of
# "batched" rows which have a `None` token, indicating that they
# have the same token as the next row with a non-None token.
#
# We do this by walking the list backwards, first removing any RDATA
# rows that are part of an uncompeted batch, then taking rows while
# their token is either None or greater than where we've caught up
# to.
uncompleted_batch = []
unfinished_batch = True
filtered_rows = []
for row in reversed(rows):
if row.token is not None:
unfinished_batch = False
if cmd.token < row.token:
filtered_rows.append(row)
else:
break
elif unfinished_batch:
uncompleted_batch.append(row)
else:
filtered_rows.append(row)
filtered_rows.reverse()
uncompleted_batch.reverse()
if uncompleted_batch:
self._pending_batches[cmd.stream_name] = uncompleted_batch
await self.on_rdata(cmd.stream_name, rows[-1].token, filtered_rows)
async def on_SYNC(self, cmd: SyncCommand):
pass