Move UserSyncCommand handling to CommandHandler
parent
27e4d2bdd5
commit
2190027cf9
|
@ -65,6 +65,8 @@ class ReplicationCommandHandler:
|
||||||
self._presence_handler = hs.get_presence_handler()
|
self._presence_handler = hs.get_presence_handler()
|
||||||
self._store = hs.get_datastore()
|
self._store = hs.get_datastore()
|
||||||
self._notifier = hs.get_notifier()
|
self._notifier = hs.get_notifier()
|
||||||
|
self._clock = hs.get_clock()
|
||||||
|
self._instance_id = hs.get_instance_id()
|
||||||
|
|
||||||
# Set of streams that we've caught up with.
|
# Set of streams that we've caught up with.
|
||||||
self._streams_connected = set() # type: Set[str]
|
self._streams_connected = set() # type: Set[str]
|
||||||
|
@ -296,6 +298,15 @@ class ReplicationCommandHandler:
|
||||||
if self._factory:
|
if self._factory:
|
||||||
self._factory.resetDelay()
|
self._factory.resetDelay()
|
||||||
|
|
||||||
|
# Tell the server if we have any users currently syncing (should only
|
||||||
|
# happen on synchrotrons)
|
||||||
|
currently_syncing = self.get_currently_syncing_users()
|
||||||
|
now = self._clock.time_msec()
|
||||||
|
for user_id in currently_syncing:
|
||||||
|
connection.send_command(
|
||||||
|
UserSyncCommand(self._instance_id, user_id, True, now)
|
||||||
|
)
|
||||||
|
|
||||||
def lost_connection(self, connection: AbstractConnection):
|
def lost_connection(self, connection: AbstractConnection):
|
||||||
"""Called when a connection is closed/lost.
|
"""Called when a connection is closed/lost.
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -72,7 +72,6 @@ from synapse.replication.tcp.commands import (
|
||||||
PingCommand,
|
PingCommand,
|
||||||
ReplicateCommand,
|
ReplicateCommand,
|
||||||
ServerCommand,
|
ServerCommand,
|
||||||
UserSyncCommand,
|
|
||||||
)
|
)
|
||||||
from synapse.types import Collection
|
from synapse.types import Collection
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
@ -454,8 +453,6 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||||
):
|
):
|
||||||
super().__init__(clock, command_handler)
|
super().__init__(clock, command_handler)
|
||||||
|
|
||||||
self.instance_id = hs.get_instance_id()
|
|
||||||
|
|
||||||
self.client_name = client_name
|
self.client_name = client_name
|
||||||
self.server_name = server_name
|
self.server_name = server_name
|
||||||
|
|
||||||
|
@ -466,13 +463,6 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||||
# Once we've connected subscribe to the necessary streams
|
# Once we've connected subscribe to the necessary streams
|
||||||
self.replicate()
|
self.replicate()
|
||||||
|
|
||||||
# Tell the server if we have any users currently syncing (should only
|
|
||||||
# happen on synchrotrons)
|
|
||||||
currently_syncing = self.command_handler.get_currently_syncing_users()
|
|
||||||
now = self.clock.time_msec()
|
|
||||||
for user_id in currently_syncing:
|
|
||||||
self.send_command(UserSyncCommand(self.instance_id, user_id, True, now))
|
|
||||||
|
|
||||||
async def on_SERVER(self, cmd):
|
async def on_SERVER(self, cmd):
|
||||||
if cmd.data != self.server_name:
|
if cmd.data != self.server_name:
|
||||||
logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
|
logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
|
||||||
|
|
Loading…
Reference in New Issue