Add ability to catch up on a stream by talking to master.

pull/7024/head
Erik Johnston 2020-03-20 15:17:01 +00:00
parent 811d2ecf2e
commit ba90596687
6 changed files with 127 additions and 15 deletions

View File

@ -499,4 +499,13 @@ class FederationSender(object):
self._get_per_destination_queue(destination).attempt_new_transaction() self._get_per_destination_queue(destination).attempt_new_transaction()
def get_current_token(self) -> int: def get_current_token(self) -> int:
# Dummy implementation for case where federation sender isn't offloaded
# to a worker.
return 0 return 0
async def get_replication_rows(
    self, from_token, to_token, limit, federation_ack=None
):
    """Return no replication rows.

    Placeholder used when federation sending is not offloaded to a worker;
    every argument is accepted and ignored.

    Returns:
        An empty list.
    """
    return []

View File

@ -21,6 +21,7 @@ from synapse.replication.http import (
membership, membership,
register, register,
send_event, send_event,
streams,
) )
REPLICATION_PREFIX = "/_synapse/replication" REPLICATION_PREFIX = "/_synapse/replication"
@ -38,3 +39,4 @@ class ReplicationRestResource(JsonResource):
login.register_servlets(hs, self) login.register_servlets(hs, self)
register.register_servlets(hs, self) register.register_servlets(hs, self)
devices.register_servlets(hs, self) devices.register_servlets(hs, self)
streams.register_servlets(hs, self)

View File

@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from synapse.api.errors import SynapseError
from synapse.http.servlet import parse_integer
from synapse.replication.http._base import ReplicationEndpoint
logger = logging.getLogger(__name__)
class ReplicationGetStreamUpdates(ReplicationEndpoint):
    """Replication endpoint that hands out updates for a named stream.

    Meant for streams that are not persisted to the database (e.g. typing
    notifications).

    The stream name is the single path argument; ``from_token``,
    ``upto_token`` and ``limit`` are read from the request as required
    integers. The response body has the keys ``updates``, ``upto_token`` and
    ``limited``.
    """

    NAME = "get_repl_stream_updates"
    PATH_ARGS = ("stream_name",)
    METHOD = "GET"

    def __init__(self, hs):
        super().__init__(hs)

        # Deliberately imported at call time rather than at module level --
        # presumably to avoid a circular import with the streams package;
        # confirm before hoisting.
        from synapse.replication.tcp.streams import STREAMS_MAP

        # One stream instance per registered stream class, keyed by its NAME.
        streams_by_name = {}
        for stream_cls in STREAMS_MAP.values():
            streams_by_name[stream_cls.NAME] = stream_cls(hs)
        self.streams = streams_by_name

    @staticmethod
    def _serialize_payload(stream_name, from_token, upto_token, limit):
        """Build the request payload for a call to this endpoint.

        ``stream_name`` is not included in the returned dict (it is declared
        in ``PATH_ARGS`` instead).
        """
        payload = {
            "from_token": from_token,
            "upto_token": upto_token,
            "limit": limit,
        }
        return payload

    async def _handle_request(self, request, stream_name):
        """Serve a request for updates on ``stream_name``.

        Returns:
            A ``(200, body)`` tuple where ``body`` carries the values
            returned by the stream's ``get_updates_since``.

        Raises:
            SynapseError: 400 if ``stream_name`` is not a known stream.
        """
        requested_stream = self.streams.get(stream_name)
        if requested_stream is None:
            raise SynapseError(400, "Unknown stream")

        # Parsed in this fixed order so that the first missing/invalid
        # argument reported is the same as before.
        from_token, upto_token, limit = (
            parse_integer(request, arg_name, required=True)
            for arg_name in ("from_token", "upto_token", "limit")
        )

        rows, reached_token, was_limited = await requested_stream.get_updates_since(
            from_token, upto_token, limit
        )

        body = {
            "updates": rows,
            "upto_token": reached_token,
            "limited": was_limited,
        }
        return 200, body
def register_servlets(hs, http_server):
    """Create the stream-updates replication endpoint and register it on
    ``http_server``.
    """
    endpoint = ReplicationGetStreamUpdates(hs)
    endpoint.register(http_server)

View File

@ -25,6 +25,8 @@ Each stream is defined by the following information:
update_function: The function that returns a list of updates between two tokens update_function: The function that returns a list of updates between two tokens
""" """
from typing import Dict, Type
from . import _base, events, federation from . import _base, events, federation
STREAMS_MAP = { STREAMS_MAP = {
@ -47,4 +49,4 @@ STREAMS_MAP = {
_base.GroupServerStream, _base.GroupServerStream,
_base.UserSignatureStream, _base.UserSignatureStream,
) )
} } # type: Dict[str, Type[_base.Stream]]

View File

@ -20,6 +20,7 @@ from typing import Any, List, Optional, Tuple, Union
import attr import attr
from synapse.replication.http.streams import ReplicationGetStreamUpdates
from synapse.types import JsonDict from synapse.types import JsonDict
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -127,6 +128,10 @@ class Stream(object):
# The type of the row. Used by the default impl of parse_row. # The type of the row. Used by the default impl of parse_row.
ROW_TYPE = None # type: Any ROW_TYPE = None # type: Any
# Whether the update function is only available on master. If True then
# calls to get updates are proxied to the master via an HTTP call.
_QUERY_MASTER = False
@classmethod @classmethod
def parse_row(cls, row): def parse_row(cls, row):
"""Parse a row received over replication """Parse a row received over replication
@ -143,6 +148,11 @@ class Stream(object):
return cls.ROW_TYPE(*row) return cls.ROW_TYPE(*row)
def __init__(self, hs): def __init__(self, hs):
self._is_worker = hs.config.worker_app is not None
if self._QUERY_MASTER and self._is_worker:
self._replication_client = ReplicationGetStreamUpdates.make_client(hs)
# The token from which we last asked for updates # The token from which we last asked for updates
self.last_token = self.current_token() self.last_token = self.current_token()
@ -191,14 +201,23 @@ class Stream(object):
if from_token == upto_token: if from_token == upto_token:
return [], upto_token, False return [], upto_token, False
limited = False if self._is_worker and self._QUERY_MASTER:
rows = await self.update_function(from_token, upto_token, limit=limit) result = await self._replication_client(
updates = [(row[0], row[1:]) for row in rows] stream_name=self.NAME,
if len(updates) == limit: from_token=from_token,
upto_token = rows[-1][0] upto_token=upto_token,
limited = True limit=limit,
)
return result["updates"], result["upto_token"], result["limited"]
else:
limited = False
rows = await self.update_function(from_token, upto_token, limit=limit)
updates = [(row[0], row[1:]) for row in rows]
if len(updates) == limit:
upto_token = rows[-1][0]
limited = True
return updates, upto_token, limited return updates, upto_token, limited
def current_token(self): def current_token(self):
"""Gets the current token of the underlying streams. Should be provided """Gets the current token of the underlying streams. Should be provided
@ -239,13 +258,16 @@ class BackfillStream(Stream):
class PresenceStream(Stream): class PresenceStream(Stream):
NAME = "presence" NAME = "presence"
ROW_TYPE = PresenceStreamRow ROW_TYPE = PresenceStreamRow
_QUERY_MASTER = True
def __init__(self, hs): def __init__(self, hs):
store = hs.get_datastore() store = hs.get_datastore()
presence_handler = hs.get_presence_handler() presence_handler = hs.get_presence_handler()
self.current_token = store.get_current_presence_token # type: ignore self.current_token = store.get_current_presence_token # type: ignore
self.update_function = presence_handler.get_all_presence_updates # type: ignore
if hs.config.worker_app is None:
self.update_function = presence_handler.get_all_presence_updates # type: ignore
super(PresenceStream, self).__init__(hs) super(PresenceStream, self).__init__(hs)
@ -253,12 +275,15 @@ class PresenceStream(Stream):
class TypingStream(Stream): class TypingStream(Stream):
NAME = "typing" NAME = "typing"
ROW_TYPE = TypingStreamRow ROW_TYPE = TypingStreamRow
_QUERY_MASTER = True
def __init__(self, hs): def __init__(self, hs):
typing_handler = hs.get_typing_handler() typing_handler = hs.get_typing_handler()
self.current_token = typing_handler.get_current_token # type: ignore self.current_token = typing_handler.get_current_token # type: ignore
self.update_function = typing_handler.get_all_typing_updates # type: ignore
if hs.config.worker_app is None:
self.update_function = typing_handler.get_all_typing_updates # type: ignore
super(TypingStream, self).__init__(hs) super(TypingStream, self).__init__(hs)

View File

@ -15,7 +15,9 @@
# limitations under the License. # limitations under the License.
from collections import namedtuple from collections import namedtuple
from ._base import Stream from twisted.internet import defer
from synapse.replication.tcp.streams._base import Stream
FederationStreamRow = namedtuple( FederationStreamRow = namedtuple(
"FederationStreamRow", "FederationStreamRow",
@ -33,11 +35,18 @@ class FederationStream(Stream):
NAME = "federation" NAME = "federation"
ROW_TYPE = FederationStreamRow ROW_TYPE = FederationStreamRow
_QUERY_MASTER = True
def __init__(self, hs): def __init__(self, hs):
federation_sender = hs.get_federation_sender() # Not all synapse instances will have a federation sender instance,
# whether that's a `FederationSender` or a `FederationRemoteSendQueue`,
self.current_token = federation_sender.get_current_token # type: ignore # so we stub the stream out when that is the case.
self.update_function = federation_sender.get_replication_rows # type: ignore if hs.config.worker_app is None or hs.should_send_federation():
federation_sender = hs.get_federation_sender()
self.current_token = federation_sender.get_current_token # type: ignore
self.update_function = federation_sender.get_replication_rows # type: ignore
else:
self.current_token = lambda: 0 # type: ignore
self.update_function = lambda *args, **kwargs: defer.succeed([]) # type: ignore
super(FederationStream, self).__init__(hs) super(FederationStream, self).__init__(hs)