Merge pull request #2508 from matrix-org/rav/federation_queue_logcontexts
Fix up logcontext handling in (federation) TransactionQueuepull/2516/head
commit
0eeaa25694
|
@ -20,8 +20,8 @@ from .persistence import TransactionActions
|
|||
from .units import Transaction, Edu
|
||||
|
||||
from synapse.api.errors import HttpResponseException
|
||||
from synapse.util import logcontext
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
|
||||
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
|
||||
|
@ -231,11 +231,9 @@ class TransactionQueue(object):
|
|||
(pdu, order)
|
||||
)
|
||||
|
||||
preserve_context_over_fn(
|
||||
self._attempt_new_transaction, destination
|
||||
)
|
||||
self._attempt_new_transaction(destination)
|
||||
|
||||
@preserve_fn # the caller should not yield on this
|
||||
@logcontext.preserve_fn # the caller should not yield on this
|
||||
@defer.inlineCallbacks
|
||||
def send_presence(self, states):
|
||||
"""Send the new presence states to the appropriate destinations.
|
||||
|
@ -299,7 +297,7 @@ class TransactionQueue(object):
|
|||
state.user_id: state for state in states
|
||||
})
|
||||
|
||||
preserve_fn(self._attempt_new_transaction)(destination)
|
||||
self._attempt_new_transaction(destination)
|
||||
|
||||
def send_edu(self, destination, edu_type, content, key=None):
|
||||
edu = Edu(
|
||||
|
@ -321,9 +319,7 @@ class TransactionQueue(object):
|
|||
else:
|
||||
self.pending_edus_by_dest.setdefault(destination, []).append(edu)
|
||||
|
||||
preserve_context_over_fn(
|
||||
self._attempt_new_transaction, destination
|
||||
)
|
||||
self._attempt_new_transaction(destination)
|
||||
|
||||
def send_failure(self, failure, destination):
|
||||
if destination == self.server_name or destination == "localhost":
|
||||
|
@ -336,9 +332,7 @@ class TransactionQueue(object):
|
|||
destination, []
|
||||
).append(failure)
|
||||
|
||||
preserve_context_over_fn(
|
||||
self._attempt_new_transaction, destination
|
||||
)
|
||||
self._attempt_new_transaction(destination)
|
||||
|
||||
def send_device_messages(self, destination):
|
||||
if destination == self.server_name or destination == "localhost":
|
||||
|
@ -347,15 +341,24 @@ class TransactionQueue(object):
|
|||
if not self.can_send_to(destination):
|
||||
return
|
||||
|
||||
preserve_context_over_fn(
|
||||
self._attempt_new_transaction, destination
|
||||
)
|
||||
self._attempt_new_transaction(destination)
|
||||
|
||||
def get_current_token(self):
|
||||
return 0
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _attempt_new_transaction(self, destination):
|
||||
"""Try to start a new transaction to this destination
|
||||
|
||||
If there is already a transaction in progress to this destination,
|
||||
returns immediately. Otherwise kicks off the process of sending a
|
||||
transaction in the background.
|
||||
|
||||
Args:
|
||||
destination (str):
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
# list of (pending_pdu, deferred, order)
|
||||
if destination in self.pending_transactions:
|
||||
# XXX: pending_transactions can get stuck on by a never-ending
|
||||
|
@ -368,6 +371,19 @@ class TransactionQueue(object):
|
|||
)
|
||||
return
|
||||
|
||||
logger.debug("TX [%s] Starting transaction loop", destination)
|
||||
|
||||
# Drop the logcontext before starting the transaction. It doesn't
|
||||
# really make sense to log all the outbound transactions against
|
||||
# whatever path led us to this point: that's pretty arbitrary really.
|
||||
#
|
||||
# (this also means we can fire off _perform_transaction without
|
||||
# yielding)
|
||||
with logcontext.PreserveLoggingContext():
|
||||
self._transaction_transmission_loop(destination)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _transaction_transmission_loop(self, destination):
|
||||
pending_pdus = []
|
||||
try:
|
||||
self.pending_transactions[destination] = 1
|
||||
|
|
Loading…
Reference in New Issue