diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 233cb33daf..a477578e44 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -499,4 +499,13 @@ class FederationSender(object): self._get_per_destination_queue(destination).attempt_new_transaction() def get_current_token(self) -> int: + # Dummy implementation for case where federation sender isn't offloaded + # to a worker. return 0 + + async def get_replication_rows( + self, from_token, to_token, limit, federation_ack=None + ): + # Dummy implementation for case where federation sender isn't offloaded + # to a worker. + return [] diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 28dbc6fcba..4613b2538c 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -21,6 +21,7 @@ from synapse.replication.http import ( membership, register, send_event, + streams, ) REPLICATION_PREFIX = "/_synapse/replication" @@ -38,3 +39,4 @@ class ReplicationRestResource(JsonResource): login.register_servlets(hs, self) register.register_servlets(hs, self) devices.register_servlets(hs, self) + streams.register_servlets(hs, self) diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py new file mode 100644 index 0000000000..3889278b2a --- /dev/null +++ b/synapse/replication/http/streams.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.api.errors import SynapseError +from synapse.http.servlet import parse_integer +from synapse.replication.http._base import ReplicationEndpoint + +logger = logging.getLogger(__name__) + + +class ReplicationGetStreamUpdates(ReplicationEndpoint): + """Fetches stream updates from a server. Used for streams not persisted to + the database, e.g. typing notifications. + """ + + NAME = "get_repl_stream_updates" + PATH_ARGS = ("stream_name",) + METHOD = "GET" + + def __init__(self, hs): + super(ReplicationGetStreamUpdates, self).__init__(hs) + + from synapse.replication.tcp.streams import STREAMS_MAP + + self.streams = {stream.NAME: stream(hs) for stream in STREAMS_MAP.values()} + + @staticmethod + def _serialize_payload(stream_name, from_token, upto_token, limit): + return {"from_token": from_token, "upto_token": upto_token, "limit": limit} + + async def _handle_request(self, request, stream_name): + stream = self.streams.get(stream_name) + if stream is None: + raise SynapseError(400, "Unknown stream") + + from_token = parse_integer(request, "from_token", required=True) + upto_token = parse_integer(request, "upto_token", required=True) + limit = parse_integer(request, "limit", required=True) + + updates, upto_token, limited = await stream.get_updates_since( + from_token, upto_token, limit + ) + + return ( + 200, + {"updates": updates, "upto_token": upto_token, "limited": limited}, + ) + + +def register_servlets(hs, http_server): + ReplicationGetStreamUpdates(hs).register(http_server) diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 5f52264e84..c3b9a90ca5 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -25,6 +25,8 @@ Each stream is defined by the following information: update_function: The function that returns a list of updates between two tokens """ +from typing import Dict, Type + from . import _base, events, federation STREAMS_MAP = { @@ -47,4 +49,4 @@ STREAMS_MAP = { _base.GroupServerStream, _base.UserSignatureStream, ) -} +} # type: Dict[str, Type[_base.Stream]] diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 99cef97532..6dea523f8c 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -20,6 +20,7 @@ from typing import Any, List, Optional, Tuple, Union import attr +from synapse.replication.http.streams import ReplicationGetStreamUpdates from synapse.types import JsonDict logger = logging.getLogger(__name__) @@ -127,6 +128,10 @@ class Stream(object): # The type of the row. Used by the default impl of parse_row. ROW_TYPE = None # type: Any + # Whether the update function is only available on master. If True then + # calls to get updates are proxied to the master via a HTTP call. + _QUERY_MASTER = False + @classmethod def parse_row(cls, row): """Parse a row received over replication @@ -143,6 +148,11 @@ class Stream(object): return cls.ROW_TYPE(*row) def __init__(self, hs): + self._is_worker = hs.config.worker_app is not None + + if self._QUERY_MASTER and self._is_worker: + self._replication_client = ReplicationGetStreamUpdates.make_client(hs) + # The token from which we last asked for updates self.last_token = self.current_token() @@ -191,14 +201,23 @@ class Stream(object): if from_token == upto_token: return [], upto_token, False - limited = False - rows = await self.update_function(from_token, upto_token, limit=limit) - updates = [(row[0], row[1:]) for row in rows] - if len(updates) == limit: - upto_token = rows[-1][0] - limited = True + if self._is_worker and self._QUERY_MASTER: + result = await self._replication_client( + stream_name=self.NAME, + from_token=from_token, + upto_token=upto_token, + limit=limit, + ) + return result["updates"], result["upto_token"], result["limited"] + else: + limited = False + rows = await self.update_function(from_token, upto_token, limit=limit) + updates = [(row[0], row[1:]) for row in rows] + if len(updates) == limit: + upto_token = rows[-1][0] + limited = True - return updates, upto_token, limited + return updates, upto_token, limited def current_token(self): """Gets the current token of the underlying streams. Should be provided @@ -239,13 +258,16 @@ class BackfillStream(Stream): class PresenceStream(Stream): NAME = "presence" ROW_TYPE = PresenceStreamRow + _QUERY_MASTER = True def __init__(self, hs): store = hs.get_datastore() presence_handler = hs.get_presence_handler() self.current_token = store.get_current_presence_token # type: ignore - self.update_function = presence_handler.get_all_presence_updates # type: ignore + + if hs.config.worker_app is None: + self.update_function = presence_handler.get_all_presence_updates # type: ignore super(PresenceStream, self).__init__(hs) @@ -253,12 +275,15 @@ class PresenceStream(Stream): class TypingStream(Stream): NAME = "typing" ROW_TYPE = TypingStreamRow + _QUERY_MASTER = True def __init__(self, hs): typing_handler = hs.get_typing_handler() self.current_token = typing_handler.get_current_token # type: ignore - self.update_function = typing_handler.get_all_typing_updates # type: ignore + + if hs.config.worker_app is None: + self.update_function = typing_handler.get_all_typing_updates # type: ignore super(TypingStream, self).__init__(hs) diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py index 615f3dc9ac..5d9e87188b 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py @@ -15,7 +15,9 @@ # limitations under the License. from collections import namedtuple -from ._base import Stream +from twisted.internet import defer + +from synapse.replication.tcp.streams._base import Stream FederationStreamRow = namedtuple( "FederationStreamRow", @@ -33,11 +35,18 @@ class FederationStream(Stream): NAME = "federation" ROW_TYPE = FederationStreamRow + _QUERY_MASTER = True def __init__(self, hs): - federation_sender = hs.get_federation_sender() - - self.current_token = federation_sender.get_current_token # type: ignore - self.update_function = federation_sender.get_replication_rows # type: ignore + # Not all synapse instances will have a federation sender instance, + # whether that's a `FederationSender` or a `FederationRemoteSendQueue`, + # so we stub the stream out when that is the case. + if hs.config.worker_app is None or hs.should_send_federation(): + federation_sender = hs.get_federation_sender() + self.current_token = federation_sender.get_current_token # type: ignore + self.update_function = federation_sender.get_replication_rows # type: ignore + else: + self.current_token = lambda: 0 # type: ignore + self.update_function = lambda *args, **kwargs: defer.succeed([]) # type: ignore super(FederationStream, self).__init__(hs)