Catch-up after Federation Outage (bonus): Catch-up on Synapse Startup (#8322)
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net> Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com> * Fix _set_destination_retry_timings This came about because the code assumed that retry_interval could not be NULL — which has been challenged by catch-up.pull/8354/head
parent
8a4a4186de
commit
36efbcaf51
|
@ -0,0 +1 @@
|
||||||
|
Fix messages over federation being lost until an event is sent into the same room.
|
|
@ -1 +0,0 @@
|
||||||
Track the latest event for every destination and room for catch-up after federation outage.
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fix messages over federation being lost until an event is sent into the same room.
|
|
@ -1 +0,0 @@
|
||||||
Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage.
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fix messages over federation being lost until an event is sent into the same room.
|
|
@ -1 +0,0 @@
|
||||||
Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage.
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fix messages over federation being lost until an event is sent into the same room.
|
|
@ -55,6 +55,15 @@ sent_pdus_destination_dist_total = Counter(
|
||||||
"Total number of PDUs queued for sending across all destinations",
|
"Total number of PDUs queued for sending across all destinations",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Time (in s) after Synapse's startup that we will begin to wake up destinations
|
||||||
|
# that have catch-up outstanding.
|
||||||
|
CATCH_UP_STARTUP_DELAY_SEC = 15
|
||||||
|
|
||||||
|
# Time (in s) to wait in between waking up each destination, i.e. one destination
|
||||||
|
# will be woken up every <x> seconds after Synapse's startup until we have woken
|
||||||
|
# every destination has outstanding catch-up.
|
||||||
|
CATCH_UP_STARTUP_INTERVAL_SEC = 5
|
||||||
|
|
||||||
|
|
||||||
class FederationSender:
|
class FederationSender:
|
||||||
def __init__(self, hs: "synapse.server.HomeServer"):
|
def __init__(self, hs: "synapse.server.HomeServer"):
|
||||||
|
@ -125,6 +134,14 @@ class FederationSender:
|
||||||
1000.0 / hs.config.federation_rr_transactions_per_room_per_second
|
1000.0 / hs.config.federation_rr_transactions_per_room_per_second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# wake up destinations that have outstanding PDUs to be caught up
|
||||||
|
self._catchup_after_startup_timer = self.clock.call_later(
|
||||||
|
CATCH_UP_STARTUP_DELAY_SEC,
|
||||||
|
run_as_background_process,
|
||||||
|
"wake_destinations_needing_catchup",
|
||||||
|
self._wake_destinations_needing_catchup,
|
||||||
|
)
|
||||||
|
|
||||||
def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
|
def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
|
||||||
"""Get or create a PerDestinationQueue for the given destination
|
"""Get or create a PerDestinationQueue for the given destination
|
||||||
|
|
||||||
|
@ -560,3 +577,37 @@ class FederationSender:
|
||||||
# Dummy implementation for case where federation sender isn't offloaded
|
# Dummy implementation for case where federation sender isn't offloaded
|
||||||
# to a worker.
|
# to a worker.
|
||||||
return [], 0, False
|
return [], 0, False
|
||||||
|
|
||||||
|
async def _wake_destinations_needing_catchup(self):
|
||||||
|
"""
|
||||||
|
Wakes up destinations that need catch-up and are not currently being
|
||||||
|
backed off from.
|
||||||
|
|
||||||
|
In order to reduce load spikes, adds a delay between each destination.
|
||||||
|
"""
|
||||||
|
|
||||||
|
last_processed = None # type: Optional[str]
|
||||||
|
|
||||||
|
while True:
|
||||||
|
destinations_to_wake = await self.store.get_catch_up_outstanding_destinations(
|
||||||
|
last_processed
|
||||||
|
)
|
||||||
|
|
||||||
|
if not destinations_to_wake:
|
||||||
|
# finished waking all destinations!
|
||||||
|
self._catchup_after_startup_timer = None
|
||||||
|
break
|
||||||
|
|
||||||
|
destinations_to_wake = [
|
||||||
|
d
|
||||||
|
for d in destinations_to_wake
|
||||||
|
if self._federation_shard_config.should_handle(self._instance_name, d)
|
||||||
|
]
|
||||||
|
|
||||||
|
for last_processed in destinations_to_wake:
|
||||||
|
logger.info(
|
||||||
|
"Destination %s has outstanding catch-up, waking up.",
|
||||||
|
last_processed,
|
||||||
|
)
|
||||||
|
self.wake_destination(last_processed)
|
||||||
|
await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
|
||||||
|
|
|
@ -218,6 +218,7 @@ class TransactionStore(SQLBaseStore):
|
||||||
retry_interval = EXCLUDED.retry_interval
|
retry_interval = EXCLUDED.retry_interval
|
||||||
WHERE
|
WHERE
|
||||||
EXCLUDED.retry_interval = 0
|
EXCLUDED.retry_interval = 0
|
||||||
|
OR destinations.retry_interval IS NULL
|
||||||
OR destinations.retry_interval < EXCLUDED.retry_interval
|
OR destinations.retry_interval < EXCLUDED.retry_interval
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@ -249,7 +250,11 @@ class TransactionStore(SQLBaseStore):
|
||||||
"retry_interval": retry_interval,
|
"retry_interval": retry_interval,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
|
elif (
|
||||||
|
retry_interval == 0
|
||||||
|
or prev_row["retry_interval"] is None
|
||||||
|
or prev_row["retry_interval"] < retry_interval
|
||||||
|
):
|
||||||
self.db_pool.simple_update_one_txn(
|
self.db_pool.simple_update_one_txn(
|
||||||
txn,
|
txn,
|
||||||
"destinations",
|
"destinations",
|
||||||
|
@ -397,7 +402,7 @@ class TransactionStore(SQLBaseStore):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_catch_up_room_event_ids_txn(
|
def _get_catch_up_room_event_ids_txn(
|
||||||
txn, destination: str, last_successful_stream_ordering: int,
|
txn: LoggingTransaction, destination: str, last_successful_stream_ordering: int,
|
||||||
) -> List[str]:
|
) -> List[str]:
|
||||||
q = """
|
q = """
|
||||||
SELECT event_id FROM destination_rooms
|
SELECT event_id FROM destination_rooms
|
||||||
|
@ -412,3 +417,60 @@ class TransactionStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
event_ids = [row[0] for row in txn]
|
event_ids = [row[0] for row in txn]
|
||||||
return event_ids
|
return event_ids
|
||||||
|
|
||||||
|
async def get_catch_up_outstanding_destinations(
|
||||||
|
self, after_destination: Optional[str]
|
||||||
|
) -> List[str]:
|
||||||
|
"""
|
||||||
|
Gets at most 25 destinations which have outstanding PDUs to be caught up,
|
||||||
|
and are not being backed off from
|
||||||
|
Args:
|
||||||
|
after_destination:
|
||||||
|
If provided, all destinations must be lexicographically greater
|
||||||
|
than this one.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list of up to 25 destinations with outstanding catch-up.
|
||||||
|
These are the lexicographically first destinations which are
|
||||||
|
lexicographically greater than after_destination (if provided).
|
||||||
|
"""
|
||||||
|
time = self.hs.get_clock().time_msec()
|
||||||
|
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"get_catch_up_outstanding_destinations",
|
||||||
|
self._get_catch_up_outstanding_destinations_txn,
|
||||||
|
time,
|
||||||
|
after_destination,
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_catch_up_outstanding_destinations_txn(
|
||||||
|
txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str]
|
||||||
|
) -> List[str]:
|
||||||
|
q = """
|
||||||
|
SELECT destination FROM destinations
|
||||||
|
WHERE destination IN (
|
||||||
|
SELECT destination FROM destination_rooms
|
||||||
|
WHERE destination_rooms.stream_ordering >
|
||||||
|
destinations.last_successful_stream_ordering
|
||||||
|
)
|
||||||
|
AND destination > ?
|
||||||
|
AND (
|
||||||
|
retry_last_ts IS NULL OR
|
||||||
|
retry_last_ts + retry_interval < ?
|
||||||
|
)
|
||||||
|
ORDER BY destination
|
||||||
|
LIMIT 25
|
||||||
|
"""
|
||||||
|
txn.execute(
|
||||||
|
q,
|
||||||
|
(
|
||||||
|
# everything is lexicographically greater than "" so this gives
|
||||||
|
# us the first batch of up to 25.
|
||||||
|
after_destination or "",
|
||||||
|
now_time_ms,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
destinations = [row[0] for row in txn]
|
||||||
|
return destinations
|
||||||
|
|
|
@ -321,3 +321,102 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
|
||||||
per_dest_queue._last_successful_stream_ordering,
|
per_dest_queue._last_successful_stream_ordering,
|
||||||
event_5.internal_metadata.stream_ordering,
|
event_5.internal_metadata.stream_ordering,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@override_config({"send_federation": True})
|
||||||
|
def test_catch_up_on_synapse_startup(self):
|
||||||
|
"""
|
||||||
|
Tests the behaviour of get_catch_up_outstanding_destinations and
|
||||||
|
_wake_destinations_needing_catchup.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# list of sorted server names (note that there are more servers than the batch
|
||||||
|
# size used in get_catch_up_outstanding_destinations).
|
||||||
|
server_names = ["server%02d" % number for number in range(42)] + ["zzzerver"]
|
||||||
|
|
||||||
|
# ARRANGE:
|
||||||
|
# - a local user (u1)
|
||||||
|
# - a room which u1 is joined to (and remote users @user:serverXX are
|
||||||
|
# joined to)
|
||||||
|
|
||||||
|
# mark the remotes as online
|
||||||
|
self.is_online = True
|
||||||
|
|
||||||
|
self.register_user("u1", "you the one")
|
||||||
|
u1_token = self.login("u1", "you the one")
|
||||||
|
room_id = self.helper.create_room_as("u1", tok=u1_token)
|
||||||
|
|
||||||
|
for server_name in server_names:
|
||||||
|
self.get_success(
|
||||||
|
event_injection.inject_member_event(
|
||||||
|
self.hs, room_id, "@user:%s" % server_name, "join"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# create an event
|
||||||
|
self.helper.send(room_id, "deary me!", tok=u1_token)
|
||||||
|
|
||||||
|
# ASSERT:
|
||||||
|
# - All servers are up to date so none should have outstanding catch-up
|
||||||
|
outstanding_when_successful = self.get_success(
|
||||||
|
self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
|
||||||
|
)
|
||||||
|
self.assertEqual(outstanding_when_successful, [])
|
||||||
|
|
||||||
|
# ACT:
|
||||||
|
# - Make the remote servers unreachable
|
||||||
|
self.is_online = False
|
||||||
|
|
||||||
|
# - Mark zzzerver as being backed-off from
|
||||||
|
now = self.clock.time_msec()
|
||||||
|
self.get_success(
|
||||||
|
self.hs.get_datastore().set_destination_retry_timings(
|
||||||
|
"zzzerver", now, now, 24 * 60 * 60 * 1000 # retry in 1 day
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# - Send an event
|
||||||
|
self.helper.send(room_id, "can anyone hear me?", tok=u1_token)
|
||||||
|
|
||||||
|
# ASSERT (get_catch_up_outstanding_destinations):
|
||||||
|
# - all remotes are outstanding
|
||||||
|
# - they are returned in batches of 25, in order
|
||||||
|
outstanding_1 = self.get_success(
|
||||||
|
self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(len(outstanding_1), 25)
|
||||||
|
self.assertEqual(outstanding_1, server_names[0:25])
|
||||||
|
|
||||||
|
outstanding_2 = self.get_success(
|
||||||
|
self.hs.get_datastore().get_catch_up_outstanding_destinations(
|
||||||
|
outstanding_1[-1]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.assertNotIn("zzzerver", outstanding_2)
|
||||||
|
self.assertEqual(len(outstanding_2), 17)
|
||||||
|
self.assertEqual(outstanding_2, server_names[25:-1])
|
||||||
|
|
||||||
|
# ACT: call _wake_destinations_needing_catchup
|
||||||
|
|
||||||
|
# patch wake_destination to just count the destinations instead
|
||||||
|
woken = []
|
||||||
|
|
||||||
|
def wake_destination_track(destination):
|
||||||
|
woken.append(destination)
|
||||||
|
|
||||||
|
self.hs.get_federation_sender().wake_destination = wake_destination_track
|
||||||
|
|
||||||
|
# cancel the pre-existing timer for _wake_destinations_needing_catchup
|
||||||
|
# this is because we are calling it manually rather than waiting for it
|
||||||
|
# to be called automatically
|
||||||
|
self.hs.get_federation_sender()._catchup_after_startup_timer.cancel()
|
||||||
|
|
||||||
|
self.get_success(
|
||||||
|
self.hs.get_federation_sender()._wake_destinations_needing_catchup(), by=5.0
|
||||||
|
)
|
||||||
|
|
||||||
|
# ASSERT (_wake_destinations_needing_catchup):
|
||||||
|
# - all remotes are woken up, save for zzzerver
|
||||||
|
self.assertNotIn("zzzerver", woken)
|
||||||
|
# - all destinations are woken exactly once; they appear once in woken.
|
||||||
|
self.assertCountEqual(woken, server_names[:-1])
|
||||||
|
|
Loading…
Reference in New Issue