Pull loop one level up

pull/768/head
Erik Johnston 2016-05-06 15:38:42 +01:00
parent b6e0be701e
commit fd85b167ec
1 changed files with 41 additions and 36 deletions

View File

@ -19,7 +19,7 @@ from twisted.internet import defer, reactor
from synapse.events import FrozenEvent, USE_FROZEN_DICTS
from synapse.events.utils import prune_event
from synapse.util.async import ObservableDeferred
from synapse.util.async import ObservableDeferred, run_on_reactor
from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
@ -89,12 +89,14 @@ class _EventPeristenceQueue(object):
return deferred.observe()
def handle_queue(self, room_id, callback):
def handle_queue(self, room_id, per_item_callback):
"""Attempts to handle the queue for a room if not already being handled.
The given callback will be invoked with a 'queue' arg, which is a
generator over _EventPersistQueueItem's. The queue will finish if there
are no longer any items in the room queue.
The given callback will be invoked with for each item in the queue,1
of type _EventPersistQueueItem. The per_item_callback will continuously
be called with new items, unless the queue becomnes empty. The return
value of the function will be given to the deferreds waiting on the item,
exceptions will be passed to the deferres as well.
This function should therefore be called whenever anything is added
to the queue.
@ -108,15 +110,26 @@ class _EventPeristenceQueue(object):
self._currently_persisting_rooms.add(room_id)
@defer.inlineCallbacks
def handle_queue_loop():
try:
callback(self._get_drainining_queue(room_id))
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
ret = yield per_item_callback(item)
item.deferred.callback(ret)
except Exception as e:
item.deferred.errback(e)
finally:
queue = self._event_persist_queues.pop(room_id, None)
if queue:
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)
preserve_fn(handle_queue_loop)()
def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.pop(room_id, None)
if not queue:
return
queue = self._event_persist_queues.setdefault(room_id, deque())
try:
while True:
@ -180,10 +193,7 @@ class EventsStore(SQLBaseStore):
def _maybe_start_persisting(self, room_id):
@defer.inlineCallbacks
def persisting_queue(queue):
for item in queue:
try:
ret = None
def persisting_queue(item):
if item.current_state:
for event, context in item.events_and_contexts:
# There should only ever be one item in
@ -199,11 +209,6 @@ class EventsStore(SQLBaseStore):
item.events_and_contexts,
backfilled=item.backfilled,
)
logger.info("Resolving with ret: %r", ret)
item.deferred.callback(ret)
except Exception as e:
logger.exception("Failed to persist events")
item.deferred.errback(e)
self._event_persist_queue.handle_queue(room_id, persisting_queue)