Compare commits

...

2 Commits

Author SHA1 Message Date
Erik Johnston 4873583022 Revert to previous (racey) handling of POSITION and RDATA, and move into linearizer 2020-04-03 15:51:29 +01:00
Erik Johnston 99e4a995b5 Fix up comments 2020-04-03 15:44:56 +01:00
2 changed files with 21 additions and 46 deletions

View File

@ -77,7 +77,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
class ReplicationDataHandler:
"""A replication data handler handles incoming stream updates from replication.
"""Handles incoming stream updates from replication.
This instance notifies the slave data store about updates. Can be subclassed
to handle updates in additional ways.

View File

@ -45,7 +45,8 @@ inbound_rdata_count = Counter(
class ReplicationCommandHandler:
"""Handles incoming commands from replication.
"""Handles incoming commands from replication as well as sending commands
back out to connections.
"""
def __init__(self, hs):
@ -92,8 +93,9 @@ class ReplicationCommandHandler:
raise
if cmd.token is None or stream_name not in self._streams_connected:
# I.e. this is part of a batch of updates for this stream. Batch
# until we get an update for the stream with a non None token
# I.e. either this is part of a batch of updates for this stream (in
# which case batch until we get an update for the stream with a non
# None token) or we're currently connecting so we queue up rows.
self._pending_batches.setdefault(stream_name, []).append(row)
else:
# Check if this is the last of a batch of updates
@ -119,6 +121,9 @@ class ReplicationCommandHandler:
logger.error("Got POSITION for unknown stream: %s", cmd.stream_name)
return
# We protect catching up with a linearizer in case the replication
# connection reconnects under us.
with await self._position_linearizer.queue(cmd.stream_name):
# We're about to go and catch up with the stream, so mark as connecting
# to stop RDATA being handled at the same time by removing stream from
# list of connected streams. We also clear any batched up RDATA from
@ -126,9 +131,6 @@ class ReplicationCommandHandler:
self._streams_connected.discard(cmd.stream_name)
self._pending_batches.clear()
# We protect catching up with a linearizer in case the replicaiton
# connection reconnects under us.
with await self._position_linearizer.queue(cmd.stream_name):
# Find where we previously streamed up to.
current_token = self._replication_data_handler.get_streams_to_replicate().get(
cmd.stream_name
@ -156,41 +158,14 @@ class ReplicationCommandHandler:
# We've now caught up to position sent to us, notify handler.
await self._replication_data_handler.on_position(cmd.stream_name, cmd.token)
self._streams_connected.add(cmd.stream_name)
# Handle any RDATA that came in while we were catching up.
rows = self._pending_batches.pop(cmd.stream_name, [])
if rows:
# We need to make sure we filter out RDATA rows with a token less
# than what we've caught up to. This is slightly fiddly because of
# "batched" rows which have a `None` token, indicating that they
# have the same token as the next row with a non-None token.
#
# We do this by walking the list backwards, first removing any RDATA
# rows that are part of an uncompeted batch, then taking rows while
# their token is either None or greater than where we've caught up
# to.
uncompleted_batch = []
unfinished_batch = True
filtered_rows = []
for row in reversed(rows):
if row.token is not None:
unfinished_batch = False
if cmd.token < row.token:
filtered_rows.append(row)
else:
break
elif unfinished_batch:
uncompleted_batch.append(row)
else:
filtered_rows.append(row)
await self._replication_data_handler.on_rdata(
cmd.stream_name, rows[-1].token, rows
)
filtered_rows.reverse()
uncompleted_batch.reverse()
if uncompleted_batch:
self._pending_batches[cmd.stream_name] = uncompleted_batch
await self.on_rdata(cmd.stream_name, rows[-1].token, filtered_rows)
self._streams_connected.add(cmd.stream_name)
async def on_SYNC(self, cmd: SyncCommand):
pass