Merge pull request #4749 from matrix-org/erikj/replication_connection_backoff

Fix tightloop over connecting to replication server
pull/4758/head
Erik Johnston 2019-02-27 11:00:59 +00:00 committed by GitHub
commit 7590e9fa28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 43 additions and 6 deletions

1
changelog.d/4749.bugfix Normal file
View File

@ -0,0 +1 @@
Fix tightloop over connecting to replication server.

View File

@ -188,7 +188,9 @@ RDATA (S)
A single update in a stream
POSITION (S)
The position of the stream has been updated
The position of the stream has been updated. Sent to the client after all
missing updates for a stream have been sent to the client and they're now
up to date.
ERROR (S, C)
There was an error

View File

@ -39,7 +39,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
Accepts a handler that will be called when new data is available or data
is required.
"""
maxDelay = 5 # Try at least once every N seconds
maxDelay = 30 # Try at least once every N seconds
def __init__(self, hs, client_name, handler):
self.client_name = client_name
@ -54,7 +54,6 @@ class ReplicationClientFactory(ReconnectingClientFactory):
def buildProtocol(self, addr):
logger.info("Connected to replication: %r", addr)
self.resetDelay()
return ClientReplicationStreamProtocol(
self.client_name, self.server_name, self._clock, self.handler
)
@ -90,15 +89,18 @@ class ReplicationClientHandler(object):
# Used for tests.
self.awaiting_syncs = {}
# The factory used to create connections.
self.factory = None
def start_replication(self, hs):
"""Helper method to start a replication connection to the remote server
using TCP.
"""
client_name = hs.config.worker_name
factory = ReplicationClientFactory(hs, client_name, self)
self.factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, factory)
hs.get_reactor().connectTCP(host, port, self.factory)
def on_rdata(self, stream_name, token, rows):
"""Called when we get new replication data. By default this just pokes
@ -140,6 +142,7 @@ class ReplicationClientHandler(object):
args["account_data"] = user_account_data
elif room_account_data:
args["account_data"] = room_account_data
return args
def get_currently_syncing_users(self):
@ -204,3 +207,14 @@ class ReplicationClientHandler(object):
for cmd in self.pending_commands:
connection.send_command(cmd)
self.pending_commands = []
def finished_connecting(self):
"""Called when we have successfully subscribed and caught up to all
streams we're interested in.
"""
logger.info("Finished connecting to server")
# We don't reset the delay any earlier as otherwise if there is a
# problem during start up we'll end up tight looping connecting to the
# server.
self.factory.resetDelay()

View File

@ -127,8 +127,11 @@ class RdataCommand(Command):
class PositionCommand(Command):
"""Sent by the client to tell the client the stream postition without
"""Sent by the server to tell the client the stream postition without
needing to send an RDATA.
Sent to the client after all missing updates for a stream have been sent
to the client and they're now up to date.
"""
NAME = "POSITION"

View File

@ -526,6 +526,11 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.server_name = server_name
self.handler = handler
# Set of stream names that have been subscribe to, but haven't yet
# caught up with. This is used to track when the client has been fully
# connected to the remote.
self.streams_connecting = set()
# Map of stream to batched updates. See RdataCommand for info on how
# batching works.
self.pending_batches = {}
@ -548,6 +553,10 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# We've now finished connecting to so inform the client handler
self.handler.update_connection(self)
# This will happen if we don't actually subscribe to any streams
if not self.streams_connecting:
self.handler.finished_connecting()
def on_SERVER(self, cmd):
if cmd.data != self.server_name:
logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
@ -577,6 +586,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
return self.handler.on_rdata(stream_name, cmd.token, rows)
def on_POSITION(self, cmd):
# When we get a `POSITION` command it means we've finished getting
# missing updates for the given stream, and are now up to date.
self.streams_connecting.discard(cmd.stream_name)
if not self.streams_connecting:
self.handler.finished_connecting()
return self.handler.on_position(cmd.stream_name, cmd.token)
def on_SYNC(self, cmd):
@ -593,6 +608,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.id(), stream_name, token
)
self.streams_connecting.add(stream_name)
self.send_command(ReplicateCommand(stream_name, token))
def on_connection_closed(self):