From 811d2ecf2ed50613d2f8a0231c4b9487be2ff925 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 20 Mar 2020 15:11:54 +0000
Subject: [PATCH] Don't panic if streams get behind.

The catchup will in future happen on workers, so master process won't
need to protect itself by dropping the connection.
---
 synapse/replication/tcp/protocol.py      | 20 ++++----
 synapse/replication/tcp/resource.py      |  5 +-
 synapse/replication/tcp/streams/_base.py | 61 ++++++++++--------------
 3 files changed, 41 insertions(+), 45 deletions(-)

diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index bc1482a9bb..d7ef2398fa 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -485,15 +485,19 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
         self.connecting_streams.add(stream_name)
 
         try:
-            # Get missing updates
-            updates, current_token = await self.streamer.get_stream_updates(
-                stream_name, token
-            )
+            limited = True
+            while limited:
+                # Get missing updates
+                (
+                    updates,
+                    current_token,
+                    limited,
+                ) = await self.streamer.get_stream_updates(stream_name, token)
 
-            # Send all the missing updates
-            for update in updates:
-                token, row = update[0], update[1]
-                self.send_command(RdataCommand(stream_name, token, row))
+                # Send all the missing updates
+                for update in updates:
+                    token, row = update[0], update[1]
+                    self.send_command(RdataCommand(stream_name, token, row))
 
             # We send a POSITION command to ensure that they have an up to
             # date token (especially useful if we didn't send any updates
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 6e2ebaf614..5be31024b7 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -190,7 +190,8 @@ class ReplicationStreamer(object):
                             stream.current_token(),
                         )
                         try:
-                            updates, current_token = await stream.get_updates()
+                            updates, current_token, limited = await stream.get_updates()
+                            self.pending_updates |= limited
                         except Exception:
                             logger.info("Failed to handle stream %s", stream.NAME)
                             raise
@@ -235,7 +236,7 @@ class ReplicationStreamer(object):
         if not stream:
             raise Exception("unknown stream %s", stream_name)
 
-        return await stream.get_updates_since(token)
+        return await stream.get_updates_since(token, stream.current_token())
 
     @measure_func("repl.federation_ack")
     def federation_ack(self, token):
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index abf5c6c6a8..99cef97532 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -14,10 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import itertools
 import logging
 from collections import namedtuple
-from typing import Any, List, Optional, Tuple
+from typing import Any, List, Optional, Tuple, Union
 
 import attr
 
@@ -153,61 +152,53 @@ class Stream(object):
         """
         self.last_token = self.current_token()
 
-    async def get_updates(self):
+    async def get_updates(self) -> Tuple[List[Tuple[int, JsonDict]], int, bool]:
         """Gets all updates since the last time this function was called (or
         since the stream was constructed if it hadn't been called before).
 
         Returns:
-            Deferred[Tuple[List[Tuple[int, Any]], int]:
-                Resolves to a pair ``(updates, current_token)``, where ``updates`` is a
-                list of ``(token, row)`` entries. ``row`` will be json-serialised and
-                sent over the replication steam.
+            Resolves to a pair `(updates, new_last_token, limited)`, where
+            `updates` is a list of `(token, row)` entries, `new_last_token` is
+            the new position in stream, and `limited` is whether there are
+            more updates to fetch.
         """
-        updates, current_token = await self.get_updates_since(self.last_token)
+        current_token = self.current_token()
+        updates, current_token, limited = await self.get_updates_since(
+            self.last_token, current_token
+        )
         self.last_token = current_token
 
-        return updates, current_token
+        return updates, current_token, limited
 
     async def get_updates_since(
-        self, from_token: int
-    ) -> Tuple[List[Tuple[int, JsonDict]], int]:
+        self, from_token: Union[int, str], upto_token: int, limit: int = 100
+    ) -> Tuple[List[Tuple[int, JsonDict]], int, bool]:
         """Like get_updates except allows specifying from when we should
         stream updates
 
         Returns:
-            Resolves to a pair `(updates, new_last_token)`, where `updates` is
-            a list of `(token, row)` entries and `new_last_token` is the new
-            position in stream.
+            Resolves to a pair `(updates, new_last_token, limited)`, where
+            `updates` is a list of `(token, row)` entries, `new_last_token` is
+            the new position in stream, and `limited` is whether there are
+            more updates to fetch.
         """
         if from_token in ("NOW", "now"):
-            return [], self.current_token()
-
-        current_token = self.current_token()
+            return [], upto_token, False
 
         from_token = int(from_token)
 
-        if from_token == current_token:
-            return [], current_token
-
-        rows = await self.update_function(
-            from_token, current_token, limit=MAX_EVENTS_BEHIND + 1
-        )
-
-        # never turn more than MAX_EVENTS_BEHIND + 1 into updates.
-        rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1)
+        if from_token == upto_token:
+            return [], upto_token, False
 
+        limited = False
+        rows = await self.update_function(from_token, upto_token, limit=limit)
         updates = [(row[0], row[1:]) for row in rows]
+        if len(updates) == limit:
+            upto_token = rows[-1][0]
+            limited = True
 
-        # check we didn't get more rows than the limit.
-        # doing it like this allows the update_function to be a generator.
-        if len(updates) >= MAX_EVENTS_BEHIND:
-            raise Exception("stream %s has fallen behind" % (self.NAME))
-
-        # The update function didn't hit the limit, so we must have got all
-        # the updates to `current_token`, and can return that as our new
-        # stream position.
-        return updates, current_token
+        return updates, upto_token, limited
 
     def current_token(self):
         """Gets the current token of the underlying streams. Should be provided