Fix up logcontext handling in (federation) TransactionQueue
Avoid using preserve_context_over_function, which has problems with respect to logcontexts.pull/2508/head
parent
3ddda939d3
commit
01bbacf3c4
|
@ -20,8 +20,8 @@ from .persistence import TransactionActions
|
||||||
from .units import Transaction, Edu
|
from .units import Transaction, Edu
|
||||||
|
|
||||||
from synapse.api.errors import HttpResponseException
|
from synapse.api.errors import HttpResponseException
|
||||||
|
from synapse.util import logcontext
|
||||||
from synapse.util.async import run_on_reactor
|
from synapse.util.async import run_on_reactor
|
||||||
from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
|
|
||||||
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
|
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
|
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
|
||||||
|
@ -231,11 +231,9 @@ class TransactionQueue(object):
|
||||||
(pdu, order)
|
(pdu, order)
|
||||||
)
|
)
|
||||||
|
|
||||||
preserve_context_over_fn(
|
self._attempt_new_transaction(destination)
|
||||||
self._attempt_new_transaction, destination
|
|
||||||
)
|
|
||||||
|
|
||||||
@preserve_fn # the caller should not yield on this
|
@logcontext.preserve_fn # the caller should not yield on this
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def send_presence(self, states):
|
def send_presence(self, states):
|
||||||
"""Send the new presence states to the appropriate destinations.
|
"""Send the new presence states to the appropriate destinations.
|
||||||
|
@ -299,7 +297,7 @@ class TransactionQueue(object):
|
||||||
state.user_id: state for state in states
|
state.user_id: state for state in states
|
||||||
})
|
})
|
||||||
|
|
||||||
preserve_fn(self._attempt_new_transaction)(destination)
|
self._attempt_new_transaction(destination)
|
||||||
|
|
||||||
def send_edu(self, destination, edu_type, content, key=None):
|
def send_edu(self, destination, edu_type, content, key=None):
|
||||||
edu = Edu(
|
edu = Edu(
|
||||||
|
@ -321,9 +319,7 @@ class TransactionQueue(object):
|
||||||
else:
|
else:
|
||||||
self.pending_edus_by_dest.setdefault(destination, []).append(edu)
|
self.pending_edus_by_dest.setdefault(destination, []).append(edu)
|
||||||
|
|
||||||
preserve_context_over_fn(
|
self._attempt_new_transaction(destination)
|
||||||
self._attempt_new_transaction, destination
|
|
||||||
)
|
|
||||||
|
|
||||||
def send_failure(self, failure, destination):
|
def send_failure(self, failure, destination):
|
||||||
if destination == self.server_name or destination == "localhost":
|
if destination == self.server_name or destination == "localhost":
|
||||||
|
@ -336,9 +332,7 @@ class TransactionQueue(object):
|
||||||
destination, []
|
destination, []
|
||||||
).append(failure)
|
).append(failure)
|
||||||
|
|
||||||
preserve_context_over_fn(
|
self._attempt_new_transaction(destination)
|
||||||
self._attempt_new_transaction, destination
|
|
||||||
)
|
|
||||||
|
|
||||||
def send_device_messages(self, destination):
|
def send_device_messages(self, destination):
|
||||||
if destination == self.server_name or destination == "localhost":
|
if destination == self.server_name or destination == "localhost":
|
||||||
|
@ -347,15 +341,24 @@ class TransactionQueue(object):
|
||||||
if not self.can_send_to(destination):
|
if not self.can_send_to(destination):
|
||||||
return
|
return
|
||||||
|
|
||||||
preserve_context_over_fn(
|
self._attempt_new_transaction(destination)
|
||||||
self._attempt_new_transaction, destination
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_current_token(self):
|
def get_current_token(self):
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _attempt_new_transaction(self, destination):
|
def _attempt_new_transaction(self, destination):
|
||||||
|
"""Try to start a new transaction to this destination
|
||||||
|
|
||||||
|
If there is already a transaction in progress to this destination,
|
||||||
|
returns immediately. Otherwise kicks off the process of sending a
|
||||||
|
transaction in the background.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
destination (str):
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
None
|
||||||
|
"""
|
||||||
# list of (pending_pdu, deferred, order)
|
# list of (pending_pdu, deferred, order)
|
||||||
if destination in self.pending_transactions:
|
if destination in self.pending_transactions:
|
||||||
# XXX: pending_transactions can get stuck on by a never-ending
|
# XXX: pending_transactions can get stuck on by a never-ending
|
||||||
|
@ -368,6 +371,19 @@ class TransactionQueue(object):
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
logger.debug("TX [%s] Starting transaction loop", destination)
|
||||||
|
|
||||||
|
# Drop the logcontext before starting the transaction. It doesn't
|
||||||
|
# really make sense to log all the outbound transactions against
|
||||||
|
# whatever path led us to this point: that's pretty arbitrary really.
|
||||||
|
#
|
||||||
|
# (this also means we can fire off _perform_transaction without
|
||||||
|
# yielding)
|
||||||
|
with logcontext.PreserveLoggingContext():
|
||||||
|
self._transaction_transmission_loop(destination)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _transaction_transmission_loop(self, destination):
|
||||||
pending_pdus = []
|
pending_pdus = []
|
||||||
try:
|
try:
|
||||||
self.pending_transactions[destination] = 1
|
self.pending_transactions[destination] = 1
|
||||||
|
|
Loading…
Reference in New Issue