Revert federation-transaction-transmission backoff hacks
This reverts b852a8247, 15b2a5081, 28889d8da.

I don't think these patches are required any more, and if they are, they should be on mainline, not hidden in our hotfixes branch. Let's try backing them out: if that turns out to be an error, we can PR them properly.
branch pull/8675/head
parent 7b6f857aa9
commit cfb3096e33
@@ -151,25 +151,10 @@ class FederationSender:
             "process_event_queue_for_federation", self._process_event_queue_loop
         )

-    async def _process_event_queue_loop(self):
-        loop_start_time = self.clock.time_msec()
+    async def _process_event_queue_loop(self) -> None:
         try:
             self._is_processing = True
             while True:
-                # if we've been going around this loop for a long time without
-                # catching up, deprioritise transaction transmission. This should mean
-                # that events get batched into fewer transactions, which is more
-                # efficient, and hence give us a chance to catch up
-                if (
-                    self.clock.time_msec() - loop_start_time > 60 * 1000
-                    and not self._transaction_manager.deprioritise_transmission
-                ):
-                    logger.warning(
-                        "Event queue is getting behind: deprioritising transaction "
-                        "transmission"
-                    )
-                    self._transaction_manager.deprioritise_transmission = True
-
                 last_token = await self.store.get_federation_out_pos("events")
                 next_token, events = await self.store.get_all_new_events_stream(
                     last_token, self._last_poked_id, limit=100
@@ -279,9 +264,6 @@ class FederationSender:

         finally:
             self._is_processing = False
-            if self._transaction_manager.deprioritise_transmission:
-                logger.info("Event queue caught up: re-prioritising transmission")
-                self._transaction_manager.deprioritise_transmission = False

     def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
         # We loop through all destinations to see whether we already have
@@ -15,7 +15,6 @@
 # limitations under the License.
 import datetime
 import logging
-import random
 from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple

 from prometheus_client import Counter
@@ -40,8 +39,6 @@ if TYPE_CHECKING:
 # This is defined in the Matrix spec and enforced by the receiver.
 MAX_EDUS_PER_TRANSACTION = 100

-DEPRIORITISE_SLEEP_TIME = 10
-
 logger = logging.getLogger(__name__)

@@ -223,18 +220,6 @@ class PerDestinationQueue:

             pending_pdus = []
             while True:
-                if self._transaction_manager.deprioritise_transmission:
-                    # if the event-processing loop has got behind, sleep to give it
-                    # a chance to catch up. Add some randomness so that the transmitters
-                    # don't all wake up in sync.
-                    sleeptime = random.uniform(
-                        DEPRIORITISE_SLEEP_TIME, DEPRIORITISE_SLEEP_TIME * 2
-                    )
-                    logger.info(
-                        "TX [%s]: sleeping for %f seconds", self._destination, sleeptime
-                    )
-                    await self._clock.sleep(sleeptime)
-
                 # We have to keep 2 free slots for presence and rr_edus
                 limit = MAX_EDUS_PER_TRANSACTION - 2
@@ -51,10 +51,6 @@ class TransactionManager:
         # HACK to get unique tx id
         self._next_txn_id = int(self.clock.time_msec())

-        # the federation sender sometimes sets this to delay transaction transmission,
-        # if the sender gets behind.
-        self.deprioritise_transmission = False
-
     @measure_func("_send_new_transaction")
     async def send_new_transaction(
         self, destination: str, pdus: List[EventBase], edus: List[Edu],
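For context, the reverted hacks amounted to a simple cross-loop backoff: once a single pass of the event-processing loop took more than a minute, it set a shared deprioritise_transmission flag on the TransactionManager, and each per-destination transmission loop then slept for a randomised 10-20 seconds per iteration while the flag was set, so pending events batched into fewer, larger transactions. The following is a minimal, self-contained sketch of that pattern, not the Synapse code itself; TransmissionState, drain_events and send_transaction are hypothetical stand-ins for the real FederationSender/PerDestinationQueue plumbing:

import asyncio
import logging
import random

logger = logging.getLogger(__name__)

# Base for the randomised backoff, as in the reverted patches (seconds).
DEPRIORITISE_SLEEP_TIME = 10


class TransmissionState:
    """Hypothetical stand-in for the flag the TransactionManager carried."""

    def __init__(self) -> None:
        # Set by the event-processing loop to delay transaction
        # transmission when the sender gets behind.
        self.deprioritise_transmission = False


async def process_event_queue_loop(state: TransmissionState, drain_events) -> None:
    # drain_events is a hypothetical coroutine returning True while there
    # is still work to do; one slow pass flips the shared flag.
    loop_start_time = asyncio.get_running_loop().time()
    try:
        while await drain_events():
            if (
                asyncio.get_running_loop().time() - loop_start_time > 60
                and not state.deprioritise_transmission
            ):
                logger.warning(
                    "Event queue is getting behind: deprioritising transmission"
                )
                state.deprioritise_transmission = True
    finally:
        if state.deprioritise_transmission:
            logger.info("Event queue caught up: re-prioritising transmission")
            state.deprioritise_transmission = False


async def transmission_loop(
    destination: str, state: TransmissionState, send_transaction
) -> None:
    # send_transaction is a hypothetical coroutine returning False once
    # this destination's queue is empty.
    while True:
        if state.deprioritise_transmission:
            # Sleep for 10-20 s, with jitter so the per-destination
            # transmitters don't all wake up in sync.
            sleeptime = random.uniform(
                DEPRIORITISE_SLEEP_TIME, DEPRIORITISE_SLEEP_TIME * 2
            )
            logger.info("TX [%s]: sleeping for %f seconds", destination, sleeptime)
            await asyncio.sleep(sleeptime)
        if not await send_transaction():
            break

The jitter is the load-bearing detail: with a fixed sleep, every per-destination transmitter would wake at the same moment and undo the batching the backoff was meant to buy.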