Regularly try to wake up dests instead of waiting for next PDU/EDU (#15743)
parent
d939120421
commit
f63d4a3a65
|
@ -0,0 +1 @@
|
||||||
|
Regularly try to send transactions to other servers after they failed instead of waiting for a new event to be available before trying.
|
|
@ -109,10 +109,8 @@ was enabled*, Catch-Up Mode is exited and we return to `_transaction_transmissio
|
||||||
|
|
||||||
If a remote server is unreachable over federation, we back off from that server,
|
If a remote server is unreachable over federation, we back off from that server,
|
||||||
with an exponentially-increasing retry interval.
|
with an exponentially-increasing retry interval.
|
||||||
Whilst we don't automatically retry after the interval, we prevent making new attempts
|
We automatically retry after the retry interval expires (roughly, the logic to do so
|
||||||
until such time as the back-off has cleared.
|
being triggered every minute).
|
||||||
Once the back-off is cleared and a new PDU or EDU arrives for transmission, the transmission
|
|
||||||
loop resumes and empties the queue by making federation requests.
|
|
||||||
|
|
||||||
If the backoff grows too large (> 1 hour), the in-memory queue is emptied (to prevent
|
If the backoff grows too large (> 1 hour), the in-memory queue is emptied (to prevent
|
||||||
unbounded growth) and Catch-Up Mode is entered.
|
unbounded growth) and Catch-Up Mode is entered.
|
||||||
|
@ -145,7 +143,6 @@ from prometheus_client import Counter
|
||||||
from typing_extensions import Literal
|
from typing_extensions import Literal
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from twisted.internet.interfaces import IDelayedCall
|
|
||||||
|
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
from synapse.api.presence import UserPresenceState
|
from synapse.api.presence import UserPresenceState
|
||||||
|
@ -184,14 +181,18 @@ sent_pdus_destination_dist_total = Counter(
|
||||||
"Total number of PDUs queued for sending across all destinations",
|
"Total number of PDUs queued for sending across all destinations",
|
||||||
)
|
)
|
||||||
|
|
||||||
# Time (in s) after Synapse's startup that we will begin to wake up destinations
|
# Time (in s) to wait before trying to wake up destinations that have
|
||||||
# that have catch-up outstanding.
|
# catch-up outstanding. This will also be the delay applied at startup
|
||||||
CATCH_UP_STARTUP_DELAY_SEC = 15
|
# before trying the same.
|
||||||
|
# Please note that rate limiting still applies, so while the loop is
|
||||||
|
# executed every X seconds the destinations may not be wake up because
|
||||||
|
# they are being rate limited following previous attempt failures.
|
||||||
|
WAKEUP_RETRY_PERIOD_SEC = 60
|
||||||
|
|
||||||
# Time (in s) to wait in between waking up each destination, i.e. one destination
|
# Time (in s) to wait in between waking up each destination, i.e. one destination
|
||||||
# will be woken up every <x> seconds after Synapse's startup until we have woken
|
# will be woken up every <x> seconds until we have woken every destination
|
||||||
# every destination has outstanding catch-up.
|
# has outstanding catch-up.
|
||||||
CATCH_UP_STARTUP_INTERVAL_SEC = 5
|
WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5
|
||||||
|
|
||||||
|
|
||||||
class AbstractFederationSender(metaclass=abc.ABCMeta):
|
class AbstractFederationSender(metaclass=abc.ABCMeta):
|
||||||
|
@ -415,12 +416,10 @@ class FederationSender(AbstractFederationSender):
|
||||||
/ hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
|
/ hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
|
||||||
)
|
)
|
||||||
|
|
||||||
# wake up destinations that have outstanding PDUs to be caught up
|
# Regularly wake up destinations that have outstanding PDUs to be caught up
|
||||||
self._catchup_after_startup_timer: Optional[
|
self.clock.looping_call(
|
||||||
IDelayedCall
|
|
||||||
] = self.clock.call_later(
|
|
||||||
CATCH_UP_STARTUP_DELAY_SEC,
|
|
||||||
run_as_background_process,
|
run_as_background_process,
|
||||||
|
WAKEUP_RETRY_PERIOD_SEC * 1000.0,
|
||||||
"wake_destinations_needing_catchup",
|
"wake_destinations_needing_catchup",
|
||||||
self._wake_destinations_needing_catchup,
|
self._wake_destinations_needing_catchup,
|
||||||
)
|
)
|
||||||
|
@ -966,7 +965,6 @@ class FederationSender(AbstractFederationSender):
|
||||||
|
|
||||||
if not destinations_to_wake:
|
if not destinations_to_wake:
|
||||||
# finished waking all destinations!
|
# finished waking all destinations!
|
||||||
self._catchup_after_startup_timer = None
|
|
||||||
break
|
break
|
||||||
|
|
||||||
last_processed = destinations_to_wake[-1]
|
last_processed = destinations_to_wake[-1]
|
||||||
|
@ -983,4 +981,4 @@ class FederationSender(AbstractFederationSender):
|
||||||
last_processed,
|
last_processed,
|
||||||
)
|
)
|
||||||
self.wake_destination(destination)
|
self.wake_destination(destination)
|
||||||
await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
|
await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)
|
||||||
|
|
|
@ -431,28 +431,24 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
|
||||||
# ACT: call _wake_destinations_needing_catchup
|
# ACT: call _wake_destinations_needing_catchup
|
||||||
|
|
||||||
# patch wake_destination to just count the destinations instead
|
# patch wake_destination to just count the destinations instead
|
||||||
woken = []
|
woken = set()
|
||||||
|
|
||||||
def wake_destination_track(destination: str) -> None:
|
def wake_destination_track(destination: str) -> None:
|
||||||
woken.append(destination)
|
woken.add(destination)
|
||||||
|
|
||||||
self.federation_sender.wake_destination = wake_destination_track # type: ignore[assignment]
|
self.federation_sender.wake_destination = wake_destination_track # type: ignore[assignment]
|
||||||
|
|
||||||
# cancel the pre-existing timer for _wake_destinations_needing_catchup
|
# We wait quite long so that all dests can be woken up, since there is a delay
|
||||||
# this is because we are calling it manually rather than waiting for it
|
# between them.
|
||||||
# to be called automatically
|
self.pump(by=5.0)
|
||||||
assert self.federation_sender._catchup_after_startup_timer is not None
|
|
||||||
self.federation_sender._catchup_after_startup_timer.cancel()
|
|
||||||
|
|
||||||
self.get_success(
|
|
||||||
self.federation_sender._wake_destinations_needing_catchup(), by=5.0
|
|
||||||
)
|
|
||||||
|
|
||||||
# ASSERT (_wake_destinations_needing_catchup):
|
# ASSERT (_wake_destinations_needing_catchup):
|
||||||
# - all remotes are woken up, save for zzzerver
|
# - all remotes are woken up, save for zzzerver
|
||||||
self.assertNotIn("zzzerver", woken)
|
self.assertNotIn("zzzerver", woken)
|
||||||
# - all destinations are woken exactly once; they appear once in woken.
|
# - all destinations are woken, potentially more than once, since the
|
||||||
self.assertCountEqual(woken, server_names[:-1])
|
# wake up is called regularly and we don't ack in this test that a transaction
|
||||||
|
# has been successfully sent.
|
||||||
|
self.assertCountEqual(woken, set(server_names[:-1]))
|
||||||
|
|
||||||
def test_not_latest_event(self) -> None:
|
def test_not_latest_event(self) -> None:
|
||||||
"""Test that we send the latest event in the room even if its not ours."""
|
"""Test that we send the latest event in the room even if its not ours."""
|
||||||
|
|
Loading…
Reference in New Issue