diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 24770882d2..9341793a61 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -20,6 +20,7 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event +from synapse.util import unwrapFirstError from synapse.util.logcontext import preserve_context_over_deferred from synapse.util.logutils import log_function from synapse.api.constants import EventTypes @@ -75,14 +76,31 @@ class EventsStore(SQLBaseStore): for x in xrange(0, len(events_and_contexts), 100) ] - for chunk in chunks: - yield self.runInteraction( - "persist_events", - self._persist_events_txn, - events_and_contexts=chunk, - backfilled=backfilled, - is_new_state=is_new_state, - ) + if is_new_state: + # If is_new_state then we have to do chunks in order + for chunk in chunks: + yield self.runInteraction( + "persist_events", + self._persist_events_txn, + events_and_contexts=chunk, + backfilled=backfilled, + is_new_state=is_new_state, + ) + else: + # Otherwise, do them sequentially + yield defer.gatherResults( + [ + self.runInteraction( + "persist_events", + self._persist_events_txn, + events_and_contexts=chunk, + backfilled=backfilled, + is_new_state=is_new_state, + ) + for chunk in chunks + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) @defer.inlineCallbacks @log_function