Persist events in parrellel if possible
parent
e2a241af08
commit
9e6daddaf5
|
@ -20,6 +20,7 @@ from twisted.internet import defer, reactor
|
||||||
from synapse.events import FrozenEvent, USE_FROZEN_DICTS
|
from synapse.events import FrozenEvent, USE_FROZEN_DICTS
|
||||||
from synapse.events.utils import prune_event
|
from synapse.events.utils import prune_event
|
||||||
|
|
||||||
|
from synapse.util import unwrapFirstError
|
||||||
from synapse.util.logcontext import preserve_context_over_deferred
|
from synapse.util.logcontext import preserve_context_over_deferred
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
|
@ -75,6 +76,8 @@ class EventsStore(SQLBaseStore):
|
||||||
for x in xrange(0, len(events_and_contexts), 100)
|
for x in xrange(0, len(events_and_contexts), 100)
|
||||||
]
|
]
|
||||||
|
|
||||||
|
if is_new_state:
|
||||||
|
# If is_new_state then we have to do chunks in order
|
||||||
for chunk in chunks:
|
for chunk in chunks:
|
||||||
yield self.runInteraction(
|
yield self.runInteraction(
|
||||||
"persist_events",
|
"persist_events",
|
||||||
|
@ -83,6 +86,21 @@ class EventsStore(SQLBaseStore):
|
||||||
backfilled=backfilled,
|
backfilled=backfilled,
|
||||||
is_new_state=is_new_state,
|
is_new_state=is_new_state,
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
# Otherwise, do them sequentially
|
||||||
|
yield defer.gatherResults(
|
||||||
|
[
|
||||||
|
self.runInteraction(
|
||||||
|
"persist_events",
|
||||||
|
self._persist_events_txn,
|
||||||
|
events_and_contexts=chunk,
|
||||||
|
backfilled=backfilled,
|
||||||
|
is_new_state=is_new_state,
|
||||||
|
)
|
||||||
|
for chunk in chunks
|
||||||
|
],
|
||||||
|
consumeErrors=True,
|
||||||
|
).addErrback(unwrapFirstError)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
|
|
Loading…
Reference in New Issue