From e6c25e085862a1ff45151546a3c82a595d3aa915 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 Mar 2020 13:53:08 +0100 Subject: [PATCH] Allow ReplicationCommandHandler to have multiple connections This will allow the server replication component to use ReplicationCommandHandler. --- synapse/replication/tcp/handler.py | 43 ++++++++++++++++++----------- synapse/replication/tcp/protocol.py | 5 ++-- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 12a1cfd6d1..3fe736bb77 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -69,8 +69,8 @@ class ReplicationCommandHandler: # The factory used to create connections. self._factory = None # type: Optional[ReplicationClientFactory] - # The current connection. None if we are currently (re)connecting - self._connection = None + # The currently connected connections. + self._connections = [] def start_replication(self, hs): """Helper method to start a replication connection to the remote server @@ -181,29 +181,40 @@ class ReplicationCommandHandler: """ return self._presence_handler.get_currently_syncing_users() - def update_connection(self, connection): - """Called when a connection has been established (or lost with None). + def new_connection(self, connection): + """Called when we have a new connection. """ - self._connection = connection + self._connections.append(connection) - def finished_connecting(self): - """Called when we have successfully subscribed and caught up to all - streams we're interested in. - """ - logger.info("Finished connecting to server") - - # We don't reset the delay any earlier as otherwise if there is a - # problem during start up we'll end up tight looping connecting to the - # server. + # If we're using a ReplicationClientFactory then we reset the connection + # delay now. We don't reset the delay any earlier as otherwise if there + # is a problem during start up we'll end up tight looping connecting to + # the server. if self._factory: self._factory.resetDelay() + def lost_connection(self, connection): + """Called when a connection is closed/lost. + """ + try: + self._connections.remove(connection) + except ValueError: + pass + + def connected(self) -> bool: + """Do we have any replication connections open? + + Used to no-op if nothing is connected. + """ + return bool(self._connections) + def send_command(self, cmd: Command): """Send a command to master (when we get establish a connection if we don't have one already.) """ - if self._connection: - self._connection.send_command(cmd) + if self._connections: + for connection in self._connections: + connection.send_command(cmd) else: logger.warning("Dropping command as not connected: %r", cmd.NAME) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index f2a37f568e..a412ae376f 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -508,8 +508,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.send_command(UserSyncCommand(self.instance_id, user_id, True, now)) # We've now finished connecting to so inform the client handler - self.handler.update_connection(self) - self.handler.finished_connecting() + self.handler.new_connection(self) async def handle_command(self, cmd: Command): """Handle a command we have received over the replication stream. @@ -552,7 +551,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): def on_connection_closed(self): BaseReplicationStreamProtocol.on_connection_closed(self) - self.handler.update_connection(None) + self.handler.lost_connection(self) # The following simply registers metrics for the replication connections