Fix logcontext handling for persist_events

* don't use preserve_context_over_deferred, which is known broken.

* remove a redundant preserve_fn.

* add/improve some comments
pull/2549/head
Richard van der Hoff 2017-10-17 10:59:30 +01:00
parent e0a75e0c25
commit 2e9f5ea31a
2 changed files with 22 additions and 7 deletions

View File

@ -21,7 +21,7 @@ from synapse.events.utils import prune_event
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import ( from synapse.util.logcontext import (
preserve_fn, PreserveLoggingContext, preserve_context_over_deferred preserve_fn, PreserveLoggingContext, make_deferred_yieldable
) )
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -88,13 +88,23 @@ class _EventPeristenceQueue(object):
def add_to_queue(self, room_id, events_and_contexts, backfilled): def add_to_queue(self, room_id, events_and_contexts, backfilled):
"""Add events to the queue, with the given persist_event options. """Add events to the queue, with the given persist_event options.
NB: due to the normal usage pattern of this method, it does *not*
follow the synapse logcontext rules, and leaves the logcontext in
place whether or not the returned deferred is ready.
Args: Args:
room_id (str): room_id (str):
events_and_contexts (list[(EventBase, EventContext)]): events_and_contexts (list[(EventBase, EventContext)]):
backfilled (bool): backfilled (bool):
Returns:
defer.Deferred: a deferred which will resolve once the events are
persisted. Runs its callbacks *without* a logcontext.
""" """
queue = self._event_persist_queues.setdefault(room_id, deque()) queue = self._event_persist_queues.setdefault(room_id, deque())
if queue: if queue:
# if the last item in the queue has the same `backfilled` setting,
# we can just add these new events to that item.
end_item = queue[-1] end_item = queue[-1]
if end_item.backfilled == backfilled: if end_item.backfilled == backfilled:
end_item.events_and_contexts.extend(events_and_contexts) end_item.events_and_contexts.extend(events_and_contexts)
@ -113,11 +123,11 @@ class _EventPeristenceQueue(object):
def handle_queue(self, room_id, per_item_callback): def handle_queue(self, room_id, per_item_callback):
"""Attempts to handle the queue for a room if not already being handled. """Attempts to handle the queue for a room if not already being handled.
The given callback will be invoked with for each item in the queue,1 The given callback will be invoked with for each item in the queue,
of type _EventPersistQueueItem. The per_item_callback will continuously of type _EventPersistQueueItem. The per_item_callback will continuously
be called with new items, unless the queue becomnes empty. The return be called with new items, unless the queue becomnes empty. The return
value of the function will be given to the deferreds waiting on the item, value of the function will be given to the deferreds waiting on the item,
exceptions will be passed to the deferres as well. exceptions will be passed to the deferreds as well.
This function should therefore be called whenever anything is added This function should therefore be called whenever anything is added
to the queue. to the queue.
@ -233,7 +243,7 @@ class EventsStore(SQLBaseStore):
deferreds = [] deferreds = []
for room_id, evs_ctxs in partitioned.iteritems(): for room_id, evs_ctxs in partitioned.iteritems():
d = preserve_fn(self._event_persist_queue.add_to_queue)( d = self._event_persist_queue.add_to_queue(
room_id, evs_ctxs, room_id, evs_ctxs,
backfilled=backfilled, backfilled=backfilled,
) )
@ -242,7 +252,7 @@ class EventsStore(SQLBaseStore):
for room_id in partitioned: for room_id in partitioned:
self._maybe_start_persisting(room_id) self._maybe_start_persisting(room_id)
return preserve_context_over_deferred( return make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True) defer.gatherResults(deferreds, consumeErrors=True)
) )
@ -267,7 +277,7 @@ class EventsStore(SQLBaseStore):
self._maybe_start_persisting(event.room_id) self._maybe_start_persisting(event.room_id)
yield preserve_context_over_deferred(deferred) yield make_deferred_yieldable(deferred)
max_persisted_id = yield self._stream_id_gen.get_current_token() max_persisted_id = yield self._stream_id_gen.get_current_token()
defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
@ -1526,7 +1536,7 @@ class EventsStore(SQLBaseStore):
if not allow_rejected: if not allow_rejected:
rows[:] = [r for r in rows if not r["rejects"]] rows[:] = [r for r in rows if not r["rejects"]]
res = yield preserve_context_over_deferred(defer.gatherResults( res = yield make_deferred_yieldable(defer.gatherResults(
[ [
preserve_fn(self._get_event_from_row)( preserve_fn(self._get_event_from_row)(
row["internal_metadata"], row["json"], row["redacts"], row["internal_metadata"], row["json"], row["redacts"],

View File

@ -53,6 +53,11 @@ class ObservableDeferred(object):
Cancelling or otherwise resolving an observer will not affect the original Cancelling or otherwise resolving an observer will not affect the original
ObservableDeferred. ObservableDeferred.
NB that it does not attempt to do anything with logcontexts; in general
you should probably make_deferred_yieldable the deferreds
returned by `observe`, and ensure that the original deferred runs its
callbacks in the sentinel logcontext.
""" """
__slots__ = ["_deferred", "_observers", "_result"] __slots__ = ["_deferred", "_observers", "_result"]