Fix up comments
parent
534bd868e5
commit
99e4a995b5
|
@ -77,7 +77,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
|
||||||
|
|
||||||
|
|
||||||
class ReplicationDataHandler:
|
class ReplicationDataHandler:
|
||||||
"""A replication data handler handles incoming stream updates from replication.
|
"""Handles incoming stream updates from replication.
|
||||||
|
|
||||||
This instance notifies the slave data store about updates. Can be subclassed
|
This instance notifies the slave data store about updates. Can be subclassed
|
||||||
to handle updates in additional ways.
|
to handle updates in additional ways.
|
||||||
|
|
|
@ -45,7 +45,8 @@ inbound_rdata_count = Counter(
|
||||||
|
|
||||||
|
|
||||||
class ReplicationCommandHandler:
|
class ReplicationCommandHandler:
|
||||||
"""Handles incoming commands from replication.
|
"""Handles incoming commands from replication as well as sending commands
|
||||||
|
back out to connections.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
@ -92,8 +93,9 @@ class ReplicationCommandHandler:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
if cmd.token is None or stream_name not in self._streams_connected:
|
if cmd.token is None or stream_name not in self._streams_connected:
|
||||||
# I.e. this is part of a batch of updates for this stream. Batch
|
# I.e. either this is part of a batch of updates for this stream (in
|
||||||
# until we get an update for the stream with a non None token
|
# which case batch until we get an update for the stream with a non
|
||||||
|
# None token) or we're currently connecting so we queue up rows.
|
||||||
self._pending_batches.setdefault(stream_name, []).append(row)
|
self._pending_batches.setdefault(stream_name, []).append(row)
|
||||||
else:
|
else:
|
||||||
# Check if this is the last of a batch of updates
|
# Check if this is the last of a batch of updates
|
||||||
|
@ -126,7 +128,7 @@ class ReplicationCommandHandler:
|
||||||
self._streams_connected.discard(cmd.stream_name)
|
self._streams_connected.discard(cmd.stream_name)
|
||||||
self._pending_batches.clear()
|
self._pending_batches.clear()
|
||||||
|
|
||||||
# We protect catching up with a linearizer in case the replicaiton
|
# We protect catching up with a linearizer in case the replication
|
||||||
# connection reconnects under us.
|
# connection reconnects under us.
|
||||||
with await self._position_linearizer.queue(cmd.stream_name):
|
with await self._position_linearizer.queue(cmd.stream_name):
|
||||||
# Find where we previously streamed up to.
|
# Find where we previously streamed up to.
|
||||||
|
|
Loading…
Reference in New Issue