Don't panic if streams get behind.

The catchup will in future happen on workers, so master process won't
need to protect itself by dropping the connection.
pull/7024/head
Erik Johnston 2020-03-20 15:11:54 +00:00
parent 7233d38690
commit 811d2ecf2e
3 changed files with 41 additions and 45 deletions

View File

@ -485,15 +485,19 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.connecting_streams.add(stream_name)
try:
# Get missing updates
updates, current_token = await self.streamer.get_stream_updates(
stream_name, token
)
limited = True
while limited:
# Get missing updates
(
updates,
current_token,
limited,
) = await self.streamer.get_stream_updates(stream_name, token)
# Send all the missing updates
for update in updates:
token, row = update[0], update[1]
self.send_command(RdataCommand(stream_name, token, row))
# Send all the missing updates
for update in updates:
token, row = update[0], update[1]
self.send_command(RdataCommand(stream_name, token, row))
# We send a POSITION command to ensure that they have an up to
# date token (especially useful if we didn't send any updates

View File

@ -190,7 +190,8 @@ class ReplicationStreamer(object):
stream.current_token(),
)
try:
updates, current_token = await stream.get_updates()
updates, current_token, limited = await stream.get_updates()
self.pending_updates |= limited
except Exception:
logger.info("Failed to handle stream %s", stream.NAME)
raise
@ -235,7 +236,7 @@ class ReplicationStreamer(object):
if not stream:
raise Exception("unknown stream %s", stream_name)
return await stream.get_updates_since(token)
return await stream.get_updates_since(token, stream.current_token())
@measure_func("repl.federation_ack")
def federation_ack(self, token):

View File

@ -14,10 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from collections import namedtuple
from typing import Any, List, Optional, Tuple
from typing import Any, List, Optional, Tuple, Union
import attr
@ -153,61 +152,53 @@ class Stream(object):
"""
self.last_token = self.current_token()
async def get_updates(self):
async def get_updates(self) -> Tuple[List[Tuple[int, JsonDict]], int, bool]:
"""Gets all updates since the last time this function was called (or
since the stream was constructed if it hadn't been called before).
Returns:
Deferred[Tuple[List[Tuple[int, Any]], int]:
Resolves to a pair ``(updates, current_token)``, where ``updates`` is a
list of ``(token, row)`` entries. ``row`` will be json-serialised and
sent over the replication steam.
Resolves to a pair `(updates, new_last_token, limited)`, where
`updates` is a list of `(token, row)` entries, `new_last_token` is
the new position in stream, and `limited` is whether there are
more updates to fetch.
"""
updates, current_token = await self.get_updates_since(self.last_token)
current_token = self.current_token()
updates, current_token, limited = await self.get_updates_since(
self.last_token, current_token
)
self.last_token = current_token
return updates, current_token
return updates, current_token, limited
async def get_updates_since(
self, from_token: int
) -> Tuple[List[Tuple[int, JsonDict]], int]:
self, from_token: Union[int, str], upto_token: int, limit: int = 100
) -> Tuple[List[Tuple[int, JsonDict]], int, bool]:
"""Like get_updates except allows specifying from when we should
stream updates
Returns:
Resolves to a pair `(updates, new_last_token)`, where `updates` is
a list of `(token, row)` entries and `new_last_token` is the new
position in stream.
Resolves to a pair `(updates, new_last_token, limited)`, where
`updates` is a list of `(token, row)` entries, `new_last_token` is
the new position in stream, and `limited` is whether there are
more updates to fetch.
"""
if from_token in ("NOW", "now"):
return [], self.current_token()
current_token = self.current_token()
return [], upto_token, False
from_token = int(from_token)
if from_token == current_token:
return [], current_token
rows = await self.update_function(
from_token, current_token, limit=MAX_EVENTS_BEHIND + 1
)
# never turn more than MAX_EVENTS_BEHIND + 1 into updates.
rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1)
if from_token == upto_token:
return [], upto_token, False
limited = False
rows = await self.update_function(from_token, upto_token, limit=limit)
updates = [(row[0], row[1:]) for row in rows]
if len(updates) == limit:
upto_token = rows[-1][0]
limited = True
# check we didn't get more rows than the limit.
# doing it like this allows the update_function to be a generator.
if len(updates) >= MAX_EVENTS_BEHIND:
raise Exception("stream %s has fallen behind" % (self.NAME))
# The update function didn't hit the limit, so we must have got all
# the updates to `current_token`, and can return that as our new
# stream position.
return updates, current_token
return updates, upto_token, limited
def current_token(self):
"""Gets the current token of the underlying streams. Should be provided