diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 0c3290b307..d129d4ca8a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -308,6 +308,11 @@ class PresenceHandler(BaseHandler): rm_handler = self.homeserver.get_handlers().room_member_handler return rm_handler.get_joined_rooms_for_user(user) + def get_joined_users_for_room_id(self, room_id): + rm_handler = self.homeserver.get_handlers().room_member_handler + return rm_handler.get_room_members(room_id) + + @defer.inlineCallbacks def changed_presencelike_data(self, user, state): """Updates the presence state of a local user. @@ -317,12 +322,9 @@ class PresenceHandler(BaseHandler): Returns: A Deferred """ - statuscache = self._get_or_make_usercache(user) - self._user_cachemap_latest_serial += 1 - statuscache.update(state, serial=self._user_cachemap_latest_serial) - - return self.push_presence(user, statuscache=statuscache) + statuscache = yield self.update_presence_cache(user, state) + yield self.push_presence(user, statuscache=statuscache) @log_function def started_user_eventstream(self, user): @@ -345,13 +347,12 @@ class PresenceHandler(BaseHandler): room_id(str): The room id the user joined. """ if self.hs.is_mine(user): - statuscache = self._get_or_make_usercache(user) - # No actual update but we need to bump the serial anyway for the # event source self._user_cachemap_latest_serial += 1 - statuscache.update({}, serial=self._user_cachemap_latest_serial) - + statuscache = yield self.update_presence_cache( + user, room_ids=[room_id] + ) self.push_update_to_local_and_remote( observed_user=user, room_ids=[room_id], @@ -359,16 +360,17 @@ class PresenceHandler(BaseHandler): ) # We also want to tell them about current presence of people. - rm_handler = self.homeserver.get_handlers().room_member_handler - curr_users = yield rm_handler.get_room_members(room_id) + curr_users = yield self.get_joined_users_for_room_id(room_id) for local_user in [c for c in curr_users if self.hs.is_mine(c)]: - statuscache = self._get_or_offline_usercache(local_user) - statuscache.update({}, serial=self._user_cachemap_latest_serial) + statuscache = yield self.update_presence_cache( + local_user, room_ids=[room_id], add_to_cache=False + ) + self.push_update_to_local_and_remote( observed_user=local_user, users_to_push=[user], - statuscache=self._get_or_offline_usercache(local_user), + statuscache=statuscache, ) @defer.inlineCallbacks @@ -829,10 +831,8 @@ class PresenceHandler(BaseHandler): self.clock.time_msec() - state.pop("last_active_ago") ) - statuscache = self._get_or_make_usercache(user) - self._user_cachemap_latest_serial += 1 - statuscache.update(state, serial=self._user_cachemap_latest_serial) + yield self.update_presence_cache(user, state, room_ids=room_ids) if not observers and not room_ids: logger.debug(" | no interested observers or room IDs") @@ -890,6 +890,32 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds, consumeErrors=True) + @defer.inlineCallbacks + def update_presence_cache(self, user, state={}, room_ids=None, + add_to_cache=True): + """Update the presence cache for a user with a new state and bump the + serial to the latest value. + + Args: + user(UserID): The user being updated + state(dict): The presence state being updated + room_ids(None or list of str): A list of room_ids to update. If + room_ids is None then fetch the list of room_ids the user is + joined to. + add_to_cache: Whether to add an entry to the presence cache if the + user isn't already in the cache. + Returns: + A Deferred UserPresenceCache for the user being updated. + """ + if room_ids is None: + room_ids = yield self.get_joined_rooms_for_user(user) + if add_to_cache: + statuscache = self._get_or_make_usercache(user) + else: + statuscache = self._get_or_offline_usercache(user) + statuscache.update(state, serial=self._user_cachemap_latest_serial) + defer.returnValue(statuscache) + @defer.inlineCallbacks def push_update_to_local_and_remote(self, observed_user, statuscache, users_to_push=[], room_ids=[],