diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 1fdf978313..f5e20d6a6e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -72,7 +72,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ ) -class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ +class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [ "room_id", # str "timeline", # TimelineBatch "state", # dict[(str, str), FrozenEvent] @@ -429,44 +429,20 @@ class SyncHandler(BaseHandler): defer.returnValue((now_token, ephemeral_by_room)) - @defer.inlineCallbacks def full_state_sync_for_archived_room(self, room_id, sync_config, leave_event_id, leave_token, timeline_since_token, tags_by_room, account_data_by_room): """Sync a room for a client which is starting without any state Returns: - A Deferred JoinedSyncResult. + A Deferred ArchivedSyncResult. """ - batch = yield self.load_filtered_recents( - room_id, sync_config, leave_token, since_token=timeline_since_token + return self.incremental_sync_for_archived_room( + sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room, + account_data_by_room, full_state=True, leave_token=leave_token, ) - leave_state = yield self.store.get_state_for_event(leave_event_id) - - leave_state = { - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state( - leave_state.values() - ) - } - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data - ) - - defer.returnValue(ArchivedSyncResult( - room_id=room_id, - timeline=batch, - state=leave_state, - account_data=account_data, - )) - @defer.inlineCallbacks def incremental_sync_with_gap(self, sync_config, since_token): """ Get the incremental delta needed to bring the client up to @@ -512,173 +488,127 @@ class SyncHandler(BaseHandler): sync_config.user ) + user_id = sync_config.user.to_string() + timeline_limit = sync_config.filter_collection.timeline_limit() tags_by_room = yield self.store.get_updated_tags( - sync_config.user.to_string(), + user_id, since_token.account_data_key, ) account_data, account_data_by_room = ( yield self.store.get_updated_account_data_for_user( - sync_config.user.to_string(), + user_id, since_token.account_data_key, ) ) + # Get a list of membership change events that have happened. rooms_changed = yield self.store.get_room_changes_for_user( - sync_config.user.to_string(), since_token.room_key, now_token.room_key + user_id, since_token.room_key, now_token.room_key ) + mem_change_events_by_room_id = {} + for event in rooms_changed: + mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) + + newly_joined_rooms = [] + archived = [] + invited = [] + for room_id, events in mem_change_events_by_room_id.items(): + non_joins = [e for e in events if e.membership != Membership.JOIN] + has_join = len(non_joins) != len(events) + + # We want to figure out if we joined the room at some point since + # the last sync (even if we have since left). This is to make sure + # we do send down the room, and with full state, where necessary + if room_id in joined_room_ids or has_join: + old_state = yield self.get_state_at(room_id, since_token) + old_mem_ev = old_state.get((EventTypes.Member, user_id), None) + if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: + newly_joined_rooms.append(room_id) + + if room_id in joined_room_ids: + continue + + if not non_joins: + continue + + # Only bother if we're still currently invited + should_invite = non_joins[-1].membership == Membership.INVITE + if should_invite: + room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) + if room_sync: + invited.append(room_sync) + + # Always include leave/ban events. Just take the last one. + # TODO: How do we handle ban -> leave in same batch? + leave_events = [ + e for e in non_joins + if e.membership in (Membership.LEAVE, Membership.BAN) + ] + + if leave_events: + leave_event = leave_events[-1] + room_sync = yield self.incremental_sync_for_archived_room( + sync_config, room_id, leave_event.event_id, since_token, + tags_by_room, account_data_by_room, + full_state=room_id in newly_joined_rooms + ) + if room_sync: + archived.append(room_sync) + + # Get all events for rooms we're currently joined to. room_to_events = yield self.store.get_room_events_stream_for_rooms( - room_ids=room_ids, + room_ids=joined_room_ids, from_key=since_token.room_key, to_key=now_token.room_key, limit=timeline_limit + 1, ) - room_events = [ - event - for events, _ in room_to_events.values() - for event in events - ] - - room_events.extend(rooms_changed) - - # room_events, _ = yield self.store.get_room_events_stream( - # sync_config.user.to_string(), - # from_key=since_token.room_key, - # to_key=now_token.room_key, - # limit=timeline_limit + 1, - # ) - joined = [] - archived = [] - if len(room_events) <= timeline_limit: - # There is no gap in any of the rooms. Therefore we can just - # partition the new events by room and return them. - logger.debug("Got %i events for incremental sync - not limited", - len(room_events)) + # We loop through all room ids, even if there are no new events, in case + # there are non room events taht we need to notify about. + for room_id in joined_room_ids: + room_entry = room_to_events.get(room_id, None) - invite_events = [] - leave_events = [] - events_by_room_id = {} - for event in room_events: - events_by_room_id.setdefault(event.room_id, []).append(event) - if event.room_id not in joined_room_ids: - if (event.type == EventTypes.Member - and event.state_key == sync_config.user.to_string()): - if event.membership == Membership.INVITE: - invite_events.append(event) - elif event.membership in (Membership.LEAVE, Membership.BAN): - leave_events.append(event) + if room_entry: + events, start_key = room_entry - for room_id in joined_room_ids: - recents = events_by_room_id.get(room_id, []) - logger.debug("Events for room %s: %r", room_id, recents) - state = { - (event.type, event.state_key): event - for event in recents if event.is_state()} - limited = False + prev_batch_token = now_token.copy_and_replace("room_key", start_key) - if recents: - prev_batch = now_token.copy_and_replace( - "room_key", recents[0].internal_metadata.before - ) - else: - prev_batch = now_token + newly_joined_room = room_id in newly_joined_rooms + full_state = newly_joined_room - just_joined = yield self.check_joined_room(sync_config, state) - if just_joined: - logger.debug("User has just joined %s: needs full state", - room_id) - state = yield self.get_state_at(room_id, now_token) - # the timeline is inherently limited if we've just joined - limited = True - - recents = sync_config.filter_collection.filter_room_timeline(recents) - - state = { - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state( - state.values() - ) - } - - acc_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room + batch = yield self.load_filtered_recents( + room_id, sync_config, prev_batch_token, + since_token=since_token, + recents=events, + newly_joined_room=newly_joined_room, ) - - acc_data = sync_config.filter_collection.filter_room_account_data( - acc_data + else: + batch = TimelineBatch( + events=[], + prev_batch=since_token, + limited=False, ) + full_state = False - ephemeral = sync_config.filter_collection.filter_room_ephemeral( - ephemeral_by_room.get(room_id, []) - ) - - room_sync = JoinedSyncResult( - room_id=room_id, - timeline=TimelineBatch( - events=recents, - prev_batch=prev_batch, - limited=limited, - ), - state=state, - ephemeral=ephemeral, - account_data=acc_data, - unread_notifications={}, - ) - logger.debug("Result for room %s: %r", room_id, room_sync) - - if room_sync: - notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config, all_ephemeral_by_room - ) - - if notifs is not None: - notif_dict = room_sync.unread_notifications - notif_dict["notification_count"] = len(notifs) - notif_dict["highlight_count"] = len([ - 1 for notif in notifs - if _action_has_highlight(notif["actions"]) - ]) - - joined.append(room_sync) - - else: - logger.debug("Got %i events for incremental sync - hit limit", - len(room_events)) - - invite_events = yield self.store.get_invites_for_user( - sync_config.user.to_string() - ) - - leave_events = yield self.store.get_leave_and_ban_events_for_user( - sync_config.user.to_string() - ) - - for room_id in joined_room_ids: - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room, account_data_by_room, - all_ephemeral_by_room=all_ephemeral_by_room, - ) - if room_sync: - joined.append(room_sync) - - for leave_event in leave_events: - room_sync = yield self.incremental_sync_for_archived_room( - sync_config, leave_event, since_token, tags_by_room, - account_data_by_room + room_sync = yield self.incremental_sync_with_gap_for_room( + room_id=room_id, + sync_config=sync_config, + since_token=since_token, + now_token=now_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + all_ephemeral_by_room=all_ephemeral_by_room, + batch=batch, + full_state=full_state, ) if room_sync: - archived.append(room_sync) - - invited = [ - InvitedSyncResult(room_id=event.room_id, invite=event) - for event in invite_events - ] + joined.append(room_sync) account_data_for_user = sync_config.filter_collection.filter_account_data( self.account_data_for_user(account_data) @@ -699,12 +629,10 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def load_filtered_recents(self, room_id, sync_config, now_token, - since_token=None): + since_token=None, recents=None, newly_joined_room=False): """ :returns a Deferred TimelineBatch """ - limited = True - recents = [] filtering_factor = 2 timeline_limit = sync_config.filter_collection.timeline_limit() load_limit = max(timeline_limit * filtering_factor, 100) @@ -712,11 +640,27 @@ class SyncHandler(BaseHandler): room_key = now_token.room_key end_key = room_key + limited = recents is None or newly_joined_room or timeline_limit < len(recents) + + if recents is not None: + recents = sync_config.filter_collection.filter_room_timeline(recents) + recents = yield self._filter_events_for_client( + sync_config.user.to_string(), + recents, + is_peeking=sync_config.is_guest, + ) + else: + recents = [] + + since_key = None + if since_token and not newly_joined_room: + since_key = since_token.room_key + while limited and len(recents) < timeline_limit and max_repeat: - events, end_key = yield self.store.get_recent_room_events_stream_for_room( + events, end_key = yield self.store.get_room_events_stream_for_room( room_id, limit=load_limit + 1, - from_key=since_token.room_key if since_token else None, + from_key=since_key, to_key=end_key, ) loaded_recents = sync_config.filter_collection.filter_room_timeline(events) @@ -727,6 +671,7 @@ class SyncHandler(BaseHandler): ) loaded_recents.extend(recents) recents = loaded_recents + if len(events) <= load_limit: limited = False break @@ -742,7 +687,9 @@ class SyncHandler(BaseHandler): ) defer.returnValue(TimelineBatch( - events=recents, prev_batch=prev_batch_token, limited=limited + events=recents, + prev_batch=prev_batch_token, + limited=limited or newly_joined_room )) @defer.inlineCallbacks @@ -750,24 +697,8 @@ class SyncHandler(BaseHandler): since_token, now_token, ephemeral_by_room, tags_by_room, account_data_by_room, - all_ephemeral_by_room): - """ Get the incremental delta needed to bring the client up to date for - the room. Gives the client the most recent events and the changes to - state. - Returns: - A Deferred JoinedSyncResult - """ - logger.debug("Doing incremental sync for room %s between %s and %s", - room_id, since_token, now_token) - - # TODO(mjark): Check for redactions we might have missed. - - batch = yield self.load_filtered_recents( - room_id, sync_config, now_token, since_token, - ) - - logger.debug("Recents %r", batch) - + all_ephemeral_by_room, + batch, full_state=False): if batch.limited: current_state = yield self.get_state_at(room_id, now_token) @@ -832,43 +763,48 @@ class SyncHandler(BaseHandler): defer.returnValue(room_sync) @defer.inlineCallbacks - def incremental_sync_for_archived_room(self, sync_config, leave_event, + def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id, since_token, tags_by_room, - account_data_by_room): + account_data_by_room, full_state, + leave_token=None): """ Get the incremental delta needed to bring the client up to date for the archived room. Returns: A Deferred ArchivedSyncResult """ - stream_token = yield self.store.get_stream_token_for_event( - leave_event.event_id - ) + if not leave_token: + stream_token = yield self.store.get_stream_token_for_event( + leave_event_id + ) - leave_token = since_token.copy_and_replace("room_key", stream_token) + leave_token = since_token.copy_and_replace("room_key", stream_token) - if since_token.is_after(leave_token): + if since_token and since_token.is_after(leave_token): defer.returnValue(None) batch = yield self.load_filtered_recents( - leave_event.room_id, sync_config, leave_token, since_token, + room_id, sync_config, leave_token, since_token, ) logger.debug("Recents %r", batch) state_events_at_leave = yield self.store.get_state_for_event( - leave_event.event_id + leave_event_id ) - state_at_previous_sync = yield self.get_state_at( - leave_event.room_id, stream_position=since_token - ) + if not full_state: + state_at_previous_sync = yield self.get_state_at( + room_id, stream_position=since_token + ) - state_events_delta = yield self.compute_state_delta( - since_token=since_token, - previous_state=state_at_previous_sync, - current_state=state_events_at_leave, - ) + state_events_delta = yield self.compute_state_delta( + since_token=since_token, + previous_state=state_at_previous_sync, + current_state=state_events_at_leave, + ) + else: + state_events_delta = state_events_at_leave state_events_delta = { (e.type, e.state_key): e @@ -878,7 +814,7 @@ class SyncHandler(BaseHandler): } account_data = self.account_data_for_room( - leave_event.room_id, tags_by_room, account_data_by_room + room_id, tags_by_room, account_data_by_room ) account_data = sync_config.filter_collection.filter_room_account_data( @@ -886,7 +822,7 @@ class SyncHandler(BaseHandler): ) room_sync = ArchivedSyncResult( - room_id=leave_event.room_id, + room_id=room_id, timeline=batch, state=state_events_delta, account_data=account_data, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d96ea3a30e..0dd1daaa2e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -128,7 +128,6 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) - logger.info("Invalidating %r at %r", event.room_id, stream_ordering) self._events_stream_cache.room_has_changed(None, event.room_id, stream_ordering) except _RollbackButIsFineException: pass diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3a32a0019a..563e289c4e 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -179,7 +179,7 @@ class StreamStore(SQLBaseStore): room_ids = list(room_ids) for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)): res = yield defer.gatherResults([ - self.get_recent_room_events_stream_for_room( + self.get_room_events_stream_for_room( room_id, from_key, to_key, limit ).addCallback(lambda r, rm: (rm, r), room_id) for room_id in room_ids @@ -189,7 +189,7 @@ class StreamStore(SQLBaseStore): defer.returnValue(results) @defer.inlineCallbacks - def get_recent_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0): + def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0): if from_key is not None: from_id = RoomStreamToken.parse_stream_token(from_key).stream else: @@ -246,7 +246,7 @@ class StreamStore(SQLBaseStore): key = from_key return ret, key - res = yield self.runInteraction("get_recent_room_events_stream_for_room", f) + res = yield self.runInteraction("get_room_events_stream_for_room", f) defer.returnValue(res) def get_room_changes_for_user(self, user_id, from_key, to_key):