Merge pull request #2718 from matrix-org/rav/notify_logcontexts

Clear logcontext before starting fed txn queue runner
pull/2727/head
Richard van der Hoff 2017-11-29 16:01:46 +00:00 committed by GitHub
commit 97d1a1dc01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 9 deletions

View File

@ -20,7 +20,7 @@ from .persistence import TransactionActions
from .units import Transaction, Edu from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException from synapse.api.errors import HttpResponseException
from synapse.util import logcontext from synapse.util import logcontext, PreserveLoggingContext
from synapse.util.async import run_on_reactor from synapse.util.async import run_on_reactor
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
@ -146,7 +146,6 @@ class TransactionQueue(object):
else: else:
return not destination.startswith("localhost") return not destination.startswith("localhost")
@defer.inlineCallbacks
def notify_new_events(self, current_id): def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to """This gets called when we have some new events we might want to
send out to other servers. send out to other servers.
@ -156,6 +155,13 @@ class TransactionQueue(object):
if self._is_processing: if self._is_processing:
return return
# fire off a processing loop in the background. It's likely it will
# outlast the current request, so run it in the sentinel logcontext.
with PreserveLoggingContext():
self._process_event_queue_loop()
@defer.inlineCallbacks
def _process_event_queue_loop(self):
try: try:
self._is_processing = True self._is_processing = True
while True: while True:

View File

@ -255,9 +255,7 @@ class Notifier(object):
) )
if self.federation_sender: if self.federation_sender:
preserve_fn(self.federation_sender.notify_new_events)( self.federation_sender.notify_new_events(room_stream_id)
room_stream_id
)
if event.type == EventTypes.Member and event.membership == Membership.JOIN: if event.type == EventTypes.Member and event.membership == Membership.JOIN:
self._user_joined_room(event.state_key, event.room_id) self._user_joined_room(event.state_key, event.room_id)
@ -297,8 +295,7 @@ class Notifier(object):
def on_new_replication_data(self): def on_new_replication_data(self):
"""Used to inform replication listeners that something has happend """Used to inform replication listeners that something has happend
without waking up any of the normal user event streams""" without waking up any of the normal user event streams"""
with PreserveLoggingContext(): self.notify_replication()
self.notify_replication()
@defer.inlineCallbacks @defer.inlineCallbacks
def wait_for_events(self, user_id, timeout, callback, room_ids=None, def wait_for_events(self, user_id, timeout, callback, room_ids=None,
@ -516,8 +513,14 @@ class Notifier(object):
self.replication_deferred = ObservableDeferred(defer.Deferred()) self.replication_deferred = ObservableDeferred(defer.Deferred())
deferred.callback(None) deferred.callback(None)
for cb in self.replication_callbacks: # the callbacks may well outlast the current request, so we run
preserve_fn(cb)() # them in the sentinel logcontext.
#
# (ideally it would be up to the callbacks to know if they were
# starting off background processes and drop the logcontext
# accordingly, but that requires more changes)
for cb in self.replication_callbacks:
cb()
@defer.inlineCallbacks @defer.inlineCallbacks
def wait_for_replication(self, callback, timeout): def wait_for_replication(self, callback, timeout):