Revert EDU-batching hacks from matrix-org-hotfixes
Firstly: we want to do this in a better way, which is the intention of too many RRs, which means we need to make it happen again. This reverts commits:pull/4812/head8d7c0264b
000d23090
eb0334b07
4d07dc0d1
parent
a6e2546980
commit
c7285607a3
|
@ -66,9 +66,6 @@ sent_edus_by_type = Counter(
|
||||||
["type"],
|
["type"],
|
||||||
)
|
)
|
||||||
|
|
||||||
# number of seconds to wait to batch up outgoing EDUs
|
|
||||||
EDU_BATCH_TIME = 5.0
|
|
||||||
|
|
||||||
|
|
||||||
class TransactionQueue(object):
|
class TransactionQueue(object):
|
||||||
"""This class makes sure we only have one transaction in flight at
|
"""This class makes sure we only have one transaction in flight at
|
||||||
|
@ -122,12 +119,6 @@ class TransactionQueue(object):
|
||||||
# Map of destination -> (edu_type, key) -> Edu
|
# Map of destination -> (edu_type, key) -> Edu
|
||||||
self.pending_edus_keyed_by_dest = edus_keyed = {}
|
self.pending_edus_keyed_by_dest = edus_keyed = {}
|
||||||
|
|
||||||
# In order to batch outgoing EDUs, we delay sending them. This records the time
|
|
||||||
# when we should send the next batch, by destination.
|
|
||||||
self.edu_tx_time_by_dest = {}
|
|
||||||
|
|
||||||
self.edu_tx_task_by_dest = {}
|
|
||||||
|
|
||||||
LaterGauge(
|
LaterGauge(
|
||||||
"synapse_federation_transaction_queue_pending_pdus",
|
"synapse_federation_transaction_queue_pending_pdus",
|
||||||
"",
|
"",
|
||||||
|
@ -408,21 +399,7 @@ class TransactionQueue(object):
|
||||||
|
|
||||||
destination = edu.destination
|
destination = edu.destination
|
||||||
|
|
||||||
if destination not in self.edu_tx_time_by_dest:
|
self._attempt_new_transaction(destination)
|
||||||
txtime = self.clock.time() + EDU_BATCH_TIME * 1000
|
|
||||||
self.edu_tx_time_by_dest[destination] = txtime
|
|
||||||
|
|
||||||
if destination in self.edu_tx_task_by_dest:
|
|
||||||
# we already have a job queued to send EDUs to this destination
|
|
||||||
return
|
|
||||||
|
|
||||||
def send_edus():
|
|
||||||
del self.edu_tx_task_by_dest[destination]
|
|
||||||
self._attempt_new_transaction(destination)
|
|
||||||
|
|
||||||
self.edu_tx_task_by_dest[destination] = self.clock.call_later(
|
|
||||||
EDU_BATCH_TIME, send_edus,
|
|
||||||
)
|
|
||||||
|
|
||||||
def send_device_messages(self, destination):
|
def send_device_messages(self, destination):
|
||||||
if destination == self.server_name:
|
if destination == self.server_name:
|
||||||
|
@ -447,7 +424,6 @@ class TransactionQueue(object):
|
||||||
Returns:
|
Returns:
|
||||||
None
|
None
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# list of (pending_pdu, deferred, order)
|
# list of (pending_pdu, deferred, order)
|
||||||
if destination in self.pending_transactions:
|
if destination in self.pending_transactions:
|
||||||
# XXX: pending_transactions can get stuck on by a never-ending
|
# XXX: pending_transactions can get stuck on by a never-ending
|
||||||
|
@ -501,30 +477,19 @@ class TransactionQueue(object):
|
||||||
if leftover_pdus:
|
if leftover_pdus:
|
||||||
self.pending_pdus_by_dest[destination] = leftover_pdus
|
self.pending_pdus_by_dest[destination] = leftover_pdus
|
||||||
|
|
||||||
# if we have PDUs to send, we may as well send EDUs too. Otherwise,
|
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
||||||
# we only send EDUs if their delay is up
|
|
||||||
if destination in self.edu_tx_time_by_dest and (
|
|
||||||
pending_pdus or
|
|
||||||
self.clock.time() > self.edu_tx_time_by_dest[destination]
|
|
||||||
):
|
|
||||||
del self.edu_tx_time_by_dest[destination]
|
|
||||||
|
|
||||||
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
# We can only include at most 100 EDUs per transactions
|
||||||
|
pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
|
||||||
# We can only include at most 100 EDUs per transactions
|
if leftover_edus:
|
||||||
pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
|
self.pending_edus_by_dest[destination] = leftover_edus
|
||||||
if leftover_edus:
|
|
||||||
self.edu_tx_time_by_dest[destination] = self.clock.time()
|
|
||||||
self.pending_edus_by_dest[destination] = leftover_edus
|
|
||||||
|
|
||||||
pending_edus.extend(
|
|
||||||
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
pending_edus = []
|
|
||||||
|
|
||||||
pending_presence = self.pending_presence_by_dest.pop(destination, {})
|
pending_presence = self.pending_presence_by_dest.pop(destination, {})
|
||||||
|
|
||||||
|
pending_edus.extend(
|
||||||
|
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
|
||||||
|
)
|
||||||
|
|
||||||
pending_edus.extend(device_message_edus)
|
pending_edus.extend(device_message_edus)
|
||||||
if pending_presence:
|
if pending_presence:
|
||||||
pending_edus.append(
|
pending_edus.append(
|
||||||
|
|
Loading…
Reference in New Issue