125 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			125 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Python
		
	
	
| # -*- coding: utf-8 -*-
 | |
| # Copyright 2017 Vector Creations Ltd
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| """A replication client for use by synapse workers.
 | |
| """
 | |
| 
 | |
| import logging
 | |
| from typing import TYPE_CHECKING, Dict
 | |
| 
 | |
| from twisted.internet.protocol import ReconnectingClientFactory
 | |
| 
 | |
| from synapse.replication.slave.storage._base import BaseSlavedStore
 | |
| from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
 | |
| 
 | |
| if TYPE_CHECKING:
 | |
|     from synapse.server import HomeServer
 | |
|     from synapse.replication.tcp.handler import ReplicationCommandHandler
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
| class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
 | |
|     """Factory for building connections to the master. Will reconnect if the
 | |
|     connection is lost.
 | |
| 
 | |
|     Accepts a handler that is passed to `ClientReplicationStreamProtocol`.
 | |
|     """
 | |
| 
 | |
|     initialDelay = 0.1
 | |
|     maxDelay = 1  # Try at least once every N seconds
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         hs: "HomeServer",
 | |
|         client_name: str,
 | |
|         command_handler: "ReplicationCommandHandler",
 | |
|     ):
 | |
|         self.client_name = client_name
 | |
|         self.command_handler = command_handler
 | |
|         self.server_name = hs.config.server_name
 | |
|         self.hs = hs
 | |
|         self._clock = hs.get_clock()  # As self.clock is defined in super class
 | |
| 
 | |
|         hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
 | |
| 
 | |
|     def startedConnecting(self, connector):
 | |
|         logger.info("Connecting to replication: %r", connector.getDestination())
 | |
| 
 | |
|     def buildProtocol(self, addr):
 | |
|         logger.info("Connected to replication: %r", addr)
 | |
|         return ClientReplicationStreamProtocol(
 | |
|             self.hs,
 | |
|             self.client_name,
 | |
|             self.server_name,
 | |
|             self._clock,
 | |
|             self.command_handler,
 | |
|         )
 | |
| 
 | |
|     def clientConnectionLost(self, connector, reason):
 | |
|         logger.error("Lost replication conn: %r", reason)
 | |
|         ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
 | |
| 
 | |
|     def clientConnectionFailed(self, connector, reason):
 | |
|         logger.error("Failed to connect to replication: %r", reason)
 | |
|         ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
 | |
| 
 | |
| 
 | |
| class ReplicationDataHandler:
 | |
|     """Handles incoming stream updates from replication.
 | |
| 
 | |
|     This instance notifies the slave data store about updates. Can be subclassed
 | |
|     to handle updates in additional ways.
 | |
|     """
 | |
| 
 | |
|     def __init__(self, store: BaseSlavedStore):
 | |
|         self.store = store
 | |
| 
 | |
|     async def on_rdata(self, stream_name: str, token: int, rows: list):
 | |
|         """Called to handle a batch of replication data with a given stream token.
 | |
| 
 | |
|         By default this just pokes the slave store. Can be overridden in subclasses to
 | |
|         handle more.
 | |
| 
 | |
|         Args:
 | |
|             stream_name (str): name of the replication stream for this batch of rows
 | |
|             token (int): stream token for this batch of rows
 | |
|             rows (list): a list of Stream.ROW_TYPE objects as returned by
 | |
|                 Stream.parse_row.
 | |
|         """
 | |
|         self.store.process_replication_rows(stream_name, token, rows)
 | |
| 
 | |
|     def get_streams_to_replicate(self) -> Dict[str, int]:
 | |
|         """Called when a new connection has been established and we need to
 | |
|         subscribe to streams.
 | |
| 
 | |
|         Returns:
 | |
|             map from stream name to the most recent update we have for
 | |
|             that stream (ie, the point we want to start replicating from)
 | |
|         """
 | |
|         args = self.store.stream_positions()
 | |
|         user_account_data = args.pop("user_account_data", None)
 | |
|         room_account_data = args.pop("room_account_data", None)
 | |
|         if user_account_data:
 | |
|             args["account_data"] = user_account_data
 | |
|         elif room_account_data:
 | |
|             args["account_data"] = room_account_data
 | |
|         return args
 | |
| 
 | |
|     async def on_position(self, stream_name: str, token: int):
 | |
|         self.store.process_replication_rows(stream_name, token, [])
 | |
| 
 | |
|     def on_remote_server_up(self, server: str):
 | |
|         """Called when get a new REMOTE_SERVER_UP command."""
 |