From b5605dfecc1f277e03165b374bd6ce81638ccb36 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 May 2016 17:37:01 +0100 Subject: [PATCH 01/16] Refactor SyncHandler --- synapse/handlers/sync.py | 978 +++++++++++++++++++-------------------- 1 file changed, 481 insertions(+), 497 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9ebfccc8bf..80eccf19ae 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -194,157 +194,7 @@ class SyncHandler(object): Returns: A Deferred SyncResult. """ - if since_token is None or full_state: - return self.full_state_sync(sync_config, since_token) - else: - return self.incremental_sync_with_gap(sync_config, since_token) - - @defer.inlineCallbacks - def full_state_sync(self, sync_config, timeline_since_token): - """Get a sync for a client which is starting without any state. - - If a 'message_since_token' is given, only timeline events which have - happened since that token will be returned. - - Returns: - A Deferred SyncResult. - """ - now_token = yield self.event_sources.get_current_token() - - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token - ) - - presence_stream = self.event_sources.sources["presence"] - # TODO (mjark): This looks wrong, shouldn't we be getting the presence - # UP to the present rather than after the present? - pagination_config = PaginationConfig(from_token=now_token) - presence, _ = yield presence_stream.get_pagination_rows( - user=sync_config.user, - pagination_config=pagination_config.get_source_config("presence"), - key=None - ) - - membership_list = ( - Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN - ) - - room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=sync_config.user.to_string(), - membership_list=membership_list - ) - - account_data, account_data_by_room = ( - yield self.store.get_account_data_for_user( - sync_config.user.to_string() - ) - ) - - account_data['m.push_rules'] = yield self.push_rules_for_user( - sync_config.user - ) - - tags_by_room = yield self.store.get_tags_for_user( - sync_config.user.to_string() - ) - - ignored_users = account_data.get( - "m.ignored_user_list", {} - ).get("ignored_users", {}).keys() - - joined = [] - invited = [] - archived = [] - - user_id = sync_config.user.to_string() - - @defer.inlineCallbacks - def _generate_room_entry(event): - if event.membership == Membership.JOIN: - room_result = yield self.full_state_sync_for_joined_room( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - joined.append(room_result) - elif event.membership == Membership.INVITE: - if event.sender in ignored_users: - return - invite = yield self.store.get_event(event.event_id) - invited.append(InvitedSyncResult( - room_id=event.room_id, - invite=invite, - )) - elif event.membership in (Membership.LEAVE, Membership.BAN): - # Always send down rooms we were banned or kicked from. - if not sync_config.filter_collection.include_leave: - if event.membership == Membership.LEAVE: - if user_id == event.sender: - return - - leave_token = now_token.copy_and_replace( - "room_key", "s%d" % (event.stream_ordering,) - ) - room_result = yield self.full_state_sync_for_archived_room( - sync_config=sync_config, - room_id=event.room_id, - leave_event_id=event.event_id, - leave_token=leave_token, - timeline_since_token=timeline_since_token, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - archived.append(room_result) - - yield concurrently_execute(_generate_room_entry, room_list, 10) - - account_data_for_user = sync_config.filter_collection.filter_account_data( - self.account_data_for_user(account_data) - ) - - presence = sync_config.filter_collection.filter_presence( - presence - ) - - defer.returnValue(SyncResult( - presence=presence, - account_data=account_data_for_user, - joined=joined, - invited=invited, - archived=archived, - next_batch=now_token, - )) - - @defer.inlineCallbacks - def full_state_sync_for_joined_room(self, room_id, sync_config, - now_token, timeline_since_token, - ephemeral_by_room, tags_by_room, - account_data_by_room): - """Sync a room for a client which is starting without any state - Returns: - A Deferred JoinedSyncResult. - """ - - batch = yield self.load_filtered_recents( - room_id, sync_config, now_token, since_token=timeline_since_token - ) - - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id, sync_config, - now_token=now_token, - since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - batch=batch, - full_state=True, - ) - - defer.returnValue(room_sync) + return self.generate_sync_result(sync_config, since_token, full_state) @defer.inlineCallbacks def push_rules_for_user(self, user): @@ -365,24 +215,6 @@ class SyncHandler(object): return account_data_events - def account_data_for_room(self, room_id, tags_by_room, account_data_by_room): - account_data_events = [] - tags = tags_by_room.get(room_id) - if tags is not None: - account_data_events.append({ - "type": "m.tag", - "content": {"tags": tags}, - }) - - account_data = account_data_by_room.get(room_id, {}) - for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) - - return account_data_events - @defer.inlineCallbacks def ephemeral_by_room(self, sync_config, now_token, since_token=None): """Get the ephemeral events for each room the user is in @@ -445,237 +277,6 @@ class SyncHandler(object): defer.returnValue((now_token, ephemeral_by_room)) - def full_state_sync_for_archived_room(self, room_id, sync_config, - leave_event_id, leave_token, - timeline_since_token, tags_by_room, - account_data_by_room): - """Sync a room for a client which is starting without any state - Returns: - A Deferred ArchivedSyncResult. - """ - - return self.incremental_sync_for_archived_room( - sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room, - account_data_by_room, full_state=True, leave_token=leave_token, - ) - - @defer.inlineCallbacks - def incremental_sync_with_gap(self, sync_config, since_token): - """ Get the incremental delta needed to bring the client up to - date with the server. - Returns: - A Deferred SyncResult. - """ - now_token = yield self.event_sources.get_current_token() - - rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) - room_ids = [room.room_id for room in rooms] - - presence_source = self.event_sources.sources["presence"] - presence, presence_key = yield presence_source.get_new_events( - user=sync_config.user, - from_key=since_token.presence_key, - limit=sync_config.filter_collection.presence_limit(), - room_ids=room_ids, - is_guest=sync_config.is_guest, - ) - now_token = now_token.copy_and_replace("presence_key", presence_key) - - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token, since_token - ) - - app_service = yield self.store.get_app_service_by_user_id( - sync_config.user.to_string() - ) - if app_service: - rooms = yield self.store.get_app_service_rooms(app_service) - joined_room_ids = set(r.room_id for r in rooms) - else: - rooms = yield self.store.get_rooms_for_user( - sync_config.user.to_string() - ) - joined_room_ids = set(r.room_id for r in rooms) - - user_id = sync_config.user.to_string() - - timeline_limit = sync_config.filter_collection.timeline_limit() - - tags_by_room = yield self.store.get_updated_tags( - user_id, - since_token.account_data_key, - ) - - account_data, account_data_by_room = ( - yield self.store.get_updated_account_data_for_user( - user_id, - since_token.account_data_key, - ) - ) - - push_rules_changed = yield self.store.have_push_rules_changed_for_user( - user_id, int(since_token.push_rules_key) - ) - - if push_rules_changed: - account_data["m.push_rules"] = yield self.push_rules_for_user( - sync_config.user - ) - - ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( - "m.ignored_user_list", user_id=user_id, - ) - - if ignored_account_data: - ignored_users = ignored_account_data.get("ignored_users", {}).keys() - else: - ignored_users = frozenset() - - # Get a list of membership change events that have happened. - rooms_changed = yield self.store.get_membership_changes_for_user( - user_id, since_token.room_key, now_token.room_key - ) - - mem_change_events_by_room_id = {} - for event in rooms_changed: - mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) - - newly_joined_rooms = [] - archived = [] - invited = [] - for room_id, events in mem_change_events_by_room_id.items(): - non_joins = [e for e in events if e.membership != Membership.JOIN] - has_join = len(non_joins) != len(events) - - # We want to figure out if we joined the room at some point since - # the last sync (even if we have since left). This is to make sure - # we do send down the room, and with full state, where necessary - if room_id in joined_room_ids or has_join: - old_state = yield self.get_state_at(room_id, since_token) - old_mem_ev = old_state.get((EventTypes.Member, user_id), None) - if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: - newly_joined_rooms.append(room_id) - - if room_id in joined_room_ids: - continue - - if not non_joins: - continue - - # Only bother if we're still currently invited - should_invite = non_joins[-1].membership == Membership.INVITE - if should_invite: - if event.sender not in ignored_users: - room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) - if room_sync: - invited.append(room_sync) - - # Always include leave/ban events. Just take the last one. - # TODO: How do we handle ban -> leave in same batch? - leave_events = [ - e for e in non_joins - if e.membership in (Membership.LEAVE, Membership.BAN) - ] - - if leave_events: - leave_event = leave_events[-1] - room_sync = yield self.incremental_sync_for_archived_room( - sync_config, room_id, leave_event.event_id, since_token, - tags_by_room, account_data_by_room, - full_state=room_id in newly_joined_rooms - ) - if room_sync: - archived.append(room_sync) - - # Get all events for rooms we're currently joined to. - room_to_events = yield self.store.get_room_events_stream_for_rooms( - room_ids=joined_room_ids, - from_key=since_token.room_key, - to_key=now_token.room_key, - limit=timeline_limit + 1, - ) - - joined = [] - # We loop through all room ids, even if there are no new events, in case - # there are non room events taht we need to notify about. - for room_id in joined_room_ids: - room_entry = room_to_events.get(room_id, None) - - if room_entry: - events, start_key = room_entry - - prev_batch_token = now_token.copy_and_replace("room_key", start_key) - - newly_joined_room = room_id in newly_joined_rooms - full_state = newly_joined_room - - batch = yield self.load_filtered_recents( - room_id, sync_config, prev_batch_token, - since_token=since_token, - recents=events, - newly_joined_room=newly_joined_room, - ) - else: - batch = TimelineBatch( - events=[], - prev_batch=since_token, - limited=False, - ) - full_state = False - - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id=room_id, - sync_config=sync_config, - since_token=since_token, - now_token=now_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - batch=batch, - full_state=full_state, - ) - if room_sync: - joined.append(room_sync) - - # For each newly joined room, we want to send down presence of - # existing users. - presence_handler = self.presence_handler - extra_presence_users = set() - for room_id in newly_joined_rooms: - users = yield self.store.get_users_in_room(event.room_id) - extra_presence_users.update(users) - - # For each new member, send down presence. - for joined_sync in joined: - it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) - for event in it: - if event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - extra_presence_users.add(event.state_key) - - states = yield presence_handler.get_states( - [u for u in extra_presence_users if u != user_id], - as_event=True, - ) - presence.extend(states) - - account_data_for_user = sync_config.filter_collection.filter_account_data( - self.account_data_for_user(account_data) - ) - - presence = sync_config.filter_collection.filter_presence( - presence - ) - - defer.returnValue(SyncResult( - presence=presence, - account_data=account_data_for_user, - joined=joined, - invited=invited, - archived=archived, - next_batch=now_token, - )) - @defer.inlineCallbacks def load_filtered_recents(self, room_id, sync_config, now_token, since_token=None, recents=None, newly_joined_room=False): @@ -696,6 +297,10 @@ class SyncHandler(object): else: limited = False + if since_token: + if not now_token.is_after(since_token): + limited = False + if recents is not None: recents = sync_config.filter_collection.filter_room_timeline(recents) recents = yield filter_events_for_client( @@ -748,103 +353,6 @@ class SyncHandler(object): limited=limited or newly_joined_room )) - @defer.inlineCallbacks - def incremental_sync_with_gap_for_room(self, room_id, sync_config, - since_token, now_token, - ephemeral_by_room, tags_by_room, - account_data_by_room, - batch, full_state=False): - state = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, now_token, - full_state=full_state - ) - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data - ) - - ephemeral = sync_config.filter_collection.filter_room_ephemeral( - ephemeral_by_room.get(room_id, []) - ) - - unread_notifications = {} - room_sync = JoinedSyncResult( - room_id=room_id, - timeline=batch, - state=state, - ephemeral=ephemeral, - account_data=account_data, - unread_notifications=unread_notifications, - ) - - if room_sync: - notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config - ) - - if notifs is not None: - unread_notifications["notification_count"] = notifs["notify_count"] - unread_notifications["highlight_count"] = notifs["highlight_count"] - - logger.debug("Room sync: %r", room_sync) - - defer.returnValue(room_sync) - - @defer.inlineCallbacks - def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id, - since_token, tags_by_room, - account_data_by_room, full_state, - leave_token=None): - """ Get the incremental delta needed to bring the client up to date for - the archived room. - Returns: - A Deferred ArchivedSyncResult - """ - - if not leave_token: - stream_token = yield self.store.get_stream_token_for_event( - leave_event_id - ) - - leave_token = since_token.copy_and_replace("room_key", stream_token) - - if since_token and since_token.is_after(leave_token): - defer.returnValue(None) - - batch = yield self.load_filtered_recents( - room_id, sync_config, leave_token, since_token, - ) - - logger.debug("Recents %r", batch) - - state_events_delta = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, leave_token, - full_state=full_state - ) - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data - ) - - room_sync = ArchivedSyncResult( - room_id=room_id, - timeline=batch, - state=state_events_delta, - account_data=account_data, - ) - - logger.debug("Room sync: %r", room_sync) - - defer.returnValue(room_sync) - @defer.inlineCallbacks def get_state_after_event(self, event): """ @@ -1010,6 +518,457 @@ class SyncHandler(object): # count is whatever it was last time. defer.returnValue(None) + @defer.inlineCallbacks + def generate_sync_result(self, sync_config, since_token=None, full_state=False): + now_token = yield self.event_sources.get_current_token() + + sync_result_builer = SyncResultBuilder( + sync_config, full_state, + since_token=since_token, + now_token=now_token, + ) + + account_data_by_room = yield self.generate_sync_entry_for_account_data( + sync_result_builer + ) + + newly_joined_rooms, newly_joined_users = yield self.generate_sync_entry_for_rooms( + sync_result_builer, account_data_by_room + ) + + yield self.generate_sync_entry_for_presence( + sync_result_builer, newly_joined_rooms, newly_joined_users + ) + + defer.returnValue(SyncResult( + presence=sync_result_builer.presence, + account_data=sync_result_builer.account_data, + joined=sync_result_builer.joined, + invited=sync_result_builer.invited, + archived=sync_result_builer.archived, + next_batch=sync_result_builer.now_token, + )) + + @defer.inlineCallbacks + def generate_sync_entry_for_account_data(self, sync_result_builer): + sync_config = sync_result_builer.sync_config + user_id = sync_result_builer.sync_config.user.to_string() + since_token = sync_result_builer.since_token + + if since_token and not sync_result_builer.full_state: + account_data, account_data_by_room = ( + yield self.store.get_updated_account_data_for_user( + user_id, + since_token.account_data_key, + ) + ) + + push_rules_changed = yield self.store.have_push_rules_changed_for_user( + user_id, int(since_token.push_rules_key) + ) + + if push_rules_changed: + account_data["m.push_rules"] = yield self.push_rules_for_user( + sync_config.user + ) + else: + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user( + sync_config.user.to_string() + ) + ) + + account_data['m.push_rules'] = yield self.push_rules_for_user( + sync_config.user + ) + + account_data_for_user = sync_config.filter_collection.filter_account_data( + self.account_data_for_user(account_data) + ) + + sync_result_builer.account_data = account_data_for_user + + defer.returnValue(account_data_by_room) + + @defer.inlineCallbacks + def generate_sync_entry_for_presence(self, sync_result_builer, newly_joined_rooms, + newly_joined_users): + now_token = sync_result_builer.now_token + sync_config = sync_result_builer.sync_config + user = sync_result_builer.sync_config.user + + presence_source = self.event_sources.sources["presence"] + + since_token = sync_result_builer.since_token + if since_token and not sync_result_builer.full_state: + presence_key = since_token.presence_key + else: + presence_key = None + + presence, presence_key = yield presence_source.get_new_events( + user=user, + from_key=presence_key, + is_guest=sync_config.is_guest, + ) + sync_result_builer.now_token = now_token.copy_and_replace( + "presence_key", presence_key + ) + + extra_users_ids = set(newly_joined_users) + for room_id in newly_joined_rooms: + users = yield self.store.get_users_in_room(room_id) + extra_users_ids.update(users) + extra_users_ids.discard(user.to_string()) + + states = yield self.presence_handler.get_states( + extra_users_ids, + as_event=True, + ) + presence.extend(states) + + presence = sync_config.filter_collection.filter_presence( + presence + ) + + sync_result_builer.presence = presence + + @defer.inlineCallbacks + def generate_sync_entry_for_rooms(self, sync_result_builer, account_data_by_room): + user_id = sync_result_builer.sync_config.user.to_string() + + now_token, ephemeral_by_room = yield self.ephemeral_by_room( + sync_result_builer.sync_config, sync_result_builer.now_token + ) + sync_result_builer.now_token = now_token + + ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( + "m.ignored_user_list", user_id=user_id, + ) + + if ignored_account_data: + ignored_users = ignored_account_data.get("ignored_users", {}).keys() + else: + ignored_users = frozenset() + + if sync_result_builer.since_token: + res = yield self._get_rooms_changed(sync_result_builer, ignored_users) + joined, invited, archived, newly_joined_rooms = res + + tags_by_room = yield self.store.get_updated_tags( + user_id, + sync_result_builer.since_token.account_data_key, + ) + else: + res = yield self._get_all_rooms(sync_result_builer, ignored_users) + joined, invited, archived, newly_joined_rooms = res + + tags_by_room = yield self.store.get_tags_for_user(user_id) + + for room_entry in joined: + yield self._generate_room_entry( + "joined", + sync_result_builer, + ignored_users, + room_entry, + ephemeral=ephemeral_by_room.get(room_entry.room_id, []), + tags=tags_by_room.get(room_entry.room_id), + account_data=account_data_by_room.get(room_entry.room_id, {}), + always_include=sync_result_builer.full_state, + ) + for room_entry in archived: + yield self._generate_room_entry( + "archived", + sync_result_builer, + ignored_users, + room_entry, + ephemeral=ephemeral_by_room.get(room_entry.room_id, []), + tags=tags_by_room.get(room_entry.room_id), + account_data=account_data_by_room.get(room_entry.room_id, {}), + always_include=sync_result_builer.full_state, + ) + + sync_result_builer.invited.extend(invited) + + # Now we want to get any newly joined users + newly_joined_users = set() + for joined_sync in sync_result_builer.joined: + it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) + for event in it: + if event.type == EventTypes.Member: + if event.membership == Membership.JOIN: + newly_joined_users.add(event.state_key) + + defer.returnValue((newly_joined_rooms, newly_joined_users)) + + @defer.inlineCallbacks + def _get_rooms_changed(self, sync_result_builer, ignored_users): + user_id = sync_result_builer.sync_config.user.to_string() + since_token = sync_result_builer.since_token + now_token = sync_result_builer.now_token + sync_config = sync_result_builer.sync_config + + assert since_token + + app_service = yield self.store.get_app_service_by_user_id(user_id) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + joined_room_ids = set(r.room_id for r in rooms) + else: + rooms = yield self.store.get_rooms_for_user(user_id) + joined_room_ids = set(r.room_id for r in rooms) + + # Get a list of membership change events that have happened. + rooms_changed = yield self.store.get_membership_changes_for_user( + user_id, since_token.room_key, now_token.room_key + ) + + mem_change_events_by_room_id = {} + for event in rooms_changed: + mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) + + newly_joined_rooms = [] + archived = [] + invited = [] + for room_id, events in mem_change_events_by_room_id.items(): + non_joins = [e for e in events if e.membership != Membership.JOIN] + has_join = len(non_joins) != len(events) + + # We want to figure out if we joined the room at some point since + # the last sync (even if we have since left). This is to make sure + # we do send down the room, and with full state, where necessary + if room_id in joined_room_ids or has_join: + old_state = yield self.get_state_at(room_id, since_token) + old_mem_ev = old_state.get((EventTypes.Member, user_id), None) + if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: + newly_joined_rooms.append(room_id) + + if room_id in joined_room_ids: + continue + + if not non_joins: + continue + + # Only bother if we're still currently invited + should_invite = non_joins[-1].membership == Membership.INVITE + if should_invite: + if event.sender not in ignored_users: + room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) + if room_sync: + invited.append(room_sync) + + # Always include leave/ban events. Just take the last one. + # TODO: How do we handle ban -> leave in same batch? + leave_events = [ + e for e in non_joins + if e.membership in (Membership.LEAVE, Membership.BAN) + ] + + if leave_events: + leave_event = leave_events[-1] + leave_stream_token = yield self.store.get_stream_token_for_event( + leave_event.event_id + ) + leave_token = since_token.copy_and_replace( + "room_key", leave_stream_token + ) + + if since_token and since_token.is_after(leave_token): + continue + + archived.append(RoomSyncResultBuilder( + room_id=room_id, + events=None, + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=since_token, + upto_token=leave_token, + )) + + timeline_limit = sync_config.filter_collection.timeline_limit() + + # Get all events for rooms we're currently joined to. + room_to_events = yield self.store.get_room_events_stream_for_rooms( + room_ids=joined_room_ids, + from_key=since_token.room_key, + to_key=now_token.room_key, + limit=timeline_limit + 1, + ) + + joined = [] + # We loop through all room ids, even if there are no new events, in case + # there are non room events taht we need to notify about. + for room_id in joined_room_ids: + room_entry = room_to_events.get(room_id, None) + + if room_entry: + events, start_key = room_entry + + prev_batch_token = now_token.copy_and_replace("room_key", start_key) + + joined.append(RoomSyncResultBuilder( + room_id=room_id, + events=events, + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=None if room_id in newly_joined_rooms else since_token, + upto_token=prev_batch_token, + )) + else: + joined.append(RoomSyncResultBuilder( + room_id=room_id, + events=[], + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=since_token, + upto_token=since_token, + )) + + defer.returnValue((joined, invited, archived, newly_joined_rooms)) + + @defer.inlineCallbacks + def _get_all_rooms(self, sync_result_builer, ignored_users): + user_id = sync_result_builer.sync_config.user.to_string() + since_token = sync_result_builer.since_token + now_token = sync_result_builer.now_token + sync_config = sync_result_builer.sync_config + + membership_list = ( + Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN + ) + + room_list = yield self.store.get_rooms_for_user_where_membership_is( + user_id=user_id, + membership_list=membership_list + ) + + joined = [] + invited = [] + archived = [] + + for event in room_list: + if event.membership == Membership.JOIN: + joined.append(RoomSyncResultBuilder( + room_id=event.room_id, + events=None, + newly_joined=False, + full_state=True, + since_token=since_token, + upto_token=now_token, + )) + elif event.membership == Membership.INVITE: + if event.sender in ignored_users: + continue + invite = yield self.store.get_event(event.event_id) + invited.append(InvitedSyncResult( + room_id=event.room_id, + invite=invite, + )) + elif event.membership in (Membership.LEAVE, Membership.BAN): + # Always send down rooms we were banned or kicked from. + if not sync_config.filter_collection.include_leave: + if event.membership == Membership.LEAVE: + if user_id == event.sender: + continue + + leave_token = now_token.copy_and_replace( + "room_key", "s%d" % (event.stream_ordering,) + ) + archived.append(RoomSyncResultBuilder( + room_id=event.room_id, + events=None, + newly_joined=False, + full_state=True, + since_token=since_token, + upto_token=leave_token, + )) + + defer.returnValue((joined, invited, archived, [])) + + @defer.inlineCallbacks + def _generate_room_entry(self, room_type, sync_result_builer, ignored_users, + room_builder, ephemeral, tags, account_data, + always_include=False): + since_token = sync_result_builer.since_token + now_token = sync_result_builer.now_token + sync_config = sync_result_builer.sync_config + + room_id = room_builder.room_id + events = room_builder.events + newly_joined = room_builder.newly_joined + full_state = ( + room_builder.full_state + or newly_joined + or sync_result_builer.full_state + ) + since_token = room_builder.since_token + upto_token = room_builder.upto_token + + batch = yield self.load_filtered_recents( + room_id, sync_config, + now_token=upto_token, + since_token=since_token, + recents=events, + newly_joined_room=newly_joined, # FIXME + ) + + account_data_events = [] + if tags is not None: + account_data_events.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + account_data = sync_config.filter_collection.filter_room_account_data( + account_data_events + ) + + ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) + + if not (always_include or batch or account_data or ephemeral or full_state): + return + + state = yield self.compute_state_delta( + room_id, batch, sync_config, since_token, now_token, + full_state=full_state + ) + + if room_type == "joined": + unread_notifications = {} + room_sync = JoinedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + ephemeral=ephemeral, + account_data=account_data_events, + unread_notifications=unread_notifications, + ) + + if room_sync or always_include: + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config + ) + + if notifs is not None: + unread_notifications["notification_count"] = notifs["notify_count"] + unread_notifications["highlight_count"] = notifs["highlight_count"] + + sync_result_builer.joined.append(room_sync) + elif room_type == "archived": + room_sync = ArchivedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + account_data=account_data, + ) + if room_sync or always_include: + sync_result_builer.archived.append(room_sync) + def _action_has_highlight(actions): for action in actions: @@ -1057,3 +1016,28 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): (e.type, e.state_key): e for e in evs } + + +class SyncResultBuilder(object): + def __init__(self, sync_config, full_state, since_token, now_token): + self.sync_config = sync_config + self.full_state = full_state + self.since_token = since_token + self.now_token = now_token + + self.presence = [] + self.account_data = [] + self.joined = [] + self.invited = [] + self.archived = [] + + +class RoomSyncResultBuilder(object): + def __init__(self, room_id, events, newly_joined, full_state, since_token, + upto_token): + self.room_id = room_id + self.events = events + self.newly_joined = newly_joined + self.full_state = full_state + self.since_token = since_token + self.upto_token = upto_token From c0c79ef444ca0f21e8324abc8a813026aaf6cf17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 May 2016 18:21:27 +0100 Subject: [PATCH 02/16] Add back concurrently_execute --- synapse/handlers/sync.py | 34 +++++++++------------------------- 1 file changed, 9 insertions(+), 25 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 80eccf19ae..bc6d6af133 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes from synapse.util.async import concurrently_execute from synapse.util.logcontext import LoggingContext @@ -478,26 +477,6 @@ class SyncHandler(object): for e in sync_config.filter_collection.filter_room_state(state.values()) }) - def check_joined_room(self, sync_config, state_delta): - """ - Check if the user has just joined the given room (so should - be given the full state) - - Args: - sync_config(synapse.handlers.sync.SyncConfig): - state_delta(dict[(str,str), synapse.events.FrozenEvent]): the - difference in state since the last sync - - Returns: - A deferred Tuple (state_delta, limited) - """ - join_event = state_delta.get(( - EventTypes.Member, sync_config.user.to_string()), None) - if join_event is not None: - if join_event.content["membership"] == Membership.JOIN: - return True - return False - @defer.inlineCallbacks def unread_notifs_for_room_id(self, room_id, sync_config): with Measure(self.clock, "unread_notifs_for_room_id"): @@ -664,8 +643,8 @@ class SyncHandler(object): tags_by_room = yield self.store.get_tags_for_user(user_id) - for room_entry in joined: - yield self._generate_room_entry( + def handle_joined(room_entry): + return self._generate_room_entry( "joined", sync_result_builer, ignored_users, @@ -675,8 +654,11 @@ class SyncHandler(object): account_data=account_data_by_room.get(room_entry.room_id, {}), always_include=sync_result_builer.full_state, ) - for room_entry in archived: - yield self._generate_room_entry( + + yield concurrently_execute(handle_joined, joined, 10) + + def handle_archived(room_entry): + return self._generate_room_entry( "archived", sync_result_builer, ignored_users, @@ -687,6 +669,8 @@ class SyncHandler(object): always_include=sync_result_builer.full_state, ) + yield concurrently_execute(handle_archived, archived, 10) + sync_result_builer.invited.extend(invited) # Now we want to get any newly joined users From 137e6a45577d5850ef6936670791af12c2fe74d9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 09:43:35 +0100 Subject: [PATCH 03/16] Shuffle things room --- synapse/handlers/sync.py | 70 +++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bc6d6af133..6b7c6a436e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -499,6 +499,10 @@ class SyncHandler(object): @defer.inlineCallbacks def generate_sync_result(self, sync_config, since_token=None, full_state=False): + # NB: The now_token gets changed by some of the generate_sync_* methods, + # this is due to some of the underlying streams not supporting the ability + # to query up to a given point. + # Always use the `now_token` in `SyncResultBuilder` now_token = yield self.event_sources.get_current_token() sync_result_builer = SyncResultBuilder( @@ -511,9 +515,10 @@ class SyncHandler(object): sync_result_builer ) - newly_joined_rooms, newly_joined_users = yield self.generate_sync_entry_for_rooms( + res = yield self.generate_sync_entry_for_rooms( sync_result_builer, account_data_by_room ) + newly_joined_rooms, newly_joined_users = res yield self.generate_sync_entry_for_presence( sync_result_builer, newly_joined_rooms, newly_joined_users @@ -631,7 +636,7 @@ class SyncHandler(object): if sync_result_builer.since_token: res = yield self._get_rooms_changed(sync_result_builer, ignored_users) - joined, invited, archived, newly_joined_rooms = res + room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_updated_tags( user_id, @@ -639,13 +644,12 @@ class SyncHandler(object): ) else: res = yield self._get_all_rooms(sync_result_builer, ignored_users) - joined, invited, archived, newly_joined_rooms = res + room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_tags_for_user(user_id) - def handle_joined(room_entry): + def handle_room_entries(room_entry): return self._generate_room_entry( - "joined", sync_result_builer, ignored_users, room_entry, @@ -655,21 +659,7 @@ class SyncHandler(object): always_include=sync_result_builer.full_state, ) - yield concurrently_execute(handle_joined, joined, 10) - - def handle_archived(room_entry): - return self._generate_room_entry( - "archived", - sync_result_builer, - ignored_users, - room_entry, - ephemeral=ephemeral_by_room.get(room_entry.room_id, []), - tags=tags_by_room.get(room_entry.room_id), - account_data=account_data_by_room.get(room_entry.room_id, {}), - always_include=sync_result_builer.full_state, - ) - - yield concurrently_execute(handle_archived, archived, 10) + yield concurrently_execute(handle_room_entries, room_entries, 10) sync_result_builer.invited.extend(invited) @@ -711,7 +701,7 @@ class SyncHandler(object): mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) newly_joined_rooms = [] - archived = [] + room_entries = [] invited = [] for room_id, events in mem_change_events_by_room_id.items(): non_joins = [e for e in events if e.membership != Membership.JOIN] @@ -759,8 +749,9 @@ class SyncHandler(object): if since_token and since_token.is_after(leave_token): continue - archived.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=room_id, + rtype="archived", events=None, newly_joined=room_id in newly_joined_rooms, full_state=False, @@ -778,7 +769,6 @@ class SyncHandler(object): limit=timeline_limit + 1, ) - joined = [] # We loop through all room ids, even if there are no new events, in case # there are non room events taht we need to notify about. for room_id in joined_room_ids: @@ -789,8 +779,9 @@ class SyncHandler(object): prev_batch_token = now_token.copy_and_replace("room_key", start_key) - joined.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=room_id, + rtype="joined", events=events, newly_joined=room_id in newly_joined_rooms, full_state=False, @@ -798,8 +789,9 @@ class SyncHandler(object): upto_token=prev_batch_token, )) else: - joined.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=room_id, + rtype="joined", events=[], newly_joined=room_id in newly_joined_rooms, full_state=False, @@ -807,7 +799,7 @@ class SyncHandler(object): upto_token=since_token, )) - defer.returnValue((joined, invited, archived, newly_joined_rooms)) + defer.returnValue((room_entries, invited, newly_joined_rooms)) @defer.inlineCallbacks def _get_all_rooms(self, sync_result_builer, ignored_users): @@ -825,14 +817,14 @@ class SyncHandler(object): membership_list=membership_list ) - joined = [] + room_entries = [] invited = [] - archived = [] for event in room_list: if event.membership == Membership.JOIN: - joined.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=event.room_id, + rtype="joined", events=None, newly_joined=False, full_state=True, @@ -857,8 +849,9 @@ class SyncHandler(object): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - archived.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=event.room_id, + rtype="archived", events=None, newly_joined=False, full_state=True, @@ -866,10 +859,10 @@ class SyncHandler(object): upto_token=leave_token, )) - defer.returnValue((joined, invited, archived, [])) + defer.returnValue((room_entries, invited, [])) @defer.inlineCallbacks - def _generate_room_entry(self, room_type, sync_result_builer, ignored_users, + def _generate_room_entry(self, sync_result_builer, ignored_users, room_builder, ephemeral, tags, account_data, always_include=False): since_token = sync_result_builer.since_token @@ -892,7 +885,7 @@ class SyncHandler(object): now_token=upto_token, since_token=since_token, recents=events, - newly_joined_room=newly_joined, # FIXME + newly_joined_room=newly_joined, ) account_data_events = [] @@ -922,7 +915,7 @@ class SyncHandler(object): full_state=full_state ) - if room_type == "joined": + if room_builder.rtype == "joined": unread_notifications = {} room_sync = JoinedSyncResult( room_id=room_id, @@ -943,7 +936,7 @@ class SyncHandler(object): unread_notifications["highlight_count"] = notifs["highlight_count"] sync_result_builer.joined.append(room_sync) - elif room_type == "archived": + elif room_builder.rtype == "archived": room_sync = ArchivedSyncResult( room_id=room_id, timeline=batch, @@ -952,6 +945,8 @@ class SyncHandler(object): ) if room_sync or always_include: sync_result_builer.archived.append(room_sync) + else: + raise Exception("Unrecognized rtype: %r", room_builder.rtype) def _action_has_highlight(actions): @@ -1017,9 +1012,10 @@ class SyncResultBuilder(object): class RoomSyncResultBuilder(object): - def __init__(self, room_id, events, newly_joined, full_state, since_token, - upto_token): + def __init__(self, room_id, rtype, events, newly_joined, full_state, + since_token, upto_token): self.room_id = room_id + self.rtype = rtype self.events = events self.newly_joined = newly_joined self.full_state = full_state From 84f94e4cbbd8c79d867503159b564aaf233e46d5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 10:14:53 +0100 Subject: [PATCH 04/16] Add comments --- synapse/handlers/sync.py | 112 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 105 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6b7c6a436e..271dd4c147 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -499,6 +499,17 @@ class SyncHandler(object): @defer.inlineCallbacks def generate_sync_result(self, sync_config, since_token=None, full_state=False): + """Generates a sync result. + + Args: + sync_config (SyncConfig) + since_token (StreamToken) + full_state (bool) + + Returns: + Deferred(SyncResult) + """ + # NB: The now_token gets changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability # to query up to a given point. @@ -511,16 +522,16 @@ class SyncHandler(object): now_token=now_token, ) - account_data_by_room = yield self.generate_sync_entry_for_account_data( + account_data_by_room = yield self._generate_sync_entry_for_account_data( sync_result_builer ) - res = yield self.generate_sync_entry_for_rooms( + res = yield self._generate_sync_entry_for_rooms( sync_result_builer, account_data_by_room ) newly_joined_rooms, newly_joined_users = res - yield self.generate_sync_entry_for_presence( + yield self._generate_sync_entry_for_presence( sync_result_builer, newly_joined_rooms, newly_joined_users ) @@ -534,7 +545,16 @@ class SyncHandler(object): )) @defer.inlineCallbacks - def generate_sync_entry_for_account_data(self, sync_result_builer): + def _generate_sync_entry_for_account_data(self, sync_result_builer): + """Generates the account data portion of the sync response. Populates + `sync_result_builer` with the result. + + Args: + sync_result_builer(SyncResultBuilder) + + Returns: + Deferred(dict): A dictionary containing the per room account data. + """ sync_config = sync_result_builer.sync_config user_id = sync_result_builer.sync_config.user.to_string() since_token = sync_result_builer.since_token @@ -575,8 +595,18 @@ class SyncHandler(object): defer.returnValue(account_data_by_room) @defer.inlineCallbacks - def generate_sync_entry_for_presence(self, sync_result_builer, newly_joined_rooms, - newly_joined_users): + def _generate_sync_entry_for_presence(self, sync_result_builer, newly_joined_rooms, + newly_joined_users): + """Generates the presence portion of the sync response. Populates the + `sync_result_builer` with the result. + + Args: + sync_result_builer(SyncResultBuilder) + newly_joined_rooms(list): List of rooms that the user has joined + since the last sync (or empty if an initial sync) + newly_joined_users(list): List of users that have joined rooms + since the last sync (or empty if an initial sync) + """ now_token = sync_result_builer.now_token sync_config = sync_result_builer.sync_config user = sync_result_builer.sync_config.user @@ -617,7 +647,18 @@ class SyncHandler(object): sync_result_builer.presence = presence @defer.inlineCallbacks - def generate_sync_entry_for_rooms(self, sync_result_builer, account_data_by_room): + def _generate_sync_entry_for_rooms(self, sync_result_builer, account_data_by_room): + """Generates the rooms portion of the sync response. Populates the + `sync_result_builer` with the result. + + Args: + sync_result_builer(SyncResultBuilder) + account_data_by_room(dict): Dictionary of per room account data + + Returns: + Deferred(tuple): Returns a 2-tuple of + `(newly_joined_rooms, newly_joined_users)` + """ user_id = sync_result_builer.sync_config.user.to_string() now_token, ephemeral_by_room = yield self.ephemeral_by_room( @@ -676,6 +717,16 @@ class SyncHandler(object): @defer.inlineCallbacks def _get_rooms_changed(self, sync_result_builer, ignored_users): + """Gets the the changes that have happened since the last sync. + + Args: + sync_result_builer(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + + Returns: + Deferred(tuple): Returns a tuple of the form: + `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)` + """ user_id = sync_result_builer.sync_config.user.to_string() since_token = sync_result_builer.since_token now_token = sync_result_builer.now_token @@ -803,6 +854,17 @@ class SyncHandler(object): @defer.inlineCallbacks def _get_all_rooms(self, sync_result_builer, ignored_users): + """Returns entries for all rooms for the user. + + Args: + sync_result_builer(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + + Returns: + Deferred(tuple): Returns a tuple of the form: + `([RoomSyncResultBuilder], [InvitedSyncResult], [])` + """ + user_id = sync_result_builer.sync_config.user.to_string() since_token = sync_result_builer.since_token now_token = sync_result_builer.now_token @@ -865,6 +927,20 @@ class SyncHandler(object): def _generate_room_entry(self, sync_result_builer, ignored_users, room_builder, ephemeral, tags, account_data, always_include=False): + """Populates the `joined` and `archived` section of `sync_result_builer` + based on the `room_builder`. + + Args: + sync_result_builer(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + room_builder(RoomSyncResultBuilder) + ephemeral(list): List of new ephemeral events for room + tags(list): List of *all* tags for room, or None if there has been + no change. + account_data(list): List of new account data for room + always_include(bool): Always include this room in the sync response, + even if empty. + """ since_token = sync_result_builer.since_token now_token = sync_result_builer.now_token sync_config = sync_result_builer.sync_config @@ -998,7 +1074,15 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): class SyncResultBuilder(object): + "Used to help build up a new SyncResult for a user" def __init__(self, sync_config, full_state, since_token, now_token): + """ + Args: + sync_config(SyncConfig) + full_state(bool): The full_state flag as specified by user + since_token(StreamToken): The token supplied by user, or None. + now_token(StreamToken): The token to sync up to. + """ self.sync_config = sync_config self.full_state = full_state self.since_token = since_token @@ -1012,8 +1096,22 @@ class SyncResultBuilder(object): class RoomSyncResultBuilder(object): + """Stores information needed to create either a `JoinedSyncResult` or + `ArchivedSyncResult`. + """ def __init__(self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token): + """ + Args: + room_id(str) + rtype(str): One of `"joined"` or `"archived"` + events(list): List of events to include in the room, (more events + may be added when generating result). + newly_joined(bool): If the user has newly joined the room + full_state(bool): Whether the full state should be sent in result + since_token(StreamToken): Earliest point to return events from, or None + upto_token(StreamToken): Latest point to return events from. + """ self.room_id = room_id self.rtype = rtype self.events = events From 79bea8ab9a4c4cb4a0907784603698731424c00a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 10:22:24 +0100 Subject: [PATCH 05/16] Inline function. Make load_filtered_recents private --- synapse/handlers/sync.py | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 271dd4c147..3aa4432d68 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -203,17 +203,6 @@ class SyncHandler(object): rules = format_push_rules_for_user(user, rawrules, enabled_map) defer.returnValue(rules) - def account_data_for_user(self, account_data): - account_data_events = [] - - for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) - - return account_data_events - @defer.inlineCallbacks def ephemeral_by_room(self, sync_config, now_token, since_token=None): """Get the ephemeral events for each room the user is in @@ -277,8 +266,8 @@ class SyncHandler(object): defer.returnValue((now_token, ephemeral_by_room)) @defer.inlineCallbacks - def load_filtered_recents(self, room_id, sync_config, now_token, - since_token=None, recents=None, newly_joined_room=False): + def _load_filtered_recents(self, room_id, sync_config, now_token, + since_token=None, recents=None, newly_joined_room=False): """ Returns: a Deferred TimelineBatch @@ -586,9 +575,10 @@ class SyncHandler(object): sync_config.user ) - account_data_for_user = sync_config.filter_collection.filter_account_data( - self.account_data_for_user(account_data) - ) + account_data_for_user = sync_config.filter_collection.filter_account_data([ + {"type": account_data_type, "content": content} + for account_data_type, content in account_data.items() + ]) sync_result_builer.account_data = account_data_for_user @@ -956,7 +946,7 @@ class SyncHandler(object): since_token = room_builder.since_token upto_token = room_builder.upto_token - batch = yield self.load_filtered_recents( + batch = yield self._load_filtered_recents( room_id, sync_config, now_token=upto_token, since_token=since_token, From be2c67738640ff88fcf63701c9e3d82afc385e47 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 10:53:03 +0100 Subject: [PATCH 06/16] Spell builder correctly --- synapse/handlers/sync.py | 126 +++++++++++++++++++-------------------- 1 file changed, 63 insertions(+), 63 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3aa4432d68..12a4cc8b57 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -505,50 +505,50 @@ class SyncHandler(object): # Always use the `now_token` in `SyncResultBuilder` now_token = yield self.event_sources.get_current_token() - sync_result_builer = SyncResultBuilder( + sync_result_builder = SyncResultBuilder( sync_config, full_state, since_token=since_token, now_token=now_token, ) account_data_by_room = yield self._generate_sync_entry_for_account_data( - sync_result_builer + sync_result_builder ) res = yield self._generate_sync_entry_for_rooms( - sync_result_builer, account_data_by_room + sync_result_builder, account_data_by_room ) newly_joined_rooms, newly_joined_users = res yield self._generate_sync_entry_for_presence( - sync_result_builer, newly_joined_rooms, newly_joined_users + sync_result_builder, newly_joined_rooms, newly_joined_users ) defer.returnValue(SyncResult( - presence=sync_result_builer.presence, - account_data=sync_result_builer.account_data, - joined=sync_result_builer.joined, - invited=sync_result_builer.invited, - archived=sync_result_builer.archived, - next_batch=sync_result_builer.now_token, + presence=sync_result_builder.presence, + account_data=sync_result_builder.account_data, + joined=sync_result_builder.joined, + invited=sync_result_builder.invited, + archived=sync_result_builder.archived, + next_batch=sync_result_builder.now_token, )) @defer.inlineCallbacks - def _generate_sync_entry_for_account_data(self, sync_result_builer): + def _generate_sync_entry_for_account_data(self, sync_result_builder): """Generates the account data portion of the sync response. Populates - `sync_result_builer` with the result. + `sync_result_builder` with the result. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) Returns: Deferred(dict): A dictionary containing the per room account data. """ - sync_config = sync_result_builer.sync_config - user_id = sync_result_builer.sync_config.user.to_string() - since_token = sync_result_builer.since_token + sync_config = sync_result_builder.sync_config + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token - if since_token and not sync_result_builer.full_state: + if since_token and not sync_result_builder.full_state: account_data, account_data_by_room = ( yield self.store.get_updated_account_data_for_user( user_id, @@ -580,31 +580,31 @@ class SyncHandler(object): for account_data_type, content in account_data.items() ]) - sync_result_builer.account_data = account_data_for_user + sync_result_builder.account_data = account_data_for_user defer.returnValue(account_data_by_room) @defer.inlineCallbacks - def _generate_sync_entry_for_presence(self, sync_result_builer, newly_joined_rooms, + def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms, newly_joined_users): """Generates the presence portion of the sync response. Populates the - `sync_result_builer` with the result. + `sync_result_builder` with the result. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) newly_joined_rooms(list): List of rooms that the user has joined since the last sync (or empty if an initial sync) newly_joined_users(list): List of users that have joined rooms since the last sync (or empty if an initial sync) """ - now_token = sync_result_builer.now_token - sync_config = sync_result_builer.sync_config - user = sync_result_builer.sync_config.user + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + user = sync_result_builder.sync_config.user presence_source = self.event_sources.sources["presence"] - since_token = sync_result_builer.since_token - if since_token and not sync_result_builer.full_state: + since_token = sync_result_builder.since_token + if since_token and not sync_result_builder.full_state: presence_key = since_token.presence_key else: presence_key = None @@ -614,7 +614,7 @@ class SyncHandler(object): from_key=presence_key, is_guest=sync_config.is_guest, ) - sync_result_builer.now_token = now_token.copy_and_replace( + sync_result_builder.now_token = now_token.copy_and_replace( "presence_key", presence_key ) @@ -634,27 +634,27 @@ class SyncHandler(object): presence ) - sync_result_builer.presence = presence + sync_result_builder.presence = presence @defer.inlineCallbacks - def _generate_sync_entry_for_rooms(self, sync_result_builer, account_data_by_room): + def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room): """Generates the rooms portion of the sync response. Populates the - `sync_result_builer` with the result. + `sync_result_builder` with the result. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) account_data_by_room(dict): Dictionary of per room account data Returns: Deferred(tuple): Returns a 2-tuple of `(newly_joined_rooms, newly_joined_users)` """ - user_id = sync_result_builer.sync_config.user.to_string() + user_id = sync_result_builder.sync_config.user.to_string() now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_result_builer.sync_config, sync_result_builer.now_token + sync_result_builder.sync_config, sync_result_builder.now_token ) - sync_result_builer.now_token = now_token + sync_result_builder.now_token = now_token ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id, @@ -665,38 +665,38 @@ class SyncHandler(object): else: ignored_users = frozenset() - if sync_result_builer.since_token: - res = yield self._get_rooms_changed(sync_result_builer, ignored_users) + if sync_result_builder.since_token: + res = yield self._get_rooms_changed(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_updated_tags( user_id, - sync_result_builer.since_token.account_data_key, + sync_result_builder.since_token.account_data_key, ) else: - res = yield self._get_all_rooms(sync_result_builer, ignored_users) + res = yield self._get_all_rooms(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_tags_for_user(user_id) def handle_room_entries(room_entry): return self._generate_room_entry( - sync_result_builer, + sync_result_builder, ignored_users, room_entry, ephemeral=ephemeral_by_room.get(room_entry.room_id, []), tags=tags_by_room.get(room_entry.room_id), account_data=account_data_by_room.get(room_entry.room_id, {}), - always_include=sync_result_builer.full_state, + always_include=sync_result_builder.full_state, ) yield concurrently_execute(handle_room_entries, room_entries, 10) - sync_result_builer.invited.extend(invited) + sync_result_builder.invited.extend(invited) # Now we want to get any newly joined users newly_joined_users = set() - for joined_sync in sync_result_builer.joined: + for joined_sync in sync_result_builder.joined: it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) for event in it: if event.type == EventTypes.Member: @@ -706,21 +706,21 @@ class SyncHandler(object): defer.returnValue((newly_joined_rooms, newly_joined_users)) @defer.inlineCallbacks - def _get_rooms_changed(self, sync_result_builer, ignored_users): + def _get_rooms_changed(self, sync_result_builder, ignored_users): """Gets the the changes that have happened since the last sync. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) ignored_users(set(str)): Set of users ignored by user. Returns: Deferred(tuple): Returns a tuple of the form: `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)` """ - user_id = sync_result_builer.sync_config.user.to_string() - since_token = sync_result_builer.since_token - now_token = sync_result_builer.now_token - sync_config = sync_result_builer.sync_config + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config assert since_token @@ -843,11 +843,11 @@ class SyncHandler(object): defer.returnValue((room_entries, invited, newly_joined_rooms)) @defer.inlineCallbacks - def _get_all_rooms(self, sync_result_builer, ignored_users): + def _get_all_rooms(self, sync_result_builder, ignored_users): """Returns entries for all rooms for the user. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) ignored_users(set(str)): Set of users ignored by user. Returns: @@ -855,10 +855,10 @@ class SyncHandler(object): `([RoomSyncResultBuilder], [InvitedSyncResult], [])` """ - user_id = sync_result_builer.sync_config.user.to_string() - since_token = sync_result_builer.since_token - now_token = sync_result_builer.now_token - sync_config = sync_result_builer.sync_config + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config membership_list = ( Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN @@ -914,14 +914,14 @@ class SyncHandler(object): defer.returnValue((room_entries, invited, [])) @defer.inlineCallbacks - def _generate_room_entry(self, sync_result_builer, ignored_users, + def _generate_room_entry(self, sync_result_builder, ignored_users, room_builder, ephemeral, tags, account_data, always_include=False): - """Populates the `joined` and `archived` section of `sync_result_builer` + """Populates the `joined` and `archived` section of `sync_result_builder` based on the `room_builder`. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) ignored_users(set(str)): Set of users ignored by user. room_builder(RoomSyncResultBuilder) ephemeral(list): List of new ephemeral events for room @@ -931,9 +931,9 @@ class SyncHandler(object): always_include(bool): Always include this room in the sync response, even if empty. """ - since_token = sync_result_builer.since_token - now_token = sync_result_builer.now_token - sync_config = sync_result_builer.sync_config + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config room_id = room_builder.room_id events = room_builder.events @@ -941,7 +941,7 @@ class SyncHandler(object): full_state = ( room_builder.full_state or newly_joined - or sync_result_builer.full_state + or sync_result_builder.full_state ) since_token = room_builder.since_token upto_token = room_builder.upto_token @@ -1001,7 +1001,7 @@ class SyncHandler(object): unread_notifications["notification_count"] = notifs["notify_count"] unread_notifications["highlight_count"] = notifs["highlight_count"] - sync_result_builer.joined.append(room_sync) + sync_result_builder.joined.append(room_sync) elif room_builder.rtype == "archived": room_sync = ArchivedSyncResult( room_id=room_id, @@ -1010,7 +1010,7 @@ class SyncHandler(object): account_data=account_data, ) if room_sync or always_include: - sync_result_builer.archived.append(room_sync) + sync_result_builder.archived.append(room_sync) else: raise Exception("Unrecognized rtype: %r", room_builder.rtype) From b08ad0389e0e412798f32d9f5656db483238173d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 11:04:35 +0100 Subject: [PATCH 07/16] Only include non-offline presence in initial sync --- synapse/handlers/sync.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 12a4cc8b57..8143171c11 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -606,13 +606,16 @@ class SyncHandler(object): since_token = sync_result_builder.since_token if since_token and not sync_result_builder.full_state: presence_key = since_token.presence_key + include_offline = True else: presence_key = None + include_offline = False presence, presence_key = yield presence_source.get_new_events( user=user, from_key=presence_key, is_guest=sync_config.is_guest, + include_offline=include_offline, ) sync_result_builder.now_token = now_token.copy_and_replace( "presence_key", presence_key From 1c5ed2a19ba6da4478c54df072e623598c0ea60d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 11:21:34 +0100 Subject: [PATCH 08/16] Only work out newly_joined_users for incremental sync --- synapse/handlers/sync.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8143171c11..476dcf38e2 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -699,12 +699,15 @@ class SyncHandler(object): # Now we want to get any newly joined users newly_joined_users = set() - for joined_sync in sync_result_builder.joined: - it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) - for event in it: - if event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - newly_joined_users.add(event.state_key) + if sync_result_builder.since_token: + for joined_sync in sync_result_builder.joined: + it = itertools.chain( + joined_sync.timeline.events, joined_sync.state.values() + ) + for event in it: + if event.type == EventTypes.Member: + if event.membership == Membership.JOIN: + newly_joined_users.add(event.state_key) defer.returnValue((newly_joined_rooms, newly_joined_users)) From 69003039973169097592359f3f34fc32c5bbaeb0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 11:44:55 +0100 Subject: [PATCH 09/16] Don't send down all ephemeral events --- synapse/handlers/sync.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 476dcf38e2..6f7dd45ef3 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -655,7 +655,9 @@ class SyncHandler(object): user_id = sync_result_builder.sync_config.user.to_string() now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_result_builder.sync_config, sync_result_builder.now_token + sync_result_builder.sync_config, + now_token=sync_result_builder.now_token, + since_token=sync_result_builder.since_token, ) sync_result_builder.now_token = now_token From faad233ea61cfff2c377609fa1d3c64d39f8a039 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 14:00:43 +0100 Subject: [PATCH 10/16] Change short circuit path --- synapse/handlers/sync.py | 42 +++++++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6f7dd45ef3..3b89582d79 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -273,23 +273,14 @@ class SyncHandler(object): a Deferred TimelineBatch """ with Measure(self.clock, "load_filtered_recents"): - filtering_factor = 2 timeline_limit = sync_config.filter_collection.timeline_limit() - load_limit = max(timeline_limit * filtering_factor, 10) - max_repeat = 5 # Only try a few times per room, otherwise - room_key = now_token.room_key - end_key = room_key if recents is None or newly_joined_room or timeline_limit < len(recents): limited = True else: limited = False - if since_token: - if not now_token.is_after(since_token): - limited = False - - if recents is not None: + if recents: recents = sync_config.filter_collection.filter_room_timeline(recents) recents = yield filter_events_for_client( self.store, @@ -299,6 +290,19 @@ class SyncHandler(object): else: recents = [] + if not limited: + defer.returnValue(TimelineBatch( + events=recents, + prev_batch=now_token, + limited=False + )) + + filtering_factor = 2 + load_limit = max(timeline_limit * filtering_factor, 10) + max_repeat = 5 # Only try a few times per room, otherwise + room_key = now_token.room_key + end_key = room_key + since_key = None if since_token and not newly_joined_room: since_key = since_token.room_key @@ -939,18 +943,24 @@ class SyncHandler(object): always_include(bool): Always include this room in the sync response, even if empty. """ - since_token = sync_result_builder.since_token - now_token = sync_result_builder.now_token - sync_config = sync_result_builder.sync_config - - room_id = room_builder.room_id - events = room_builder.events newly_joined = room_builder.newly_joined full_state = ( room_builder.full_state or newly_joined or sync_result_builder.full_state ) + events = room_builder.events + + # We want to shortcut out as early as possible. + if not (always_include or account_data or ephemeral or full_state): + if events == [] and tags is None: + return + + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + + room_id = room_builder.room_id since_token = room_builder.since_token upto_token = room_builder.upto_token From aaa70e26a2eb37fbdf728393148e003dc9866afd Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 1 Jun 2016 22:13:47 +0100 Subject: [PATCH 11/16] special case m.room.third_party_invite event auth to match invites, otherwise they get out of sync and you get https://github.com/vector-im/vector-web/issues/1208 --- synapse/api/auth.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 2474a1453b..007a0998a7 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -120,6 +120,24 @@ class Auth(object): return allowed self.check_event_sender_in_room(event, auth_events) + + # Special case to allow m.room.third_party_invite events wherever + # a user is allowed to issue invites. Fixes + # https://github.com/vector-im/vector-web/issues/1208 hopefully + if event.type == EventTypes.ThirdPartyInvite: + user_level = self._get_user_power_level(event.user_id, auth_events) + invite_level = self._get_named_level(auth_events, "invite", 0) + + if user_level < invite_level: + raise AuthError( + 403, ( + "You cannot issue a third party invite for %s." % + (event.content.display_name,) + ) + ) + else: + return True + self._can_send_event(event, auth_events) if event.type == EventTypes.PowerLevels: From f84b89f0c6b2e67897fec8639b79bf1d45c8f2b6 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 2 Jun 2016 13:29:48 +0100 Subject: [PATCH 12/16] if an email pusher specifies a brand param, use it --- synapse/push/emailpusher.py | 7 ++++++- synapse/push/mailer.py | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index a72cba8306..e38ed02006 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -72,7 +72,12 @@ class EmailPusher(object): self.processing = False if self.hs.config.email_enable_notifs: - self.mailer = Mailer(self.hs) + if 'data' in pusherdict and 'brand' in pusherdict['data']: + app_name = pusherdict['data']['brand'] + else: + app_name = self.hs.config.email_app_name + + self.mailer = Mailer(self.hs, app_name) else: self.mailer = None diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index fe5d67a03c..0e9d8ccb53 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -78,12 +78,12 @@ ALLOWED_ATTRS = { class Mailer(object): - def __init__(self, hs): + def __init__(self, hs, app_name): self.hs = hs self.store = self.hs.get_datastore() self.state_handler = self.hs.get_state_handler() loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir) - self.app_name = self.hs.config.email_app_name + self.app_name = app_name env = jinja2.Environment(loader=loader) env.filters["format_ts"] = format_ts_filter env.filters["mxc_to_http"] = self.mxc_to_http_filter From 356f13c0696526032c211c103dad2f57d18473fa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Jun 2016 14:07:38 +0100 Subject: [PATCH 13/16] Disable INCLUDE_ALL_UNREAD_NOTIFS --- synapse/push/emailpusher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index e38ed02006..2c21ed3088 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -44,7 +44,8 @@ THROTTLE_RESET_AFTER_MS = (12 * 60 * 60 * 1000) # does each email include all unread notifs, or just the ones which have happened # since the last mail? -INCLUDE_ALL_UNREAD_NOTIFS = True +# XXX: this is currently broken as it includes ones from parted rooms(!) +INCLUDE_ALL_UNREAD_NOTIFS = False class EmailPusher(object): From 70599ce9252997d32d0bf9f26a4e02c99bbe474d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 2 Jun 2016 15:20:15 +0100 Subject: [PATCH 14/16] Allow external processes to mark a user as syncing. (#812) * Add infrastructure to the presence handler to track sync requests in external processes * Expire stale entries for dead external processes * Add an http endpoint for making users as syncing Add some docstrings and comments. * Fixes --- synapse/handlers/presence.py | 119 ++++++++++++++++++++--- synapse/replication/presence_resource.py | 59 +++++++++++ synapse/replication/resource.py | 2 + tests/handlers/test_presence.py | 16 ++- 4 files changed, 174 insertions(+), 22 deletions(-) create mode 100644 synapse/replication/presence_resource.py diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 37f57301fb..fc8538b41e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -68,6 +68,10 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000 # How often to resend presence to remote servers FEDERATION_PING_INTERVAL = 25 * 60 * 1000 +# How long we will wait before assuming that the syncs from an external process +# are dead. +EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000 + assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER @@ -158,10 +162,21 @@ class PresenceHandler(object): self.serial_to_user = {} self._next_serial = 1 - # Keeps track of the number of *ongoing* syncs. While this is non zero - # a user will never go offline. + # Keeps track of the number of *ongoing* syncs on this process. While + # this is non zero a user will never go offline. self.user_to_num_current_syncs = {} + # Keeps track of the number of *ongoing* syncs on other processes. + # While any sync is ongoing on another process the user will never + # go offline. + # Each process has a unique identifier and an update frequency. If + # no update is received from that process within the update period then + # we assume that all the sync requests on that process have stopped. + # Stored as a dict from process_id to set of user_id, and a dict of + # process_id to millisecond timestamp last updated. + self.external_process_to_current_syncs = {} + self.external_process_last_updated_ms = {} + # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. @@ -272,13 +287,26 @@ class PresenceHandler(object): # Fetch the list of users that *may* have timed out. Things may have # changed since the timeout was set, so we won't necessarily have to # take any action. - users_to_check = self.wheel_timer.fetch(now) + users_to_check = set(self.wheel_timer.fetch(now)) + + # Check whether the lists of syncing processes from an external + # process have expired. + expired_process_ids = [ + process_id for process_id, last_update + in self.external_process_last_update.items() + if now - last_update > EXTERNAL_PROCESS_EXPIRY + ] + for process_id in expired_process_ids: + users_to_check.update( + self.external_process_to_current_syncs.pop(process_id, ()) + ) + self.external_process_last_update.pop(process_id) states = [ self.user_to_current_state.get( user_id, UserPresenceState.default(user_id) ) - for user_id in set(users_to_check) + for user_id in users_to_check ] timers_fired_counter.inc_by(len(states)) @@ -286,7 +314,7 @@ class PresenceHandler(object): changes = handle_timeouts( states, is_mine_fn=self.is_mine_id, - user_to_num_current_syncs=self.user_to_num_current_syncs, + syncing_users=self.get_syncing_users(), now=now, ) @@ -363,6 +391,73 @@ class PresenceHandler(object): defer.returnValue(_user_syncing()) + def get_currently_syncing_users(self): + """Get the set of user ids that are currently syncing on this HS. + Returns: + set(str): A set of user_id strings. + """ + syncing_user_ids = { + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count + } + syncing_user_ids.update(self.external_process_to_current_syncs.values()) + return syncing_user_ids + + @defer.inlineCallbacks + def update_external_syncs(self, process_id, syncing_user_ids): + """Update the syncing users for an external process + + Args: + process_id(str): An identifier for the process the users are + syncing against. This allows synapse to process updates + as user start and stop syncing against a given process. + syncing_user_ids(set(str)): The set of user_ids that are + currently syncing on that server. + """ + + # Grab the previous list of user_ids that were syncing on that process + prev_syncing_user_ids = ( + self.external_process_to_current_syncs.get(process_id, set()) + ) + # Grab the current presence state for both the users that are syncing + # now and the users that were syncing before this update. + prev_states = yield self.current_state_for_users( + syncing_user_ids | prev_syncing_user_ids + ) + updates = [] + time_now_ms = self.clock.time_msec() + + # For each new user that is syncing check if we need to mark them as + # being online. + for new_user_id in syncing_user_ids - prev_syncing_user_ids: + prev_state = prev_states[new_user_id] + if prev_state.state == PresenceState.OFFLINE: + updates.append(prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=time_now_ms, + last_user_sync_ts=time_now_ms, + )) + else: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + + # For each user that is still syncing or stopped syncing update the + # last sync time so that we will correctly apply the grace period when + # they stop syncing. + for old_user_id in prev_syncing_user_ids: + prev_state = prev_states[old_user_id] + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + + yield self._update_states(updates) + + # Update the last updated time for the process. We expire the entries + # if we don't receive an update in the given timeframe. + self.external_process_last_updated_ms[process_id] = self.clock.time_msec() + self.external_process_to_current_syncs[process_id] = syncing_user_ids + @defer.inlineCallbacks def current_state_for_user(self, user_id): """Get the current presence state for a user. @@ -935,15 +1030,14 @@ class PresenceEventSource(object): return self.get_new_events(user, from_key=None, include_offline=False) -def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): +def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): """Checks the presence of users that have timed out and updates as appropriate. Args: user_states(list): List of UserPresenceState's to check. is_mine_fn (fn): Function that returns if a user_id is ours - user_to_num_current_syncs (dict): Mapping of user_id to number of currently - active syncs. + syncing_user_ids (set): Set of user_ids with active syncs. now (int): Current time in ms. Returns: @@ -954,21 +1048,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): for state in user_states: is_mine = is_mine_fn(state.user_id) - new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now) + new_state = handle_timeout(state, is_mine, syncing_user_ids, now) if new_state: changes[state.user_id] = new_state return changes.values() -def handle_timeout(state, is_mine, user_to_num_current_syncs, now): +def handle_timeout(state, is_mine, syncing_user_ids, now): """Checks the presence of the user to see if any of the timers have elapsed Args: state (UserPresenceState) is_mine (bool): Whether the user is ours - user_to_num_current_syncs (dict): Mapping of user_id to number of currently - active syncs. + syncing_user_ids (set): Set of user_ids with active syncs. now (int): Current time in ms. Returns: @@ -1002,7 +1095,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now): # If there are have been no sync for a while (and none ongoing), # set presence to offline - if not user_to_num_current_syncs.get(user_id, 0): + if user_id not in syncing_user_ids: if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT: state = state.copy_and_replace( state=PresenceState.OFFLINE, diff --git a/synapse/replication/presence_resource.py b/synapse/replication/presence_resource.py new file mode 100644 index 0000000000..fc18130ab4 --- /dev/null +++ b/synapse/replication/presence_resource.py @@ -0,0 +1,59 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.http.server import respond_with_json_bytes, request_handler +from synapse.http.servlet import parse_json_object_from_request + +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET +from twisted.internet import defer + + +class PresenceResource(Resource): + """ + HTTP endpoint for marking users as syncing. + + POST /_synapse/replication/presence HTTP/1.1 + Content-Type: application/json + + { + "process_id": "", + "syncing_users": [""] + } + """ + + def __init__(self, hs): + Resource.__init__(self) # Resource is old-style, so no super() + + self.version_string = hs.version_string + self.clock = hs.get_clock() + self.presence_handler = hs.get_presence_handler() + + def render_POST(self, request): + self._async_render_POST(request) + return NOT_DONE_YET + + @request_handler() + @defer.inlineCallbacks + def _async_render_POST(self, request): + content = parse_json_object_from_request(request) + + process_id = content["process_id"] + syncing_user_ids = content["syncing_users"] + + yield self.presence_handler.update_external_syncs( + process_id, set(syncing_user_ids) + ) + + respond_with_json_bytes(request, 200, "{}") diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 847f212a3d..8c2d487ff4 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -16,6 +16,7 @@ from synapse.http.servlet import parse_integer, parse_string from synapse.http.server import request_handler, finish_request from synapse.replication.pusher_resource import PusherResource +from synapse.replication.presence_resource import PresenceResource from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET @@ -115,6 +116,7 @@ class ReplicationResource(Resource): self.clock = hs.get_clock() self.putChild("remove_pushers", PusherResource(hs)) + self.putChild("syncing_users", PresenceResource(hs)) def render_GET(self, request): self._async_render_GET(request) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 87c795fcfa..b531ba8540 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -264,7 +264,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) @@ -282,7 +282,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) @@ -300,9 +300,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={ - user_id: 1, - }, now=now + state, is_mine=True, syncing_user_ids=set([user_id]), now=now ) self.assertIsNotNone(new_state) @@ -321,7 +319,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) @@ -340,7 +338,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNone(new_state) @@ -358,7 +356,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=False, user_to_num_current_syncs={}, now=now + state, is_mine=False, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) @@ -377,7 +375,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) From 661a540dd1de89a3ab3a8f6ca0f780ea7d264176 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 2 Jun 2016 15:20:28 +0100 Subject: [PATCH 15/16] Deduplicate presence entries in sync (#818) --- synapse/handlers/sync.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3b89582d79..5307b62b85 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -637,6 +637,9 @@ class SyncHandler(object): ) presence.extend(states) + # Deduplicate the presence entries so that there's at most one per user + presence = {p["content"]["user_id"]: p for p in presence}.values() + presence = sync_config.filter_collection.filter_presence( presence ) From 80f34d7b574e1a6b8bc922df41bd53b59260fcf2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 2 Jun 2016 15:23:09 +0100 Subject: [PATCH 16/16] Fix setting the _clock in SQLBaseStore --- synapse/storage/_base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 56a0dd80f3..32c6677d47 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -152,6 +152,7 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs + self._clock = hs.get_clock() self._db_pool = hs.get_db_pool() self._previous_txn_total_time = 0