diff --git a/changelog.d/6126.feature b/changelog.d/6126.feature deleted file mode 100644 index 1207ba6206..0000000000 --- a/changelog.d/6126.feature +++ /dev/null @@ -1 +0,0 @@ -Group events into larger federation transactions at times of high traffic. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 497485fac2..d46f4aaeb1 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -152,24 +152,9 @@ class FederationSender(object): @defer.inlineCallbacks def _process_event_queue_loop(self): - loop_start_time = self.clock.time_msec() try: self._is_processing = True while True: - # if we've been going around this loop for a long time without - # catching up, deprioritise transaction transmission. This should mean - # that events get batched into fewer transactions, which is more - # efficient, and hence give us a chance to catch up - if ( - self.clock.time_msec() - loop_start_time > 60 * 1000 - and not self._transaction_manager.deprioritise_transmission - ): - logger.warning( - "Event processing loop is getting behind: deprioritising " - "transaction transmission" - ) - self._transaction_manager.deprioritise_transmission = True - last_token = yield self.store.get_federation_out_pos("events") next_token, events = yield self.store.get_all_new_events_stream( last_token, self._last_poked_id, limit=100 @@ -266,9 +251,6 @@ class FederationSender(object): finally: self._is_processing = False - if self._transaction_manager.deprioritise_transmission: - logger.info("Event queue caught up: re-prioritising transmission") - self._transaction_manager.deprioritise_transmission = False def _send_pdu(self, pdu, destinations): # We loop through all destinations to see whether we already have diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index b890aaf840..fad980b893 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -189,11 +189,6 @@ class PerDestinationQueue(object): pending_pdus = [] while True: - if self._transaction_manager.deprioritise_transmission: - # if the event-processing loop has got behind, sleep to give it - # a chance to catch up - yield self._clock.sleep(2) - # We have to keep 2 free slots for presence and rr_edus limit = MAX_EDUS_PER_TRANSACTION - 2 diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 69679dbf65..5b6c79c51a 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -49,10 +49,6 @@ class TransactionManager(object): # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) - # the federation sender sometimes sets this to delay transaction transmission, - # if the sender gets behind. - self.deprioritise_transmission = False - @measure_func("_send_new_transaction") @defer.inlineCallbacks def send_new_transaction(self, destination, pending_pdus, pending_edus):