Fix recording of federation stream token (#7564)
A couple of changes of significance: * remove the `_last_ack < federation_position` condition, so that updates will still be correctly processed after restart * Correctly wire up send_federation_ack to the right class.pull/7575/head
parent
d14c4d6b6d
commit
00db90f409
|
@ -0,0 +1 @@
|
||||||
|
Fix exception `'GenericWorkerReplicationHandler' object has no attribute 'send_federation_ack'`, introduced in v1.13.0.
|
|
@ -17,7 +17,7 @@
|
||||||
import contextlib
|
import contextlib
|
||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
from typing import Dict, Iterable
|
from typing import Dict, Iterable, Optional, Set
|
||||||
|
|
||||||
from typing_extensions import ContextManager
|
from typing_extensions import ContextManager
|
||||||
|
|
||||||
|
@ -677,10 +677,9 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
|
||||||
self.notify_pushers = hs.config.start_pushers
|
self.notify_pushers = hs.config.start_pushers
|
||||||
self.pusher_pool = hs.get_pusherpool()
|
self.pusher_pool = hs.get_pusherpool()
|
||||||
|
|
||||||
|
self.send_handler = None # type: Optional[FederationSenderHandler]
|
||||||
if hs.config.send_federation:
|
if hs.config.send_federation:
|
||||||
self.send_handler = FederationSenderHandler(hs, self)
|
self.send_handler = FederationSenderHandler(hs)
|
||||||
else:
|
|
||||||
self.send_handler = None
|
|
||||||
|
|
||||||
async def on_rdata(self, stream_name, instance_name, token, rows):
|
async def on_rdata(self, stream_name, instance_name, token, rows):
|
||||||
await super().on_rdata(stream_name, instance_name, token, rows)
|
await super().on_rdata(stream_name, instance_name, token, rows)
|
||||||
|
@ -718,7 +717,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
|
||||||
if entities:
|
if entities:
|
||||||
self.notifier.on_new_event("to_device_key", token, users=entities)
|
self.notifier.on_new_event("to_device_key", token, users=entities)
|
||||||
elif stream_name == DeviceListsStream.NAME:
|
elif stream_name == DeviceListsStream.NAME:
|
||||||
all_room_ids = set()
|
all_room_ids = set() # type: Set[str]
|
||||||
for row in rows:
|
for row in rows:
|
||||||
if row.entity.startswith("@"):
|
if row.entity.startswith("@"):
|
||||||
room_ids = await self.store.get_rooms_for_user(row.entity)
|
room_ids = await self.store.get_rooms_for_user(row.entity)
|
||||||
|
@ -769,24 +768,33 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
|
||||||
|
|
||||||
|
|
||||||
class FederationSenderHandler(object):
|
class FederationSenderHandler(object):
|
||||||
"""Processes the replication stream and forwards the appropriate entries
|
"""Processes the fedration replication stream
|
||||||
to the federation sender.
|
|
||||||
|
This class is only instantiate on the worker responsible for sending outbound
|
||||||
|
federation transactions. It receives rows from the replication stream and forwards
|
||||||
|
the appropriate entries to the FederationSender class.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, hs: GenericWorkerServer, replication_client):
|
def __init__(self, hs: GenericWorkerServer):
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
self._is_mine_id = hs.is_mine_id
|
self._is_mine_id = hs.is_mine_id
|
||||||
self.federation_sender = hs.get_federation_sender()
|
self.federation_sender = hs.get_federation_sender()
|
||||||
self.replication_client = replication_client
|
self._hs = hs
|
||||||
|
|
||||||
|
# if the worker is restarted, we want to pick up where we left off in
|
||||||
|
# the replication stream, so load the position from the database.
|
||||||
|
#
|
||||||
|
# XXX is this actually worthwhile? Whenever the master is restarted, we'll
|
||||||
|
# drop some rows anyway (which is mostly fine because we're only dropping
|
||||||
|
# typing and presence notifications). If the replication stream is
|
||||||
|
# unreliable, why do we do all this hoop-jumping to store the position in the
|
||||||
|
# database? See also https://github.com/matrix-org/synapse/issues/7535.
|
||||||
|
#
|
||||||
self.federation_position = self.store.federation_out_pos_startup
|
self.federation_position = self.store.federation_out_pos_startup
|
||||||
|
|
||||||
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
|
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
|
||||||
|
|
||||||
self._last_ack = self.federation_position
|
self._last_ack = self.federation_position
|
||||||
|
|
||||||
self._room_serials = {}
|
|
||||||
self._room_typing = {}
|
|
||||||
|
|
||||||
def on_start(self):
|
def on_start(self):
|
||||||
# There may be some events that are persisted but haven't been sent,
|
# There may be some events that are persisted but haven't been sent,
|
||||||
# so send them now.
|
# so send them now.
|
||||||
|
@ -849,22 +857,34 @@ class FederationSenderHandler(object):
|
||||||
await self.federation_sender.send_read_receipt(receipt_info)
|
await self.federation_sender.send_read_receipt(receipt_info)
|
||||||
|
|
||||||
async def update_token(self, token):
|
async def update_token(self, token):
|
||||||
|
"""Update the record of where we have processed to in the federation stream.
|
||||||
|
|
||||||
|
Called after we have processed a an update received over replication. Sends
|
||||||
|
a FEDERATION_ACK back to the master, and stores the token that we have processed
|
||||||
|
in `federation_stream_position` so that we can restart where we left off.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
self.federation_position = token
|
self.federation_position = token
|
||||||
|
|
||||||
# We linearize here to ensure we don't have races updating the token
|
# We linearize here to ensure we don't have races updating the token
|
||||||
with (await self._fed_position_linearizer.queue(None)):
|
#
|
||||||
if self._last_ack < self.federation_position:
|
# XXX this appears to be redundant, since the ReplicationCommandHandler
|
||||||
await self.store.update_federation_out_pos(
|
# has a linearizer which ensures that we only process one line of
|
||||||
"federation", self.federation_position
|
# replication data at a time. Should we remove it, or is it doing useful
|
||||||
)
|
# service for robustness? Or could we replace it with an assertion that
|
||||||
|
# we're not being re-entered?
|
||||||
|
|
||||||
# We ACK this token over replication so that the master can drop
|
with (await self._fed_position_linearizer.queue(None)):
|
||||||
# its in memory queues
|
await self.store.update_federation_out_pos(
|
||||||
self.replication_client.send_federation_ack(
|
"federation", self.federation_position
|
||||||
self.federation_position
|
)
|
||||||
)
|
|
||||||
self._last_ack = self.federation_position
|
# We ACK this token over replication so that the master can drop
|
||||||
|
# its in memory queues
|
||||||
|
self._hs.get_tcp_replication().send_federation_ack(
|
||||||
|
self.federation_position
|
||||||
|
)
|
||||||
|
self._last_ack = self.federation_position
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error updating federation stream position")
|
logger.exception("Error updating federation stream position")
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import mock
|
||||||
|
|
||||||
|
from synapse.app.generic_worker import GenericWorkerServer
|
||||||
|
from synapse.replication.tcp.commands import FederationAckCommand
|
||||||
|
from synapse.replication.tcp.protocol import AbstractConnection
|
||||||
|
from synapse.replication.tcp.streams.federation import FederationStream
|
||||||
|
|
||||||
|
from tests.unittest import HomeserverTestCase
|
||||||
|
|
||||||
|
|
||||||
|
class FederationAckTestCase(HomeserverTestCase):
|
||||||
|
def default_config(self) -> dict:
|
||||||
|
config = super().default_config()
|
||||||
|
config["worker_app"] = "synapse.app.federation_sender"
|
||||||
|
config["send_federation"] = True
|
||||||
|
return config
|
||||||
|
|
||||||
|
def make_homeserver(self, reactor, clock):
|
||||||
|
hs = self.setup_test_homeserver(homeserverToUse=GenericWorkerServer)
|
||||||
|
return hs
|
||||||
|
|
||||||
|
def test_federation_ack_sent(self):
|
||||||
|
"""A FEDERATION_ACK should be sent back after each RDATA federation
|
||||||
|
|
||||||
|
This test checks that the federation sender is correctly sending back
|
||||||
|
FEDERATION_ACK messages. The test works by spinning up a federation_sender
|
||||||
|
worker server, and then fishing out its ReplicationCommandHandler. We wire
|
||||||
|
the RCH up to a mock connection (so that we can observe the command being sent)
|
||||||
|
and then poke in an RDATA row.
|
||||||
|
|
||||||
|
XXX: it might be nice to do this by pretending to be a synapse master worker
|
||||||
|
(or a redis server), and having the worker connect to us via a mocked-up TCP
|
||||||
|
transport, rather than assuming that the implementation has a
|
||||||
|
ReplicationCommandHandler.
|
||||||
|
"""
|
||||||
|
rch = self.hs.get_tcp_replication()
|
||||||
|
|
||||||
|
# wire up the ReplicationCommandHandler to a mock connection
|
||||||
|
mock_connection = mock.Mock(spec=AbstractConnection)
|
||||||
|
rch.new_connection(mock_connection)
|
||||||
|
|
||||||
|
# tell it it received an RDATA row
|
||||||
|
self.get_success(
|
||||||
|
rch.on_rdata(
|
||||||
|
"federation",
|
||||||
|
"master",
|
||||||
|
token=10,
|
||||||
|
rows=[FederationStream.FederationStreamRow(type="x", data=[1, 2, 3])],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# now check that the FEDERATION_ACK was sent
|
||||||
|
mock_connection.send_command.assert_called_once()
|
||||||
|
cmd = mock_connection.send_command.call_args[0][0]
|
||||||
|
assert isinstance(cmd, FederationAckCommand)
|
||||||
|
self.assertEqual(cmd.token, 10)
|
Loading…
Reference in New Issue