Merge branch 'rav/federation_sender_hackery' into matrix-org-hotfixes
commit
7657ad3ced
|
@ -0,0 +1 @@
|
||||||
|
Group events into larger federation transactions at times of high traffic.
|
|
@ -152,9 +152,24 @@ class FederationSender(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _process_event_queue_loop(self):
|
def _process_event_queue_loop(self):
|
||||||
|
loop_start_time = self.clock.time_msec()
|
||||||
try:
|
try:
|
||||||
self._is_processing = True
|
self._is_processing = True
|
||||||
while True:
|
while True:
|
||||||
|
# if we've been going around this loop for a long time without
|
||||||
|
# catching up, deprioritise transaction transmission. This should mean
|
||||||
|
# that events get batched into fewer transactions, which is more
|
||||||
|
# efficient, and hence give us a chance to catch up
|
||||||
|
if (
|
||||||
|
self.clock.time_msec() - loop_start_time > 60 * 1000
|
||||||
|
and not self._transaction_manager.deprioritise_transmission
|
||||||
|
):
|
||||||
|
logger.warning(
|
||||||
|
"Event processing loop is getting behind: deprioritising "
|
||||||
|
"transaction transmission"
|
||||||
|
)
|
||||||
|
self._transaction_manager.deprioritise_transmission = True
|
||||||
|
|
||||||
last_token = yield self.store.get_federation_out_pos("events")
|
last_token = yield self.store.get_federation_out_pos("events")
|
||||||
next_token, events = yield self.store.get_all_new_events_stream(
|
next_token, events = yield self.store.get_all_new_events_stream(
|
||||||
last_token, self._last_poked_id, limit=100
|
last_token, self._last_poked_id, limit=100
|
||||||
|
@ -251,6 +266,9 @@ class FederationSender(object):
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
self._is_processing = False
|
self._is_processing = False
|
||||||
|
if self._transaction_manager.deprioritise_transmission:
|
||||||
|
logger.info("Event queue caught up: re-prioritising transmission")
|
||||||
|
self._transaction_manager.deprioritise_transmission = False
|
||||||
|
|
||||||
def _send_pdu(self, pdu, destinations):
|
def _send_pdu(self, pdu, destinations):
|
||||||
# We loop through all destinations to see whether we already have
|
# We loop through all destinations to see whether we already have
|
||||||
|
|
|
@ -189,6 +189,11 @@ class PerDestinationQueue(object):
|
||||||
|
|
||||||
pending_pdus = []
|
pending_pdus = []
|
||||||
while True:
|
while True:
|
||||||
|
if self._transaction_manager.deprioritise_transmission:
|
||||||
|
# if the event-processing loop has got behind, sleep to give it
|
||||||
|
# a chance to catch up
|
||||||
|
yield self._clock.sleep(2)
|
||||||
|
|
||||||
# We have to keep 2 free slots for presence and rr_edus
|
# We have to keep 2 free slots for presence and rr_edus
|
||||||
limit = MAX_EDUS_PER_TRANSACTION - 2
|
limit = MAX_EDUS_PER_TRANSACTION - 2
|
||||||
|
|
||||||
|
|
|
@ -49,6 +49,10 @@ class TransactionManager(object):
|
||||||
# HACK to get unique tx id
|
# HACK to get unique tx id
|
||||||
self._next_txn_id = int(self.clock.time_msec())
|
self._next_txn_id = int(self.clock.time_msec())
|
||||||
|
|
||||||
|
# the federation sender sometimes sets this to delay transaction transmission,
|
||||||
|
# if the sender gets behind.
|
||||||
|
self.deprioritise_transmission = False
|
||||||
|
|
||||||
@measure_func("_send_new_transaction")
|
@measure_func("_send_new_transaction")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def send_new_transaction(self, destination, pending_pdus, pending_edus):
|
def send_new_transaction(self, destination, pending_pdus, pending_edus):
|
||||||
|
|
Loading…
Reference in New Issue