From 2190027cf99c44530258bb6f7d96243e4691caed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 15:37:25 +0100 Subject: [PATCH] Move UserSyncCommand handling to CommandHandler --- synapse/replication/tcp/handler.py | 11 +++++++++++ synapse/replication/tcp/protocol.py | 10 ---------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index ce14c6f2c4..8ec0119697 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -65,6 +65,8 @@ class ReplicationCommandHandler: self._presence_handler = hs.get_presence_handler() self._store = hs.get_datastore() self._notifier = hs.get_notifier() + self._clock = hs.get_clock() + self._instance_id = hs.get_instance_id() # Set of streams that we've caught up with. self._streams_connected = set() # type: Set[str] @@ -296,6 +298,15 @@ class ReplicationCommandHandler: if self._factory: self._factory.resetDelay() + # Tell the server if we have any users currently syncing (should only + # happen on synchrotrons) + currently_syncing = self.get_currently_syncing_users() + now = self._clock.time_msec() + for user_id in currently_syncing: + connection.send_command( + UserSyncCommand(self._instance_id, user_id, True, now) + ) + def lost_connection(self, connection: AbstractConnection): """Called when a connection is closed/lost. """ diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index bb12d6a14b..eab796634e 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -72,7 +72,6 @@ from synapse.replication.tcp.commands import ( PingCommand, ReplicateCommand, ServerCommand, - UserSyncCommand, ) from synapse.types import Collection from synapse.util import Clock @@ -454,8 +453,6 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): ): super().__init__(clock, command_handler) - self.instance_id = hs.get_instance_id() - self.client_name = client_name self.server_name = server_name @@ -466,13 +463,6 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): # Once we've connected subscribe to the necessary streams self.replicate() - # Tell the server if we have any users currently syncing (should only - # happen on synchrotrons) - currently_syncing = self.command_handler.get_currently_syncing_users() - now = self.clock.time_msec() - for user_id in currently_syncing: - self.send_command(UserSyncCommand(self.instance_id, user_id, True, now)) - async def on_SERVER(self, cmd): if cmd.data != self.server_name: logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)