Merge pull request #147 from matrix-org/presence-performance
Improvement to performance of presence event stream handlingpull/151/head
						commit
						c167cbc9fd
					
				|  | @ -26,6 +26,7 @@ import synapse.metrics | |||
| from ._base import BaseHandler | ||||
| 
 | ||||
| import logging | ||||
| from collections import OrderedDict | ||||
| 
 | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
|  | @ -143,7 +144,7 @@ class PresenceHandler(BaseHandler): | |||
|         self._remote_offline_serials = [] | ||||
| 
 | ||||
|         # map any user to a UserPresenceCache | ||||
|         self._user_cachemap = {} | ||||
|         self._user_cachemap = OrderedDict()  # keep them sorted by serial | ||||
|         self._user_cachemap_latest_serial = 0 | ||||
| 
 | ||||
|         metrics.register_callback( | ||||
|  | @ -165,6 +166,14 @@ class PresenceHandler(BaseHandler): | |||
|         else: | ||||
|             return UserPresenceCache() | ||||
| 
 | ||||
|     def _bump_serial(self, user=None): | ||||
|         self._user_cachemap_latest_serial += 1 | ||||
| 
 | ||||
|         if user: | ||||
|             # Move to end | ||||
|             cache = self._user_cachemap.pop(user) | ||||
|             self._user_cachemap[user] = cache | ||||
| 
 | ||||
|     def registered_user(self, user): | ||||
|         return self.store.create_presence(user.localpart) | ||||
| 
 | ||||
|  | @ -300,7 +309,7 @@ class PresenceHandler(BaseHandler): | |||
|     def changed_presencelike_data(self, user, state): | ||||
|         statuscache = self._get_or_make_usercache(user) | ||||
| 
 | ||||
|         self._user_cachemap_latest_serial += 1 | ||||
|         self._bump_serial(user=user) | ||||
|         statuscache.update(state, serial=self._user_cachemap_latest_serial) | ||||
| 
 | ||||
|         return self.push_presence(user, statuscache=statuscache) | ||||
|  | @ -322,7 +331,7 @@ class PresenceHandler(BaseHandler): | |||
| 
 | ||||
|             # No actual update but we need to bump the serial anyway for the | ||||
|             # event source | ||||
|             self._user_cachemap_latest_serial += 1 | ||||
|             self._bump_serial() | ||||
|             statuscache.update({}, serial=self._user_cachemap_latest_serial) | ||||
| 
 | ||||
|             self.push_update_to_local_and_remote( | ||||
|  | @ -704,7 +713,7 @@ class PresenceHandler(BaseHandler): | |||
| 
 | ||||
|             statuscache = self._get_or_make_usercache(user) | ||||
| 
 | ||||
|             self._user_cachemap_latest_serial += 1 | ||||
|             self._bump_serial(user=user) | ||||
|             statuscache.update(state, serial=self._user_cachemap_latest_serial) | ||||
| 
 | ||||
|             if not observers and not room_ids: | ||||
|  | @ -864,10 +873,15 @@ class PresenceEventSource(object): | |||
| 
 | ||||
|         updates = [] | ||||
|         # TODO(paul): use a DeferredList ? How to limit concurrency. | ||||
|         for observed_user in cachemap.keys(): | ||||
|         for observed_user in reversed(cachemap.keys()): | ||||
|             cached = cachemap[observed_user] | ||||
| 
 | ||||
|             if cached.serial <= from_key or cached.serial > max_serial: | ||||
|             # Since this is ordered in descending order of serial, we can just | ||||
|             # stop once we've seen enough | ||||
|             if cached.serial <= from_key: | ||||
|                 break | ||||
| 
 | ||||
|             if cached.serial > max_serial: | ||||
|                 continue | ||||
| 
 | ||||
|             if not (yield self.is_visible(observer_user, observed_user)): | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Mark Haines
						Mark Haines