Awful hackery to try to get the fed sender to keep up
Basically, if the federation sender starts getting behind, insert some sleeps into the transaction transmission code to give the fed sender a chance to catch up. Might have to experiment a bit with the numbers.pull/6126/head
parent
e04c235907
commit
721086a291
|
@ -0,0 +1 @@
|
|||
Group events into larger federation transactions at times of high traffic.
|
|
@ -152,9 +152,24 @@ class FederationSender(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def _process_event_queue_loop(self):
|
||||
loop_start_time = self.clock.time_msec()
|
||||
try:
|
||||
self._is_processing = True
|
||||
while True:
|
||||
# if we've been going around this loop for a long time without
|
||||
# catching up, deprioritise transaction transmission. This should mean
|
||||
# that events get batched into fewer transactions, which is more
|
||||
# efficient, and hence give us a chance to catch up
|
||||
if (
|
||||
self.clock.time_msec() - loop_start_time > 60 * 1000
|
||||
and not self._transaction_manager.deprioritise_transmission
|
||||
):
|
||||
logger.warning(
|
||||
"Event processing loop is getting behind: deprioritising "
|
||||
"transaction transmission"
|
||||
)
|
||||
self._transaction_manager.deprioritise_transmission = True
|
||||
|
||||
last_token = yield self.store.get_federation_out_pos("events")
|
||||
next_token, events = yield self.store.get_all_new_events_stream(
|
||||
last_token, self._last_poked_id, limit=100
|
||||
|
@ -251,6 +266,9 @@ class FederationSender(object):
|
|||
|
||||
finally:
|
||||
self._is_processing = False
|
||||
if self._transaction_manager.deprioritise_transmission:
|
||||
logger.info("Event queue caught up: re-prioritising transmission")
|
||||
self._transaction_manager.deprioritise_transmission = False
|
||||
|
||||
def _send_pdu(self, pdu, destinations):
|
||||
# We loop through all destinations to see whether we already have
|
||||
|
|
|
@ -189,6 +189,11 @@ class PerDestinationQueue(object):
|
|||
|
||||
pending_pdus = []
|
||||
while True:
|
||||
if self._transaction_manager.deprioritise_transmission:
|
||||
# if the event-processing loop has got behind, sleep to give it
|
||||
# a chance to catch up
|
||||
yield self._clock.sleep(2)
|
||||
|
||||
# We have to keep 2 free slots for presence and rr_edus
|
||||
limit = MAX_EDUS_PER_TRANSACTION - 2
|
||||
|
||||
|
|
|
@ -49,6 +49,10 @@ class TransactionManager(object):
|
|||
# HACK to get unique tx id
|
||||
self._next_txn_id = int(self.clock.time_msec())
|
||||
|
||||
# the federation sender sometimes sets this to delay transaction transmission,
|
||||
# if the sender gets behind.
|
||||
self.deprioritise_transmission = False
|
||||
|
||||
@measure_func("_send_new_transaction")
|
||||
@defer.inlineCallbacks
|
||||
def send_new_transaction(self, destination, pending_pdus, pending_edus):
|
||||
|
|
Loading…
Reference in New Issue