Only calculate initial sync for 10 rooms at a time

This helps to ensure we don't completely starve other requests.
pull/569/head
Erik Johnston 2016-02-10 11:41:04 +00:00
parent e557dc80b8
commit 8e49892b21
1 changed files with 29 additions and 24 deletions

View File

@ -18,7 +18,7 @@ from ._base import BaseHandler
from synapse.streams.config import PaginationConfig from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes from synapse.api.constants import Membership, EventTypes
from synapse.util import unwrapFirstError from synapse.util import unwrapFirstError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from twisted.internet import defer from twisted.internet import defer
@ -228,10 +228,14 @@ class SyncHandler(BaseHandler):
invited = [] invited = []
archived = [] archived = []
deferreds = [] deferreds = []
for event in room_list:
if event.membership == Membership.JOIN: room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)]
with PreserveLoggingContext(LoggingContext.current_context()): for room_list_chunk in room_list_chunks:
room_sync_deferred = self.full_state_sync_for_joined_room( for event in room_list_chunk:
if event.membership == Membership.JOIN:
room_sync_deferred = preserve_fn(
self.full_state_sync_for_joined_room
)(
room_id=event.room_id, room_id=event.room_id,
sync_config=sync_config, sync_config=sync_config,
now_token=now_token, now_token=now_token,
@ -240,20 +244,21 @@ class SyncHandler(BaseHandler):
tags_by_room=tags_by_room, tags_by_room=tags_by_room,
account_data_by_room=account_data_by_room, account_data_by_room=account_data_by_room,
) )
room_sync_deferred.addCallback(joined.append) room_sync_deferred.addCallback(joined.append)
deferreds.append(room_sync_deferred) deferreds.append(room_sync_deferred)
elif event.membership == Membership.INVITE: elif event.membership == Membership.INVITE:
invite = yield self.store.get_event(event.event_id) invite = yield self.store.get_event(event.event_id)
invited.append(InvitedSyncResult( invited.append(InvitedSyncResult(
room_id=event.room_id, room_id=event.room_id,
invite=invite, invite=invite,
)) ))
elif event.membership in (Membership.LEAVE, Membership.BAN): elif event.membership in (Membership.LEAVE, Membership.BAN):
leave_token = now_token.copy_and_replace( leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,) "room_key", "s%d" % (event.stream_ordering,)
) )
with PreserveLoggingContext(LoggingContext.current_context()): room_sync_deferred = preserve_fn(
room_sync_deferred = self.full_state_sync_for_archived_room( self.full_state_sync_for_archived_room
)(
sync_config=sync_config, sync_config=sync_config,
room_id=event.room_id, room_id=event.room_id,
leave_event_id=event.event_id, leave_event_id=event.event_id,
@ -262,12 +267,12 @@ class SyncHandler(BaseHandler):
tags_by_room=tags_by_room, tags_by_room=tags_by_room,
account_data_by_room=account_data_by_room, account_data_by_room=account_data_by_room,
) )
room_sync_deferred.addCallback(archived.append) room_sync_deferred.addCallback(archived.append)
deferreds.append(room_sync_deferred) deferreds.append(room_sync_deferred)
yield defer.gatherResults( yield defer.gatherResults(
deferreds, consumeErrors=True deferreds, consumeErrors=True
).addErrback(unwrapFirstError) ).addErrback(unwrapFirstError)
account_data_for_user = sync_config.filter_collection.filter_account_data( account_data_for_user = sync_config.filter_collection.filter_account_data(
self.account_data_for_user(account_data) self.account_data_for_user(account_data)