document the REPLICATE command a bit better (#6305)

since I found myself wonder how it works
pull/6326/head
Richard van der Hoff 2019-11-04 12:40:18 +00:00 committed by GitHub
parent f496d25877
commit cc6243b4c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 110 additions and 10 deletions

1
changelog.d/6305.misc Normal file
View File

@ -0,0 +1 @@
Add some documentation about worker replication.

View File

@ -199,7 +199,20 @@ client (C):
#### REPLICATE (C) #### REPLICATE (C)
Asks the server to replicate a given stream Asks the server to replicate a given stream. The syntax is:
```
REPLICATE <stream_name> <token>
```
Where `<token>` may be either:
* a numeric stream_id to stream updates since (exclusive)
* `NOW` to stream all subsequent updates.
The `<stream_name>` is the name of a replication stream to subscribe
to (see [here](../synapse/replication/tcp/streams/_base.py) for a list
of streams). It can also be `ALL` to subscribe to all known streams,
in which case the `<token>` must be set to `NOW`.
#### USER_SYNC (C) #### USER_SYNC (C)

View File

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
from typing import Dict
import six import six
@ -44,7 +45,14 @@ class BaseSlavedStore(SQLBaseStore):
self.hs = hs self.hs = hs
def stream_positions(self): def stream_positions(self) -> Dict[str, int]:
"""
Get the current positions of all the streams this store wants to subscribe to
Returns:
map from stream name to the most recent update we have for
that stream (ie, the point we want to start replicating from)
"""
pos = {} pos = {}
if self._cache_id_gen: if self._cache_id_gen:
pos["caches"] = self._cache_id_gen.get_current_token() pos["caches"] = self._cache_id_gen.get_current_token()

View File

@ -16,10 +16,17 @@
""" """
import logging import logging
from typing import Dict
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory from twisted.internet.protocol import ReconnectingClientFactory
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.tcp.protocol import (
AbstractReplicationClientHandler,
ClientReplicationStreamProtocol,
)
from .commands import ( from .commands import (
FederationAckCommand, FederationAckCommand,
InvalidateCacheCommand, InvalidateCacheCommand,
@ -27,7 +34,6 @@ from .commands import (
UserIpCommand, UserIpCommand,
UserSyncCommand, UserSyncCommand,
) )
from .protocol import ClientReplicationStreamProtocol
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -42,7 +48,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
maxDelay = 30 # Try at least once every N seconds maxDelay = 30 # Try at least once every N seconds
def __init__(self, hs, client_name, handler): def __init__(self, hs, client_name, handler: AbstractReplicationClientHandler):
self.client_name = client_name self.client_name = client_name
self.handler = handler self.handler = handler
self.server_name = hs.config.server_name self.server_name = hs.config.server_name
@ -68,13 +74,13 @@ class ReplicationClientFactory(ReconnectingClientFactory):
ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
class ReplicationClientHandler(object): class ReplicationClientHandler(AbstractReplicationClientHandler):
"""A base handler that can be passed to the ReplicationClientFactory. """A base handler that can be passed to the ReplicationClientFactory.
By default proxies incoming replication data to the SlaveStore. By default proxies incoming replication data to the SlaveStore.
""" """
def __init__(self, store): def __init__(self, store: BaseSlavedStore):
self.store = store self.store = store
# The current connection. None if we are currently (re)connecting # The current connection. None if we are currently (re)connecting
@ -138,11 +144,13 @@ class ReplicationClientHandler(object):
if d: if d:
d.callback(data) d.callback(data)
def get_streams_to_replicate(self): def get_streams_to_replicate(self) -> Dict[str, int]:
"""Called when a new connection has been established and we need to """Called when a new connection has been established and we need to
subscribe to streams. subscribe to streams.
Returns a dictionary of stream name to token. Returns:
map from stream name to the most recent update we have for
that stream (ie, the point we want to start replicating from)
""" """
args = self.store.stream_positions() args = self.store.stream_positions()
user_account_data = args.pop("user_account_data", None) user_account_data = args.pop("user_account_data", None)

View File

@ -48,7 +48,7 @@ indicate which side is sending, these are *not* included on the wire::
> ERROR server stopping > ERROR server stopping
* connection closed by server * * connection closed by server *
""" """
import abc
import fcntl import fcntl
import logging import logging
import struct import struct
@ -65,6 +65,7 @@ from twisted.python.failure import Failure
from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import LaterGauge from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import Clock
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
from .commands import ( from .commands import (
@ -558,11 +559,80 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.streamer.lost_connection(self) self.streamer.lost_connection(self)
class AbstractReplicationClientHandler(metaclass=abc.ABCMeta):
"""
The interface for the handler that should be passed to
ClientReplicationStreamProtocol
"""
@abc.abstractmethod
def on_rdata(self, stream_name, token, rows):
"""Called to handle a batch of replication data with a given stream token.
Args:
stream_name (str): name of the replication stream for this batch of rows
token (int): stream token for this batch of rows
rows (list): a list of Stream.ROW_TYPE objects as returned by
Stream.parse_row.
Returns:
Deferred|None
"""
raise NotImplementedError()
@abc.abstractmethod
def on_position(self, stream_name, token):
"""Called when we get new position data."""
raise NotImplementedError()
@abc.abstractmethod
def on_sync(self, data):
"""Called when get a new SYNC command."""
raise NotImplementedError()
@abc.abstractmethod
def get_streams_to_replicate(self):
"""Called when a new connection has been established and we need to
subscribe to streams.
Returns:
map from stream name to the most recent update we have for
that stream (ie, the point we want to start replicating from)
"""
raise NotImplementedError()
@abc.abstractmethod
def get_currently_syncing_users(self):
"""Get the list of currently syncing users (if any). This is called
when a connection has been established and we need to send the
currently syncing users."""
raise NotImplementedError()
@abc.abstractmethod
def update_connection(self, connection):
"""Called when a connection has been established (or lost with None).
"""
raise NotImplementedError()
@abc.abstractmethod
def finished_connecting(self):
"""Called when we have successfully subscribed and caught up to all
streams we're interested in.
"""
raise NotImplementedError()
class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
VALID_INBOUND_COMMANDS = VALID_SERVER_COMMANDS VALID_INBOUND_COMMANDS = VALID_SERVER_COMMANDS
VALID_OUTBOUND_COMMANDS = VALID_CLIENT_COMMANDS VALID_OUTBOUND_COMMANDS = VALID_CLIENT_COMMANDS
def __init__(self, client_name, server_name, clock, handler): def __init__(
self,
client_name: str,
server_name: str,
clock: Clock,
handler: AbstractReplicationClientHandler,
):
BaseReplicationStreamProtocol.__init__(self, clock) BaseReplicationStreamProtocol.__init__(self, clock)
self.client_name = client_name self.client_name = client_name