From b5605dfecc1f277e03165b374bd6ce81638ccb36 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 May 2016 17:37:01 +0100 Subject: [PATCH 01/10] Refactor SyncHandler --- synapse/handlers/sync.py | 978 +++++++++++++++++++-------------------- 1 file changed, 481 insertions(+), 497 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9ebfccc8bf..80eccf19ae 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -194,157 +194,7 @@ class SyncHandler(object): Returns: A Deferred SyncResult. """ - if since_token is None or full_state: - return self.full_state_sync(sync_config, since_token) - else: - return self.incremental_sync_with_gap(sync_config, since_token) - - @defer.inlineCallbacks - def full_state_sync(self, sync_config, timeline_since_token): - """Get a sync for a client which is starting without any state. - - If a 'message_since_token' is given, only timeline events which have - happened since that token will be returned. - - Returns: - A Deferred SyncResult. - """ - now_token = yield self.event_sources.get_current_token() - - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token - ) - - presence_stream = self.event_sources.sources["presence"] - # TODO (mjark): This looks wrong, shouldn't we be getting the presence - # UP to the present rather than after the present? - pagination_config = PaginationConfig(from_token=now_token) - presence, _ = yield presence_stream.get_pagination_rows( - user=sync_config.user, - pagination_config=pagination_config.get_source_config("presence"), - key=None - ) - - membership_list = ( - Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN - ) - - room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=sync_config.user.to_string(), - membership_list=membership_list - ) - - account_data, account_data_by_room = ( - yield self.store.get_account_data_for_user( - sync_config.user.to_string() - ) - ) - - account_data['m.push_rules'] = yield self.push_rules_for_user( - sync_config.user - ) - - tags_by_room = yield self.store.get_tags_for_user( - sync_config.user.to_string() - ) - - ignored_users = account_data.get( - "m.ignored_user_list", {} - ).get("ignored_users", {}).keys() - - joined = [] - invited = [] - archived = [] - - user_id = sync_config.user.to_string() - - @defer.inlineCallbacks - def _generate_room_entry(event): - if event.membership == Membership.JOIN: - room_result = yield self.full_state_sync_for_joined_room( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - joined.append(room_result) - elif event.membership == Membership.INVITE: - if event.sender in ignored_users: - return - invite = yield self.store.get_event(event.event_id) - invited.append(InvitedSyncResult( - room_id=event.room_id, - invite=invite, - )) - elif event.membership in (Membership.LEAVE, Membership.BAN): - # Always send down rooms we were banned or kicked from. - if not sync_config.filter_collection.include_leave: - if event.membership == Membership.LEAVE: - if user_id == event.sender: - return - - leave_token = now_token.copy_and_replace( - "room_key", "s%d" % (event.stream_ordering,) - ) - room_result = yield self.full_state_sync_for_archived_room( - sync_config=sync_config, - room_id=event.room_id, - leave_event_id=event.event_id, - leave_token=leave_token, - timeline_since_token=timeline_since_token, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - archived.append(room_result) - - yield concurrently_execute(_generate_room_entry, room_list, 10) - - account_data_for_user = sync_config.filter_collection.filter_account_data( - self.account_data_for_user(account_data) - ) - - presence = sync_config.filter_collection.filter_presence( - presence - ) - - defer.returnValue(SyncResult( - presence=presence, - account_data=account_data_for_user, - joined=joined, - invited=invited, - archived=archived, - next_batch=now_token, - )) - - @defer.inlineCallbacks - def full_state_sync_for_joined_room(self, room_id, sync_config, - now_token, timeline_since_token, - ephemeral_by_room, tags_by_room, - account_data_by_room): - """Sync a room for a client which is starting without any state - Returns: - A Deferred JoinedSyncResult. - """ - - batch = yield self.load_filtered_recents( - room_id, sync_config, now_token, since_token=timeline_since_token - ) - - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id, sync_config, - now_token=now_token, - since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - batch=batch, - full_state=True, - ) - - defer.returnValue(room_sync) + return self.generate_sync_result(sync_config, since_token, full_state) @defer.inlineCallbacks def push_rules_for_user(self, user): @@ -365,24 +215,6 @@ class SyncHandler(object): return account_data_events - def account_data_for_room(self, room_id, tags_by_room, account_data_by_room): - account_data_events = [] - tags = tags_by_room.get(room_id) - if tags is not None: - account_data_events.append({ - "type": "m.tag", - "content": {"tags": tags}, - }) - - account_data = account_data_by_room.get(room_id, {}) - for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) - - return account_data_events - @defer.inlineCallbacks def ephemeral_by_room(self, sync_config, now_token, since_token=None): """Get the ephemeral events for each room the user is in @@ -445,237 +277,6 @@ class SyncHandler(object): defer.returnValue((now_token, ephemeral_by_room)) - def full_state_sync_for_archived_room(self, room_id, sync_config, - leave_event_id, leave_token, - timeline_since_token, tags_by_room, - account_data_by_room): - """Sync a room for a client which is starting without any state - Returns: - A Deferred ArchivedSyncResult. - """ - - return self.incremental_sync_for_archived_room( - sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room, - account_data_by_room, full_state=True, leave_token=leave_token, - ) - - @defer.inlineCallbacks - def incremental_sync_with_gap(self, sync_config, since_token): - """ Get the incremental delta needed to bring the client up to - date with the server. - Returns: - A Deferred SyncResult. - """ - now_token = yield self.event_sources.get_current_token() - - rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) - room_ids = [room.room_id for room in rooms] - - presence_source = self.event_sources.sources["presence"] - presence, presence_key = yield presence_source.get_new_events( - user=sync_config.user, - from_key=since_token.presence_key, - limit=sync_config.filter_collection.presence_limit(), - room_ids=room_ids, - is_guest=sync_config.is_guest, - ) - now_token = now_token.copy_and_replace("presence_key", presence_key) - - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token, since_token - ) - - app_service = yield self.store.get_app_service_by_user_id( - sync_config.user.to_string() - ) - if app_service: - rooms = yield self.store.get_app_service_rooms(app_service) - joined_room_ids = set(r.room_id for r in rooms) - else: - rooms = yield self.store.get_rooms_for_user( - sync_config.user.to_string() - ) - joined_room_ids = set(r.room_id for r in rooms) - - user_id = sync_config.user.to_string() - - timeline_limit = sync_config.filter_collection.timeline_limit() - - tags_by_room = yield self.store.get_updated_tags( - user_id, - since_token.account_data_key, - ) - - account_data, account_data_by_room = ( - yield self.store.get_updated_account_data_for_user( - user_id, - since_token.account_data_key, - ) - ) - - push_rules_changed = yield self.store.have_push_rules_changed_for_user( - user_id, int(since_token.push_rules_key) - ) - - if push_rules_changed: - account_data["m.push_rules"] = yield self.push_rules_for_user( - sync_config.user - ) - - ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( - "m.ignored_user_list", user_id=user_id, - ) - - if ignored_account_data: - ignored_users = ignored_account_data.get("ignored_users", {}).keys() - else: - ignored_users = frozenset() - - # Get a list of membership change events that have happened. - rooms_changed = yield self.store.get_membership_changes_for_user( - user_id, since_token.room_key, now_token.room_key - ) - - mem_change_events_by_room_id = {} - for event in rooms_changed: - mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) - - newly_joined_rooms = [] - archived = [] - invited = [] - for room_id, events in mem_change_events_by_room_id.items(): - non_joins = [e for e in events if e.membership != Membership.JOIN] - has_join = len(non_joins) != len(events) - - # We want to figure out if we joined the room at some point since - # the last sync (even if we have since left). This is to make sure - # we do send down the room, and with full state, where necessary - if room_id in joined_room_ids or has_join: - old_state = yield self.get_state_at(room_id, since_token) - old_mem_ev = old_state.get((EventTypes.Member, user_id), None) - if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: - newly_joined_rooms.append(room_id) - - if room_id in joined_room_ids: - continue - - if not non_joins: - continue - - # Only bother if we're still currently invited - should_invite = non_joins[-1].membership == Membership.INVITE - if should_invite: - if event.sender not in ignored_users: - room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) - if room_sync: - invited.append(room_sync) - - # Always include leave/ban events. Just take the last one. - # TODO: How do we handle ban -> leave in same batch? - leave_events = [ - e for e in non_joins - if e.membership in (Membership.LEAVE, Membership.BAN) - ] - - if leave_events: - leave_event = leave_events[-1] - room_sync = yield self.incremental_sync_for_archived_room( - sync_config, room_id, leave_event.event_id, since_token, - tags_by_room, account_data_by_room, - full_state=room_id in newly_joined_rooms - ) - if room_sync: - archived.append(room_sync) - - # Get all events for rooms we're currently joined to. - room_to_events = yield self.store.get_room_events_stream_for_rooms( - room_ids=joined_room_ids, - from_key=since_token.room_key, - to_key=now_token.room_key, - limit=timeline_limit + 1, - ) - - joined = [] - # We loop through all room ids, even if there are no new events, in case - # there are non room events taht we need to notify about. - for room_id in joined_room_ids: - room_entry = room_to_events.get(room_id, None) - - if room_entry: - events, start_key = room_entry - - prev_batch_token = now_token.copy_and_replace("room_key", start_key) - - newly_joined_room = room_id in newly_joined_rooms - full_state = newly_joined_room - - batch = yield self.load_filtered_recents( - room_id, sync_config, prev_batch_token, - since_token=since_token, - recents=events, - newly_joined_room=newly_joined_room, - ) - else: - batch = TimelineBatch( - events=[], - prev_batch=since_token, - limited=False, - ) - full_state = False - - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id=room_id, - sync_config=sync_config, - since_token=since_token, - now_token=now_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - batch=batch, - full_state=full_state, - ) - if room_sync: - joined.append(room_sync) - - # For each newly joined room, we want to send down presence of - # existing users. - presence_handler = self.presence_handler - extra_presence_users = set() - for room_id in newly_joined_rooms: - users = yield self.store.get_users_in_room(event.room_id) - extra_presence_users.update(users) - - # For each new member, send down presence. - for joined_sync in joined: - it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) - for event in it: - if event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - extra_presence_users.add(event.state_key) - - states = yield presence_handler.get_states( - [u for u in extra_presence_users if u != user_id], - as_event=True, - ) - presence.extend(states) - - account_data_for_user = sync_config.filter_collection.filter_account_data( - self.account_data_for_user(account_data) - ) - - presence = sync_config.filter_collection.filter_presence( - presence - ) - - defer.returnValue(SyncResult( - presence=presence, - account_data=account_data_for_user, - joined=joined, - invited=invited, - archived=archived, - next_batch=now_token, - )) - @defer.inlineCallbacks def load_filtered_recents(self, room_id, sync_config, now_token, since_token=None, recents=None, newly_joined_room=False): @@ -696,6 +297,10 @@ class SyncHandler(object): else: limited = False + if since_token: + if not now_token.is_after(since_token): + limited = False + if recents is not None: recents = sync_config.filter_collection.filter_room_timeline(recents) recents = yield filter_events_for_client( @@ -748,103 +353,6 @@ class SyncHandler(object): limited=limited or newly_joined_room )) - @defer.inlineCallbacks - def incremental_sync_with_gap_for_room(self, room_id, sync_config, - since_token, now_token, - ephemeral_by_room, tags_by_room, - account_data_by_room, - batch, full_state=False): - state = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, now_token, - full_state=full_state - ) - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data - ) - - ephemeral = sync_config.filter_collection.filter_room_ephemeral( - ephemeral_by_room.get(room_id, []) - ) - - unread_notifications = {} - room_sync = JoinedSyncResult( - room_id=room_id, - timeline=batch, - state=state, - ephemeral=ephemeral, - account_data=account_data, - unread_notifications=unread_notifications, - ) - - if room_sync: - notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config - ) - - if notifs is not None: - unread_notifications["notification_count"] = notifs["notify_count"] - unread_notifications["highlight_count"] = notifs["highlight_count"] - - logger.debug("Room sync: %r", room_sync) - - defer.returnValue(room_sync) - - @defer.inlineCallbacks - def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id, - since_token, tags_by_room, - account_data_by_room, full_state, - leave_token=None): - """ Get the incremental delta needed to bring the client up to date for - the archived room. - Returns: - A Deferred ArchivedSyncResult - """ - - if not leave_token: - stream_token = yield self.store.get_stream_token_for_event( - leave_event_id - ) - - leave_token = since_token.copy_and_replace("room_key", stream_token) - - if since_token and since_token.is_after(leave_token): - defer.returnValue(None) - - batch = yield self.load_filtered_recents( - room_id, sync_config, leave_token, since_token, - ) - - logger.debug("Recents %r", batch) - - state_events_delta = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, leave_token, - full_state=full_state - ) - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data - ) - - room_sync = ArchivedSyncResult( - room_id=room_id, - timeline=batch, - state=state_events_delta, - account_data=account_data, - ) - - logger.debug("Room sync: %r", room_sync) - - defer.returnValue(room_sync) - @defer.inlineCallbacks def get_state_after_event(self, event): """ @@ -1010,6 +518,457 @@ class SyncHandler(object): # count is whatever it was last time. defer.returnValue(None) + @defer.inlineCallbacks + def generate_sync_result(self, sync_config, since_token=None, full_state=False): + now_token = yield self.event_sources.get_current_token() + + sync_result_builer = SyncResultBuilder( + sync_config, full_state, + since_token=since_token, + now_token=now_token, + ) + + account_data_by_room = yield self.generate_sync_entry_for_account_data( + sync_result_builer + ) + + newly_joined_rooms, newly_joined_users = yield self.generate_sync_entry_for_rooms( + sync_result_builer, account_data_by_room + ) + + yield self.generate_sync_entry_for_presence( + sync_result_builer, newly_joined_rooms, newly_joined_users + ) + + defer.returnValue(SyncResult( + presence=sync_result_builer.presence, + account_data=sync_result_builer.account_data, + joined=sync_result_builer.joined, + invited=sync_result_builer.invited, + archived=sync_result_builer.archived, + next_batch=sync_result_builer.now_token, + )) + + @defer.inlineCallbacks + def generate_sync_entry_for_account_data(self, sync_result_builer): + sync_config = sync_result_builer.sync_config + user_id = sync_result_builer.sync_config.user.to_string() + since_token = sync_result_builer.since_token + + if since_token and not sync_result_builer.full_state: + account_data, account_data_by_room = ( + yield self.store.get_updated_account_data_for_user( + user_id, + since_token.account_data_key, + ) + ) + + push_rules_changed = yield self.store.have_push_rules_changed_for_user( + user_id, int(since_token.push_rules_key) + ) + + if push_rules_changed: + account_data["m.push_rules"] = yield self.push_rules_for_user( + sync_config.user + ) + else: + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user( + sync_config.user.to_string() + ) + ) + + account_data['m.push_rules'] = yield self.push_rules_for_user( + sync_config.user + ) + + account_data_for_user = sync_config.filter_collection.filter_account_data( + self.account_data_for_user(account_data) + ) + + sync_result_builer.account_data = account_data_for_user + + defer.returnValue(account_data_by_room) + + @defer.inlineCallbacks + def generate_sync_entry_for_presence(self, sync_result_builer, newly_joined_rooms, + newly_joined_users): + now_token = sync_result_builer.now_token + sync_config = sync_result_builer.sync_config + user = sync_result_builer.sync_config.user + + presence_source = self.event_sources.sources["presence"] + + since_token = sync_result_builer.since_token + if since_token and not sync_result_builer.full_state: + presence_key = since_token.presence_key + else: + presence_key = None + + presence, presence_key = yield presence_source.get_new_events( + user=user, + from_key=presence_key, + is_guest=sync_config.is_guest, + ) + sync_result_builer.now_token = now_token.copy_and_replace( + "presence_key", presence_key + ) + + extra_users_ids = set(newly_joined_users) + for room_id in newly_joined_rooms: + users = yield self.store.get_users_in_room(room_id) + extra_users_ids.update(users) + extra_users_ids.discard(user.to_string()) + + states = yield self.presence_handler.get_states( + extra_users_ids, + as_event=True, + ) + presence.extend(states) + + presence = sync_config.filter_collection.filter_presence( + presence + ) + + sync_result_builer.presence = presence + + @defer.inlineCallbacks + def generate_sync_entry_for_rooms(self, sync_result_builer, account_data_by_room): + user_id = sync_result_builer.sync_config.user.to_string() + + now_token, ephemeral_by_room = yield self.ephemeral_by_room( + sync_result_builer.sync_config, sync_result_builer.now_token + ) + sync_result_builer.now_token = now_token + + ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( + "m.ignored_user_list", user_id=user_id, + ) + + if ignored_account_data: + ignored_users = ignored_account_data.get("ignored_users", {}).keys() + else: + ignored_users = frozenset() + + if sync_result_builer.since_token: + res = yield self._get_rooms_changed(sync_result_builer, ignored_users) + joined, invited, archived, newly_joined_rooms = res + + tags_by_room = yield self.store.get_updated_tags( + user_id, + sync_result_builer.since_token.account_data_key, + ) + else: + res = yield self._get_all_rooms(sync_result_builer, ignored_users) + joined, invited, archived, newly_joined_rooms = res + + tags_by_room = yield self.store.get_tags_for_user(user_id) + + for room_entry in joined: + yield self._generate_room_entry( + "joined", + sync_result_builer, + ignored_users, + room_entry, + ephemeral=ephemeral_by_room.get(room_entry.room_id, []), + tags=tags_by_room.get(room_entry.room_id), + account_data=account_data_by_room.get(room_entry.room_id, {}), + always_include=sync_result_builer.full_state, + ) + for room_entry in archived: + yield self._generate_room_entry( + "archived", + sync_result_builer, + ignored_users, + room_entry, + ephemeral=ephemeral_by_room.get(room_entry.room_id, []), + tags=tags_by_room.get(room_entry.room_id), + account_data=account_data_by_room.get(room_entry.room_id, {}), + always_include=sync_result_builer.full_state, + ) + + sync_result_builer.invited.extend(invited) + + # Now we want to get any newly joined users + newly_joined_users = set() + for joined_sync in sync_result_builer.joined: + it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) + for event in it: + if event.type == EventTypes.Member: + if event.membership == Membership.JOIN: + newly_joined_users.add(event.state_key) + + defer.returnValue((newly_joined_rooms, newly_joined_users)) + + @defer.inlineCallbacks + def _get_rooms_changed(self, sync_result_builer, ignored_users): + user_id = sync_result_builer.sync_config.user.to_string() + since_token = sync_result_builer.since_token + now_token = sync_result_builer.now_token + sync_config = sync_result_builer.sync_config + + assert since_token + + app_service = yield self.store.get_app_service_by_user_id(user_id) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + joined_room_ids = set(r.room_id for r in rooms) + else: + rooms = yield self.store.get_rooms_for_user(user_id) + joined_room_ids = set(r.room_id for r in rooms) + + # Get a list of membership change events that have happened. + rooms_changed = yield self.store.get_membership_changes_for_user( + user_id, since_token.room_key, now_token.room_key + ) + + mem_change_events_by_room_id = {} + for event in rooms_changed: + mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) + + newly_joined_rooms = [] + archived = [] + invited = [] + for room_id, events in mem_change_events_by_room_id.items(): + non_joins = [e for e in events if e.membership != Membership.JOIN] + has_join = len(non_joins) != len(events) + + # We want to figure out if we joined the room at some point since + # the last sync (even if we have since left). This is to make sure + # we do send down the room, and with full state, where necessary + if room_id in joined_room_ids or has_join: + old_state = yield self.get_state_at(room_id, since_token) + old_mem_ev = old_state.get((EventTypes.Member, user_id), None) + if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: + newly_joined_rooms.append(room_id) + + if room_id in joined_room_ids: + continue + + if not non_joins: + continue + + # Only bother if we're still currently invited + should_invite = non_joins[-1].membership == Membership.INVITE + if should_invite: + if event.sender not in ignored_users: + room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) + if room_sync: + invited.append(room_sync) + + # Always include leave/ban events. Just take the last one. + # TODO: How do we handle ban -> leave in same batch? + leave_events = [ + e for e in non_joins + if e.membership in (Membership.LEAVE, Membership.BAN) + ] + + if leave_events: + leave_event = leave_events[-1] + leave_stream_token = yield self.store.get_stream_token_for_event( + leave_event.event_id + ) + leave_token = since_token.copy_and_replace( + "room_key", leave_stream_token + ) + + if since_token and since_token.is_after(leave_token): + continue + + archived.append(RoomSyncResultBuilder( + room_id=room_id, + events=None, + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=since_token, + upto_token=leave_token, + )) + + timeline_limit = sync_config.filter_collection.timeline_limit() + + # Get all events for rooms we're currently joined to. + room_to_events = yield self.store.get_room_events_stream_for_rooms( + room_ids=joined_room_ids, + from_key=since_token.room_key, + to_key=now_token.room_key, + limit=timeline_limit + 1, + ) + + joined = [] + # We loop through all room ids, even if there are no new events, in case + # there are non room events taht we need to notify about. + for room_id in joined_room_ids: + room_entry = room_to_events.get(room_id, None) + + if room_entry: + events, start_key = room_entry + + prev_batch_token = now_token.copy_and_replace("room_key", start_key) + + joined.append(RoomSyncResultBuilder( + room_id=room_id, + events=events, + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=None if room_id in newly_joined_rooms else since_token, + upto_token=prev_batch_token, + )) + else: + joined.append(RoomSyncResultBuilder( + room_id=room_id, + events=[], + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=since_token, + upto_token=since_token, + )) + + defer.returnValue((joined, invited, archived, newly_joined_rooms)) + + @defer.inlineCallbacks + def _get_all_rooms(self, sync_result_builer, ignored_users): + user_id = sync_result_builer.sync_config.user.to_string() + since_token = sync_result_builer.since_token + now_token = sync_result_builer.now_token + sync_config = sync_result_builer.sync_config + + membership_list = ( + Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN + ) + + room_list = yield self.store.get_rooms_for_user_where_membership_is( + user_id=user_id, + membership_list=membership_list + ) + + joined = [] + invited = [] + archived = [] + + for event in room_list: + if event.membership == Membership.JOIN: + joined.append(RoomSyncResultBuilder( + room_id=event.room_id, + events=None, + newly_joined=False, + full_state=True, + since_token=since_token, + upto_token=now_token, + )) + elif event.membership == Membership.INVITE: + if event.sender in ignored_users: + continue + invite = yield self.store.get_event(event.event_id) + invited.append(InvitedSyncResult( + room_id=event.room_id, + invite=invite, + )) + elif event.membership in (Membership.LEAVE, Membership.BAN): + # Always send down rooms we were banned or kicked from. + if not sync_config.filter_collection.include_leave: + if event.membership == Membership.LEAVE: + if user_id == event.sender: + continue + + leave_token = now_token.copy_and_replace( + "room_key", "s%d" % (event.stream_ordering,) + ) + archived.append(RoomSyncResultBuilder( + room_id=event.room_id, + events=None, + newly_joined=False, + full_state=True, + since_token=since_token, + upto_token=leave_token, + )) + + defer.returnValue((joined, invited, archived, [])) + + @defer.inlineCallbacks + def _generate_room_entry(self, room_type, sync_result_builer, ignored_users, + room_builder, ephemeral, tags, account_data, + always_include=False): + since_token = sync_result_builer.since_token + now_token = sync_result_builer.now_token + sync_config = sync_result_builer.sync_config + + room_id = room_builder.room_id + events = room_builder.events + newly_joined = room_builder.newly_joined + full_state = ( + room_builder.full_state + or newly_joined + or sync_result_builer.full_state + ) + since_token = room_builder.since_token + upto_token = room_builder.upto_token + + batch = yield self.load_filtered_recents( + room_id, sync_config, + now_token=upto_token, + since_token=since_token, + recents=events, + newly_joined_room=newly_joined, # FIXME + ) + + account_data_events = [] + if tags is not None: + account_data_events.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + account_data = sync_config.filter_collection.filter_room_account_data( + account_data_events + ) + + ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) + + if not (always_include or batch or account_data or ephemeral or full_state): + return + + state = yield self.compute_state_delta( + room_id, batch, sync_config, since_token, now_token, + full_state=full_state + ) + + if room_type == "joined": + unread_notifications = {} + room_sync = JoinedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + ephemeral=ephemeral, + account_data=account_data_events, + unread_notifications=unread_notifications, + ) + + if room_sync or always_include: + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config + ) + + if notifs is not None: + unread_notifications["notification_count"] = notifs["notify_count"] + unread_notifications["highlight_count"] = notifs["highlight_count"] + + sync_result_builer.joined.append(room_sync) + elif room_type == "archived": + room_sync = ArchivedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + account_data=account_data, + ) + if room_sync or always_include: + sync_result_builer.archived.append(room_sync) + def _action_has_highlight(actions): for action in actions: @@ -1057,3 +1016,28 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): (e.type, e.state_key): e for e in evs } + + +class SyncResultBuilder(object): + def __init__(self, sync_config, full_state, since_token, now_token): + self.sync_config = sync_config + self.full_state = full_state + self.since_token = since_token + self.now_token = now_token + + self.presence = [] + self.account_data = [] + self.joined = [] + self.invited = [] + self.archived = [] + + +class RoomSyncResultBuilder(object): + def __init__(self, room_id, events, newly_joined, full_state, since_token, + upto_token): + self.room_id = room_id + self.events = events + self.newly_joined = newly_joined + self.full_state = full_state + self.since_token = since_token + self.upto_token = upto_token From c0c79ef444ca0f21e8324abc8a813026aaf6cf17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 May 2016 18:21:27 +0100 Subject: [PATCH 02/10] Add back concurrently_execute --- synapse/handlers/sync.py | 34 +++++++++------------------------- 1 file changed, 9 insertions(+), 25 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 80eccf19ae..bc6d6af133 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes from synapse.util.async import concurrently_execute from synapse.util.logcontext import LoggingContext @@ -478,26 +477,6 @@ class SyncHandler(object): for e in sync_config.filter_collection.filter_room_state(state.values()) }) - def check_joined_room(self, sync_config, state_delta): - """ - Check if the user has just joined the given room (so should - be given the full state) - - Args: - sync_config(synapse.handlers.sync.SyncConfig): - state_delta(dict[(str,str), synapse.events.FrozenEvent]): the - difference in state since the last sync - - Returns: - A deferred Tuple (state_delta, limited) - """ - join_event = state_delta.get(( - EventTypes.Member, sync_config.user.to_string()), None) - if join_event is not None: - if join_event.content["membership"] == Membership.JOIN: - return True - return False - @defer.inlineCallbacks def unread_notifs_for_room_id(self, room_id, sync_config): with Measure(self.clock, "unread_notifs_for_room_id"): @@ -664,8 +643,8 @@ class SyncHandler(object): tags_by_room = yield self.store.get_tags_for_user(user_id) - for room_entry in joined: - yield self._generate_room_entry( + def handle_joined(room_entry): + return self._generate_room_entry( "joined", sync_result_builer, ignored_users, @@ -675,8 +654,11 @@ class SyncHandler(object): account_data=account_data_by_room.get(room_entry.room_id, {}), always_include=sync_result_builer.full_state, ) - for room_entry in archived: - yield self._generate_room_entry( + + yield concurrently_execute(handle_joined, joined, 10) + + def handle_archived(room_entry): + return self._generate_room_entry( "archived", sync_result_builer, ignored_users, @@ -687,6 +669,8 @@ class SyncHandler(object): always_include=sync_result_builer.full_state, ) + yield concurrently_execute(handle_archived, archived, 10) + sync_result_builer.invited.extend(invited) # Now we want to get any newly joined users From 137e6a45577d5850ef6936670791af12c2fe74d9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 09:43:35 +0100 Subject: [PATCH 03/10] Shuffle things room --- synapse/handlers/sync.py | 70 +++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bc6d6af133..6b7c6a436e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -499,6 +499,10 @@ class SyncHandler(object): @defer.inlineCallbacks def generate_sync_result(self, sync_config, since_token=None, full_state=False): + # NB: The now_token gets changed by some of the generate_sync_* methods, + # this is due to some of the underlying streams not supporting the ability + # to query up to a given point. + # Always use the `now_token` in `SyncResultBuilder` now_token = yield self.event_sources.get_current_token() sync_result_builer = SyncResultBuilder( @@ -511,9 +515,10 @@ class SyncHandler(object): sync_result_builer ) - newly_joined_rooms, newly_joined_users = yield self.generate_sync_entry_for_rooms( + res = yield self.generate_sync_entry_for_rooms( sync_result_builer, account_data_by_room ) + newly_joined_rooms, newly_joined_users = res yield self.generate_sync_entry_for_presence( sync_result_builer, newly_joined_rooms, newly_joined_users @@ -631,7 +636,7 @@ class SyncHandler(object): if sync_result_builer.since_token: res = yield self._get_rooms_changed(sync_result_builer, ignored_users) - joined, invited, archived, newly_joined_rooms = res + room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_updated_tags( user_id, @@ -639,13 +644,12 @@ class SyncHandler(object): ) else: res = yield self._get_all_rooms(sync_result_builer, ignored_users) - joined, invited, archived, newly_joined_rooms = res + room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_tags_for_user(user_id) - def handle_joined(room_entry): + def handle_room_entries(room_entry): return self._generate_room_entry( - "joined", sync_result_builer, ignored_users, room_entry, @@ -655,21 +659,7 @@ class SyncHandler(object): always_include=sync_result_builer.full_state, ) - yield concurrently_execute(handle_joined, joined, 10) - - def handle_archived(room_entry): - return self._generate_room_entry( - "archived", - sync_result_builer, - ignored_users, - room_entry, - ephemeral=ephemeral_by_room.get(room_entry.room_id, []), - tags=tags_by_room.get(room_entry.room_id), - account_data=account_data_by_room.get(room_entry.room_id, {}), - always_include=sync_result_builer.full_state, - ) - - yield concurrently_execute(handle_archived, archived, 10) + yield concurrently_execute(handle_room_entries, room_entries, 10) sync_result_builer.invited.extend(invited) @@ -711,7 +701,7 @@ class SyncHandler(object): mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) newly_joined_rooms = [] - archived = [] + room_entries = [] invited = [] for room_id, events in mem_change_events_by_room_id.items(): non_joins = [e for e in events if e.membership != Membership.JOIN] @@ -759,8 +749,9 @@ class SyncHandler(object): if since_token and since_token.is_after(leave_token): continue - archived.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=room_id, + rtype="archived", events=None, newly_joined=room_id in newly_joined_rooms, full_state=False, @@ -778,7 +769,6 @@ class SyncHandler(object): limit=timeline_limit + 1, ) - joined = [] # We loop through all room ids, even if there are no new events, in case # there are non room events taht we need to notify about. for room_id in joined_room_ids: @@ -789,8 +779,9 @@ class SyncHandler(object): prev_batch_token = now_token.copy_and_replace("room_key", start_key) - joined.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=room_id, + rtype="joined", events=events, newly_joined=room_id in newly_joined_rooms, full_state=False, @@ -798,8 +789,9 @@ class SyncHandler(object): upto_token=prev_batch_token, )) else: - joined.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=room_id, + rtype="joined", events=[], newly_joined=room_id in newly_joined_rooms, full_state=False, @@ -807,7 +799,7 @@ class SyncHandler(object): upto_token=since_token, )) - defer.returnValue((joined, invited, archived, newly_joined_rooms)) + defer.returnValue((room_entries, invited, newly_joined_rooms)) @defer.inlineCallbacks def _get_all_rooms(self, sync_result_builer, ignored_users): @@ -825,14 +817,14 @@ class SyncHandler(object): membership_list=membership_list ) - joined = [] + room_entries = [] invited = [] - archived = [] for event in room_list: if event.membership == Membership.JOIN: - joined.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=event.room_id, + rtype="joined", events=None, newly_joined=False, full_state=True, @@ -857,8 +849,9 @@ class SyncHandler(object): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - archived.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=event.room_id, + rtype="archived", events=None, newly_joined=False, full_state=True, @@ -866,10 +859,10 @@ class SyncHandler(object): upto_token=leave_token, )) - defer.returnValue((joined, invited, archived, [])) + defer.returnValue((room_entries, invited, [])) @defer.inlineCallbacks - def _generate_room_entry(self, room_type, sync_result_builer, ignored_users, + def _generate_room_entry(self, sync_result_builer, ignored_users, room_builder, ephemeral, tags, account_data, always_include=False): since_token = sync_result_builer.since_token @@ -892,7 +885,7 @@ class SyncHandler(object): now_token=upto_token, since_token=since_token, recents=events, - newly_joined_room=newly_joined, # FIXME + newly_joined_room=newly_joined, ) account_data_events = [] @@ -922,7 +915,7 @@ class SyncHandler(object): full_state=full_state ) - if room_type == "joined": + if room_builder.rtype == "joined": unread_notifications = {} room_sync = JoinedSyncResult( room_id=room_id, @@ -943,7 +936,7 @@ class SyncHandler(object): unread_notifications["highlight_count"] = notifs["highlight_count"] sync_result_builer.joined.append(room_sync) - elif room_type == "archived": + elif room_builder.rtype == "archived": room_sync = ArchivedSyncResult( room_id=room_id, timeline=batch, @@ -952,6 +945,8 @@ class SyncHandler(object): ) if room_sync or always_include: sync_result_builer.archived.append(room_sync) + else: + raise Exception("Unrecognized rtype: %r", room_builder.rtype) def _action_has_highlight(actions): @@ -1017,9 +1012,10 @@ class SyncResultBuilder(object): class RoomSyncResultBuilder(object): - def __init__(self, room_id, events, newly_joined, full_state, since_token, - upto_token): + def __init__(self, room_id, rtype, events, newly_joined, full_state, + since_token, upto_token): self.room_id = room_id + self.rtype = rtype self.events = events self.newly_joined = newly_joined self.full_state = full_state From 84f94e4cbbd8c79d867503159b564aaf233e46d5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 10:14:53 +0100 Subject: [PATCH 04/10] Add comments --- synapse/handlers/sync.py | 112 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 105 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6b7c6a436e..271dd4c147 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -499,6 +499,17 @@ class SyncHandler(object): @defer.inlineCallbacks def generate_sync_result(self, sync_config, since_token=None, full_state=False): + """Generates a sync result. + + Args: + sync_config (SyncConfig) + since_token (StreamToken) + full_state (bool) + + Returns: + Deferred(SyncResult) + """ + # NB: The now_token gets changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability # to query up to a given point. @@ -511,16 +522,16 @@ class SyncHandler(object): now_token=now_token, ) - account_data_by_room = yield self.generate_sync_entry_for_account_data( + account_data_by_room = yield self._generate_sync_entry_for_account_data( sync_result_builer ) - res = yield self.generate_sync_entry_for_rooms( + res = yield self._generate_sync_entry_for_rooms( sync_result_builer, account_data_by_room ) newly_joined_rooms, newly_joined_users = res - yield self.generate_sync_entry_for_presence( + yield self._generate_sync_entry_for_presence( sync_result_builer, newly_joined_rooms, newly_joined_users ) @@ -534,7 +545,16 @@ class SyncHandler(object): )) @defer.inlineCallbacks - def generate_sync_entry_for_account_data(self, sync_result_builer): + def _generate_sync_entry_for_account_data(self, sync_result_builer): + """Generates the account data portion of the sync response. Populates + `sync_result_builer` with the result. + + Args: + sync_result_builer(SyncResultBuilder) + + Returns: + Deferred(dict): A dictionary containing the per room account data. + """ sync_config = sync_result_builer.sync_config user_id = sync_result_builer.sync_config.user.to_string() since_token = sync_result_builer.since_token @@ -575,8 +595,18 @@ class SyncHandler(object): defer.returnValue(account_data_by_room) @defer.inlineCallbacks - def generate_sync_entry_for_presence(self, sync_result_builer, newly_joined_rooms, - newly_joined_users): + def _generate_sync_entry_for_presence(self, sync_result_builer, newly_joined_rooms, + newly_joined_users): + """Generates the presence portion of the sync response. Populates the + `sync_result_builer` with the result. + + Args: + sync_result_builer(SyncResultBuilder) + newly_joined_rooms(list): List of rooms that the user has joined + since the last sync (or empty if an initial sync) + newly_joined_users(list): List of users that have joined rooms + since the last sync (or empty if an initial sync) + """ now_token = sync_result_builer.now_token sync_config = sync_result_builer.sync_config user = sync_result_builer.sync_config.user @@ -617,7 +647,18 @@ class SyncHandler(object): sync_result_builer.presence = presence @defer.inlineCallbacks - def generate_sync_entry_for_rooms(self, sync_result_builer, account_data_by_room): + def _generate_sync_entry_for_rooms(self, sync_result_builer, account_data_by_room): + """Generates the rooms portion of the sync response. Populates the + `sync_result_builer` with the result. + + Args: + sync_result_builer(SyncResultBuilder) + account_data_by_room(dict): Dictionary of per room account data + + Returns: + Deferred(tuple): Returns a 2-tuple of + `(newly_joined_rooms, newly_joined_users)` + """ user_id = sync_result_builer.sync_config.user.to_string() now_token, ephemeral_by_room = yield self.ephemeral_by_room( @@ -676,6 +717,16 @@ class SyncHandler(object): @defer.inlineCallbacks def _get_rooms_changed(self, sync_result_builer, ignored_users): + """Gets the the changes that have happened since the last sync. + + Args: + sync_result_builer(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + + Returns: + Deferred(tuple): Returns a tuple of the form: + `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)` + """ user_id = sync_result_builer.sync_config.user.to_string() since_token = sync_result_builer.since_token now_token = sync_result_builer.now_token @@ -803,6 +854,17 @@ class SyncHandler(object): @defer.inlineCallbacks def _get_all_rooms(self, sync_result_builer, ignored_users): + """Returns entries for all rooms for the user. + + Args: + sync_result_builer(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + + Returns: + Deferred(tuple): Returns a tuple of the form: + `([RoomSyncResultBuilder], [InvitedSyncResult], [])` + """ + user_id = sync_result_builer.sync_config.user.to_string() since_token = sync_result_builer.since_token now_token = sync_result_builer.now_token @@ -865,6 +927,20 @@ class SyncHandler(object): def _generate_room_entry(self, sync_result_builer, ignored_users, room_builder, ephemeral, tags, account_data, always_include=False): + """Populates the `joined` and `archived` section of `sync_result_builer` + based on the `room_builder`. + + Args: + sync_result_builer(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + room_builder(RoomSyncResultBuilder) + ephemeral(list): List of new ephemeral events for room + tags(list): List of *all* tags for room, or None if there has been + no change. + account_data(list): List of new account data for room + always_include(bool): Always include this room in the sync response, + even if empty. + """ since_token = sync_result_builer.since_token now_token = sync_result_builer.now_token sync_config = sync_result_builer.sync_config @@ -998,7 +1074,15 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): class SyncResultBuilder(object): + "Used to help build up a new SyncResult for a user" def __init__(self, sync_config, full_state, since_token, now_token): + """ + Args: + sync_config(SyncConfig) + full_state(bool): The full_state flag as specified by user + since_token(StreamToken): The token supplied by user, or None. + now_token(StreamToken): The token to sync up to. + """ self.sync_config = sync_config self.full_state = full_state self.since_token = since_token @@ -1012,8 +1096,22 @@ class SyncResultBuilder(object): class RoomSyncResultBuilder(object): + """Stores information needed to create either a `JoinedSyncResult` or + `ArchivedSyncResult`. + """ def __init__(self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token): + """ + Args: + room_id(str) + rtype(str): One of `"joined"` or `"archived"` + events(list): List of events to include in the room, (more events + may be added when generating result). + newly_joined(bool): If the user has newly joined the room + full_state(bool): Whether the full state should be sent in result + since_token(StreamToken): Earliest point to return events from, or None + upto_token(StreamToken): Latest point to return events from. + """ self.room_id = room_id self.rtype = rtype self.events = events From 79bea8ab9a4c4cb4a0907784603698731424c00a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 10:22:24 +0100 Subject: [PATCH 05/10] Inline function. Make load_filtered_recents private --- synapse/handlers/sync.py | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 271dd4c147..3aa4432d68 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -203,17 +203,6 @@ class SyncHandler(object): rules = format_push_rules_for_user(user, rawrules, enabled_map) defer.returnValue(rules) - def account_data_for_user(self, account_data): - account_data_events = [] - - for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) - - return account_data_events - @defer.inlineCallbacks def ephemeral_by_room(self, sync_config, now_token, since_token=None): """Get the ephemeral events for each room the user is in @@ -277,8 +266,8 @@ class SyncHandler(object): defer.returnValue((now_token, ephemeral_by_room)) @defer.inlineCallbacks - def load_filtered_recents(self, room_id, sync_config, now_token, - since_token=None, recents=None, newly_joined_room=False): + def _load_filtered_recents(self, room_id, sync_config, now_token, + since_token=None, recents=None, newly_joined_room=False): """ Returns: a Deferred TimelineBatch @@ -586,9 +575,10 @@ class SyncHandler(object): sync_config.user ) - account_data_for_user = sync_config.filter_collection.filter_account_data( - self.account_data_for_user(account_data) - ) + account_data_for_user = sync_config.filter_collection.filter_account_data([ + {"type": account_data_type, "content": content} + for account_data_type, content in account_data.items() + ]) sync_result_builer.account_data = account_data_for_user @@ -956,7 +946,7 @@ class SyncHandler(object): since_token = room_builder.since_token upto_token = room_builder.upto_token - batch = yield self.load_filtered_recents( + batch = yield self._load_filtered_recents( room_id, sync_config, now_token=upto_token, since_token=since_token, From be2c67738640ff88fcf63701c9e3d82afc385e47 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 10:53:03 +0100 Subject: [PATCH 06/10] Spell builder correctly --- synapse/handlers/sync.py | 126 +++++++++++++++++++-------------------- 1 file changed, 63 insertions(+), 63 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3aa4432d68..12a4cc8b57 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -505,50 +505,50 @@ class SyncHandler(object): # Always use the `now_token` in `SyncResultBuilder` now_token = yield self.event_sources.get_current_token() - sync_result_builer = SyncResultBuilder( + sync_result_builder = SyncResultBuilder( sync_config, full_state, since_token=since_token, now_token=now_token, ) account_data_by_room = yield self._generate_sync_entry_for_account_data( - sync_result_builer + sync_result_builder ) res = yield self._generate_sync_entry_for_rooms( - sync_result_builer, account_data_by_room + sync_result_builder, account_data_by_room ) newly_joined_rooms, newly_joined_users = res yield self._generate_sync_entry_for_presence( - sync_result_builer, newly_joined_rooms, newly_joined_users + sync_result_builder, newly_joined_rooms, newly_joined_users ) defer.returnValue(SyncResult( - presence=sync_result_builer.presence, - account_data=sync_result_builer.account_data, - joined=sync_result_builer.joined, - invited=sync_result_builer.invited, - archived=sync_result_builer.archived, - next_batch=sync_result_builer.now_token, + presence=sync_result_builder.presence, + account_data=sync_result_builder.account_data, + joined=sync_result_builder.joined, + invited=sync_result_builder.invited, + archived=sync_result_builder.archived, + next_batch=sync_result_builder.now_token, )) @defer.inlineCallbacks - def _generate_sync_entry_for_account_data(self, sync_result_builer): + def _generate_sync_entry_for_account_data(self, sync_result_builder): """Generates the account data portion of the sync response. Populates - `sync_result_builer` with the result. + `sync_result_builder` with the result. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) Returns: Deferred(dict): A dictionary containing the per room account data. """ - sync_config = sync_result_builer.sync_config - user_id = sync_result_builer.sync_config.user.to_string() - since_token = sync_result_builer.since_token + sync_config = sync_result_builder.sync_config + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token - if since_token and not sync_result_builer.full_state: + if since_token and not sync_result_builder.full_state: account_data, account_data_by_room = ( yield self.store.get_updated_account_data_for_user( user_id, @@ -580,31 +580,31 @@ class SyncHandler(object): for account_data_type, content in account_data.items() ]) - sync_result_builer.account_data = account_data_for_user + sync_result_builder.account_data = account_data_for_user defer.returnValue(account_data_by_room) @defer.inlineCallbacks - def _generate_sync_entry_for_presence(self, sync_result_builer, newly_joined_rooms, + def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms, newly_joined_users): """Generates the presence portion of the sync response. Populates the - `sync_result_builer` with the result. + `sync_result_builder` with the result. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) newly_joined_rooms(list): List of rooms that the user has joined since the last sync (or empty if an initial sync) newly_joined_users(list): List of users that have joined rooms since the last sync (or empty if an initial sync) """ - now_token = sync_result_builer.now_token - sync_config = sync_result_builer.sync_config - user = sync_result_builer.sync_config.user + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + user = sync_result_builder.sync_config.user presence_source = self.event_sources.sources["presence"] - since_token = sync_result_builer.since_token - if since_token and not sync_result_builer.full_state: + since_token = sync_result_builder.since_token + if since_token and not sync_result_builder.full_state: presence_key = since_token.presence_key else: presence_key = None @@ -614,7 +614,7 @@ class SyncHandler(object): from_key=presence_key, is_guest=sync_config.is_guest, ) - sync_result_builer.now_token = now_token.copy_and_replace( + sync_result_builder.now_token = now_token.copy_and_replace( "presence_key", presence_key ) @@ -634,27 +634,27 @@ class SyncHandler(object): presence ) - sync_result_builer.presence = presence + sync_result_builder.presence = presence @defer.inlineCallbacks - def _generate_sync_entry_for_rooms(self, sync_result_builer, account_data_by_room): + def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room): """Generates the rooms portion of the sync response. Populates the - `sync_result_builer` with the result. + `sync_result_builder` with the result. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) account_data_by_room(dict): Dictionary of per room account data Returns: Deferred(tuple): Returns a 2-tuple of `(newly_joined_rooms, newly_joined_users)` """ - user_id = sync_result_builer.sync_config.user.to_string() + user_id = sync_result_builder.sync_config.user.to_string() now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_result_builer.sync_config, sync_result_builer.now_token + sync_result_builder.sync_config, sync_result_builder.now_token ) - sync_result_builer.now_token = now_token + sync_result_builder.now_token = now_token ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id, @@ -665,38 +665,38 @@ class SyncHandler(object): else: ignored_users = frozenset() - if sync_result_builer.since_token: - res = yield self._get_rooms_changed(sync_result_builer, ignored_users) + if sync_result_builder.since_token: + res = yield self._get_rooms_changed(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_updated_tags( user_id, - sync_result_builer.since_token.account_data_key, + sync_result_builder.since_token.account_data_key, ) else: - res = yield self._get_all_rooms(sync_result_builer, ignored_users) + res = yield self._get_all_rooms(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_tags_for_user(user_id) def handle_room_entries(room_entry): return self._generate_room_entry( - sync_result_builer, + sync_result_builder, ignored_users, room_entry, ephemeral=ephemeral_by_room.get(room_entry.room_id, []), tags=tags_by_room.get(room_entry.room_id), account_data=account_data_by_room.get(room_entry.room_id, {}), - always_include=sync_result_builer.full_state, + always_include=sync_result_builder.full_state, ) yield concurrently_execute(handle_room_entries, room_entries, 10) - sync_result_builer.invited.extend(invited) + sync_result_builder.invited.extend(invited) # Now we want to get any newly joined users newly_joined_users = set() - for joined_sync in sync_result_builer.joined: + for joined_sync in sync_result_builder.joined: it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) for event in it: if event.type == EventTypes.Member: @@ -706,21 +706,21 @@ class SyncHandler(object): defer.returnValue((newly_joined_rooms, newly_joined_users)) @defer.inlineCallbacks - def _get_rooms_changed(self, sync_result_builer, ignored_users): + def _get_rooms_changed(self, sync_result_builder, ignored_users): """Gets the the changes that have happened since the last sync. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) ignored_users(set(str)): Set of users ignored by user. Returns: Deferred(tuple): Returns a tuple of the form: `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)` """ - user_id = sync_result_builer.sync_config.user.to_string() - since_token = sync_result_builer.since_token - now_token = sync_result_builer.now_token - sync_config = sync_result_builer.sync_config + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config assert since_token @@ -843,11 +843,11 @@ class SyncHandler(object): defer.returnValue((room_entries, invited, newly_joined_rooms)) @defer.inlineCallbacks - def _get_all_rooms(self, sync_result_builer, ignored_users): + def _get_all_rooms(self, sync_result_builder, ignored_users): """Returns entries for all rooms for the user. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) ignored_users(set(str)): Set of users ignored by user. Returns: @@ -855,10 +855,10 @@ class SyncHandler(object): `([RoomSyncResultBuilder], [InvitedSyncResult], [])` """ - user_id = sync_result_builer.sync_config.user.to_string() - since_token = sync_result_builer.since_token - now_token = sync_result_builer.now_token - sync_config = sync_result_builer.sync_config + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config membership_list = ( Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN @@ -914,14 +914,14 @@ class SyncHandler(object): defer.returnValue((room_entries, invited, [])) @defer.inlineCallbacks - def _generate_room_entry(self, sync_result_builer, ignored_users, + def _generate_room_entry(self, sync_result_builder, ignored_users, room_builder, ephemeral, tags, account_data, always_include=False): - """Populates the `joined` and `archived` section of `sync_result_builer` + """Populates the `joined` and `archived` section of `sync_result_builder` based on the `room_builder`. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) ignored_users(set(str)): Set of users ignored by user. room_builder(RoomSyncResultBuilder) ephemeral(list): List of new ephemeral events for room @@ -931,9 +931,9 @@ class SyncHandler(object): always_include(bool): Always include this room in the sync response, even if empty. """ - since_token = sync_result_builer.since_token - now_token = sync_result_builer.now_token - sync_config = sync_result_builer.sync_config + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config room_id = room_builder.room_id events = room_builder.events @@ -941,7 +941,7 @@ class SyncHandler(object): full_state = ( room_builder.full_state or newly_joined - or sync_result_builer.full_state + or sync_result_builder.full_state ) since_token = room_builder.since_token upto_token = room_builder.upto_token @@ -1001,7 +1001,7 @@ class SyncHandler(object): unread_notifications["notification_count"] = notifs["notify_count"] unread_notifications["highlight_count"] = notifs["highlight_count"] - sync_result_builer.joined.append(room_sync) + sync_result_builder.joined.append(room_sync) elif room_builder.rtype == "archived": room_sync = ArchivedSyncResult( room_id=room_id, @@ -1010,7 +1010,7 @@ class SyncHandler(object): account_data=account_data, ) if room_sync or always_include: - sync_result_builer.archived.append(room_sync) + sync_result_builder.archived.append(room_sync) else: raise Exception("Unrecognized rtype: %r", room_builder.rtype) From b08ad0389e0e412798f32d9f5656db483238173d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 11:04:35 +0100 Subject: [PATCH 07/10] Only include non-offline presence in initial sync --- synapse/handlers/sync.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 12a4cc8b57..8143171c11 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -606,13 +606,16 @@ class SyncHandler(object): since_token = sync_result_builder.since_token if since_token and not sync_result_builder.full_state: presence_key = since_token.presence_key + include_offline = True else: presence_key = None + include_offline = False presence, presence_key = yield presence_source.get_new_events( user=user, from_key=presence_key, is_guest=sync_config.is_guest, + include_offline=include_offline, ) sync_result_builder.now_token = now_token.copy_and_replace( "presence_key", presence_key From 1c5ed2a19ba6da4478c54df072e623598c0ea60d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 11:21:34 +0100 Subject: [PATCH 08/10] Only work out newly_joined_users for incremental sync --- synapse/handlers/sync.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8143171c11..476dcf38e2 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -699,12 +699,15 @@ class SyncHandler(object): # Now we want to get any newly joined users newly_joined_users = set() - for joined_sync in sync_result_builder.joined: - it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) - for event in it: - if event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - newly_joined_users.add(event.state_key) + if sync_result_builder.since_token: + for joined_sync in sync_result_builder.joined: + it = itertools.chain( + joined_sync.timeline.events, joined_sync.state.values() + ) + for event in it: + if event.type == EventTypes.Member: + if event.membership == Membership.JOIN: + newly_joined_users.add(event.state_key) defer.returnValue((newly_joined_rooms, newly_joined_users)) From 69003039973169097592359f3f34fc32c5bbaeb0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 11:44:55 +0100 Subject: [PATCH 09/10] Don't send down all ephemeral events --- synapse/handlers/sync.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 476dcf38e2..6f7dd45ef3 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -655,7 +655,9 @@ class SyncHandler(object): user_id = sync_result_builder.sync_config.user.to_string() now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_result_builder.sync_config, sync_result_builder.now_token + sync_result_builder.sync_config, + now_token=sync_result_builder.now_token, + since_token=sync_result_builder.since_token, ) sync_result_builder.now_token = now_token From faad233ea61cfff2c377609fa1d3c64d39f8a039 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 14:00:43 +0100 Subject: [PATCH 10/10] Change short circuit path --- synapse/handlers/sync.py | 42 +++++++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6f7dd45ef3..3b89582d79 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -273,23 +273,14 @@ class SyncHandler(object): a Deferred TimelineBatch """ with Measure(self.clock, "load_filtered_recents"): - filtering_factor = 2 timeline_limit = sync_config.filter_collection.timeline_limit() - load_limit = max(timeline_limit * filtering_factor, 10) - max_repeat = 5 # Only try a few times per room, otherwise - room_key = now_token.room_key - end_key = room_key if recents is None or newly_joined_room or timeline_limit < len(recents): limited = True else: limited = False - if since_token: - if not now_token.is_after(since_token): - limited = False - - if recents is not None: + if recents: recents = sync_config.filter_collection.filter_room_timeline(recents) recents = yield filter_events_for_client( self.store, @@ -299,6 +290,19 @@ class SyncHandler(object): else: recents = [] + if not limited: + defer.returnValue(TimelineBatch( + events=recents, + prev_batch=now_token, + limited=False + )) + + filtering_factor = 2 + load_limit = max(timeline_limit * filtering_factor, 10) + max_repeat = 5 # Only try a few times per room, otherwise + room_key = now_token.room_key + end_key = room_key + since_key = None if since_token and not newly_joined_room: since_key = since_token.room_key @@ -939,18 +943,24 @@ class SyncHandler(object): always_include(bool): Always include this room in the sync response, even if empty. """ - since_token = sync_result_builder.since_token - now_token = sync_result_builder.now_token - sync_config = sync_result_builder.sync_config - - room_id = room_builder.room_id - events = room_builder.events newly_joined = room_builder.newly_joined full_state = ( room_builder.full_state or newly_joined or sync_result_builder.full_state ) + events = room_builder.events + + # We want to shortcut out as early as possible. + if not (always_include or account_data or ephemeral or full_state): + if events == [] and tags is None: + return + + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + + room_id = room_builder.room_id since_token = room_builder.since_token upto_token = room_builder.upto_token