# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """A replication client for use by synapse workers. """ from twisted.internet import reactor, defer from twisted.internet.protocol import ReconnectingClientFactory from .commands import ( FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand, UserIpCommand, ) from .protocol import ClientReplicationStreamProtocol import logging logger = logging.getLogger(__name__) class ReplicationClientFactory(ReconnectingClientFactory): """Factory for building connections to the master. Will reconnect if the connection is lost. Accepts a handler that will be called when new data is available or data is required. """ maxDelay = 5 # Try at least once every N seconds def __init__(self, hs, client_name, handler): self.client_name = client_name self.handler = handler self.server_name = hs.config.server_name self._clock = hs.get_clock() # As self.clock is defined in super class reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying) def startedConnecting(self, connector): logger.info("Connecting to replication: %r", connector.getDestination()) def buildProtocol(self, addr): logger.info("Connected to replication: %r", addr) self.resetDelay() return ClientReplicationStreamProtocol( self.client_name, self.server_name, self._clock, self.handler ) def clientConnectionLost(self, connector, reason): logger.error("Lost replication conn: %r", reason) ReconnectingClientFactory.clientConnectionLost(self, connector, reason) def clientConnectionFailed(self, connector, reason): logger.error("Failed to connect to replication: %r", reason) ReconnectingClientFactory.clientConnectionFailed( self, connector, reason ) class ReplicationClientHandler(object): """A base handler that can be passed to the ReplicationClientFactory. By default proxies incoming replication data to the SlaveStore. """ def __init__(self, store): self.store = store # The current connection. None if we are currently (re)connecting self.connection = None # Any pending commands to be sent once a new connection has been # established self.pending_commands = [] # Map from string -> deferred, to wake up when receiveing a SYNC with # the given string. # Used for tests. self.awaiting_syncs = {} def start_replication(self, hs): """Helper method to start a replication connection to the remote server using TCP. """ client_name = hs.config.worker_name factory = ReplicationClientFactory(hs, client_name, self) host = hs.config.worker_replication_host port = hs.config.worker_replication_port reactor.connectTCP(host, port, factory) def on_rdata(self, stream_name, token, rows): """Called when we get new replication data. By default this just pokes the slave store. Can be overriden in subclasses to handle more. """ logger.info("Received rdata %s -> %s", stream_name, token) self.store.process_replication_rows(stream_name, token, rows) def on_position(self, stream_name, token): """Called when we get new position data. By default this just pokes the slave store. Can be overriden in subclasses to handle more. """ self.store.process_replication_rows(stream_name, token, []) def on_sync(self, data): """When we received a SYNC we wake up any deferreds that were waiting for the sync with the given data. Used by tests. """ d = self.awaiting_syncs.pop(data, None) if d: d.callback(data) def get_streams_to_replicate(self): """Called when a new connection has been established and we need to subscribe to streams. Returns a dictionary of stream name to token. """ args = self.store.stream_positions() user_account_data = args.pop("user_account_data", None) room_account_data = args.pop("room_account_data", None) if user_account_data: args["account_data"] = user_account_data elif room_account_data: args["account_data"] = room_account_data return args def get_currently_syncing_users(self): """Get the list of currently syncing users (if any). This is called when a connection has been established and we need to send the currently syncing users. (Overriden by the synchrotron's only) """ return [] def send_command(self, cmd): """Send a command to master (when we get establish a connection if we don't have one already.) """ if self.connection: self.connection.send_command(cmd) else: logger.warn("Queuing command as not connected: %r", cmd.NAME) self.pending_commands.append(cmd) def send_federation_ack(self, token): """Ack data for the federation stream. This allows the master to drop data stored purely in memory. """ self.send_command(FederationAckCommand(token)) def send_user_sync(self, user_id, is_syncing, last_sync_ms): """Poke the master that a user has started/stopped syncing. """ self.send_command(UserSyncCommand(user_id, is_syncing, last_sync_ms)) def send_remove_pusher(self, app_id, push_key, user_id): """Poke the master to remove a pusher for a user """ cmd = RemovePusherCommand(app_id, push_key, user_id) self.send_command(cmd) def send_invalidate_cache(self, cache_func, keys): """Poke the master to invalidate a cache. """ cmd = InvalidateCacheCommand(cache_func.__name__, keys) self.send_command(cmd) def send_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen): """Tell the master that the user made a request. """ cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen) self.send_command(cmd) def await_sync(self, data): """Returns a deferred that is resolved when we receive a SYNC command with given data. Used by tests. """ return self.awaiting_syncs.setdefault(data, defer.Deferred()) def update_connection(self, connection): """Called when a connection has been established (or lost with None). """ self.connection = connection if connection: for cmd in self.pending_commands: connection.send_command(cmd) self.pending_commands = []