Merge pull request #2022 from matrix-org/erikj/no_op_sync
Implement no op for room stream in syncpull/2024/head
						commit
						248eb4638d
					
				|  | @ -399,8 +399,7 @@ class SynchrotronServer(HomeServer): | |||
|                 position = row[position_index] | ||||
|                 user_id = row[user_index] | ||||
| 
 | ||||
|                 rooms = yield store.get_rooms_for_user(user_id) | ||||
|                 room_ids = [r.room_id for r in rooms] | ||||
|                 room_ids = yield store.get_rooms_for_user(user_id) | ||||
| 
 | ||||
|                 notifier.on_new_event( | ||||
|                     "device_list_key", position, rooms=room_ids, | ||||
|  |  | |||
|  | @ -248,8 +248,7 @@ class DeviceHandler(BaseHandler): | |||
|             user_id, device_ids, list(hosts) | ||||
|         ) | ||||
| 
 | ||||
|         rooms = yield self.store.get_rooms_for_user(user_id) | ||||
|         room_ids = [r.room_id for r in rooms] | ||||
|         room_ids = yield self.store.get_rooms_for_user(user_id) | ||||
| 
 | ||||
|         yield self.notifier.on_new_event( | ||||
|             "device_list_key", position, rooms=room_ids, | ||||
|  | @ -270,8 +269,7 @@ class DeviceHandler(BaseHandler): | |||
|             user_id (str) | ||||
|             from_token (StreamToken) | ||||
|         """ | ||||
|         rooms = yield self.store.get_rooms_for_user(user_id) | ||||
|         room_ids = set(r.room_id for r in rooms) | ||||
|         room_ids = yield self.store.get_rooms_for_user(user_id) | ||||
| 
 | ||||
|         # First we check if any devices have changed | ||||
|         changed = yield self.store.get_user_whose_devices_changed( | ||||
|  | @ -347,8 +345,8 @@ class DeviceHandler(BaseHandler): | |||
|     @defer.inlineCallbacks | ||||
|     def user_left_room(self, user, room_id): | ||||
|         user_id = user.to_string() | ||||
|         rooms = yield self.store.get_rooms_for_user(user_id) | ||||
|         if not rooms: | ||||
|         room_ids = yield self.store.get_rooms_for_user(user_id) | ||||
|         if not room_ids: | ||||
|             # We no longer share rooms with this user, so we'll no longer | ||||
|             # receive device updates. Mark this in DB. | ||||
|             yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id) | ||||
|  | @ -404,8 +402,8 @@ class DeviceListEduUpdater(object): | |||
|             logger.warning("Got device list update edu for %r from %r", user_id, origin) | ||||
|             return | ||||
| 
 | ||||
|         rooms = yield self.store.get_rooms_for_user(user_id) | ||||
|         if not rooms: | ||||
|         room_ids = yield self.store.get_rooms_for_user(user_id) | ||||
|         if not room_ids: | ||||
|             # We don't share any rooms with this user. Ignore update, as we | ||||
|             # probably won't get any further updates. | ||||
|             return | ||||
|  |  | |||
|  | @ -557,9 +557,9 @@ class PresenceHandler(object): | |||
|         room_ids_to_states = {} | ||||
|         users_to_states = {} | ||||
|         for state in states: | ||||
|             events = yield self.store.get_rooms_for_user(state.user_id) | ||||
|             for e in events: | ||||
|                 room_ids_to_states.setdefault(e.room_id, []).append(state) | ||||
|             room_ids = yield self.store.get_rooms_for_user(state.user_id) | ||||
|             for room_id in room_ids: | ||||
|                 room_ids_to_states.setdefault(room_id, []).append(state) | ||||
| 
 | ||||
|             plist = yield self.store.get_presence_list_observers_accepted(state.user_id) | ||||
|             for u in plist: | ||||
|  | @ -913,11 +913,12 @@ class PresenceHandler(object): | |||
|     def is_visible(self, observed_user, observer_user): | ||||
|         """Returns whether a user can see another user's presence. | ||||
|         """ | ||||
|         observer_rooms = yield self.store.get_rooms_for_user(observer_user.to_string()) | ||||
|         observed_rooms = yield self.store.get_rooms_for_user(observed_user.to_string()) | ||||
| 
 | ||||
|         observer_room_ids = set(r.room_id for r in observer_rooms) | ||||
|         observed_room_ids = set(r.room_id for r in observed_rooms) | ||||
|         observer_room_ids = yield self.store.get_rooms_for_user( | ||||
|             observer_user.to_string() | ||||
|         ) | ||||
|         observed_room_ids = yield self.store.get_rooms_for_user( | ||||
|             observed_user.to_string() | ||||
|         ) | ||||
| 
 | ||||
|         if observer_room_ids & observed_room_ids: | ||||
|             defer.returnValue(True) | ||||
|  |  | |||
|  | @ -156,11 +156,11 @@ class ProfileHandler(BaseHandler): | |||
| 
 | ||||
|         self.ratelimit(requester) | ||||
| 
 | ||||
|         joins = yield self.store.get_rooms_for_user( | ||||
|         room_ids = yield self.store.get_rooms_for_user( | ||||
|             user.to_string(), | ||||
|         ) | ||||
| 
 | ||||
|         for j in joins: | ||||
|         for room_id in room_ids: | ||||
|             handler = self.hs.get_handlers().room_member_handler | ||||
|             try: | ||||
|                 # Assume the user isn't a guest because we don't let guests set | ||||
|  | @ -171,12 +171,12 @@ class ProfileHandler(BaseHandler): | |||
|                 yield handler.update_membership( | ||||
|                     requester, | ||||
|                     user, | ||||
|                     j.room_id, | ||||
|                     room_id, | ||||
|                     "join",  # We treat a profile update like a join. | ||||
|                     ratelimit=False,  # Try to hide that these events aren't atomic. | ||||
|                 ) | ||||
|             except Exception as e: | ||||
|                 logger.warn( | ||||
|                     "Failed to update join event for room %s - %s", | ||||
|                     j.room_id, str(e.message) | ||||
|                     room_id, str(e.message) | ||||
|                 ) | ||||
|  |  | |||
|  | @ -210,10 +210,9 @@ class ReceiptEventSource(object): | |||
|         else: | ||||
|             from_key = None | ||||
| 
 | ||||
|         rooms = yield self.store.get_rooms_for_user(user.to_string()) | ||||
|         rooms = [room.room_id for room in rooms] | ||||
|         room_ids = yield self.store.get_rooms_for_user(user.to_string()) | ||||
|         events = yield self.store.get_linearized_receipts_for_rooms( | ||||
|             rooms, | ||||
|             room_ids, | ||||
|             from_key=from_key, | ||||
|             to_key=to_key, | ||||
|         ) | ||||
|  |  | |||
|  | @ -20,6 +20,7 @@ from synapse.util.metrics import Measure, measure_func | |||
| from synapse.util.caches.response_cache import ResponseCache | ||||
| from synapse.push.clientformat import format_push_rules_for_user | ||||
| from synapse.visibility import filter_events_for_client | ||||
| from synapse.types import RoomStreamToken | ||||
| 
 | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
|  | @ -225,8 +226,7 @@ class SyncHandler(object): | |||
|         with Measure(self.clock, "ephemeral_by_room"): | ||||
|             typing_key = since_token.typing_key if since_token else "0" | ||||
| 
 | ||||
|             rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) | ||||
|             room_ids = [room.room_id for room in rooms] | ||||
|             room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string()) | ||||
| 
 | ||||
|             typing_source = self.event_sources.sources["typing"] | ||||
|             typing, typing_key = yield typing_source.get_new_events( | ||||
|  | @ -568,16 +568,15 @@ class SyncHandler(object): | |||
|         since_token = sync_result_builder.since_token | ||||
| 
 | ||||
|         if since_token and since_token.device_list_key: | ||||
|             rooms = yield self.store.get_rooms_for_user(user_id) | ||||
|             room_ids = set(r.room_id for r in rooms) | ||||
|             room_ids = yield self.store.get_rooms_for_user(user_id) | ||||
| 
 | ||||
|             user_ids_changed = set() | ||||
|             changed = yield self.store.get_user_whose_devices_changed( | ||||
|                 since_token.device_list_key | ||||
|             ) | ||||
|             for other_user_id in changed: | ||||
|                 other_rooms = yield self.store.get_rooms_for_user(other_user_id) | ||||
|                 if room_ids.intersection(e.room_id for e in other_rooms): | ||||
|                 other_room_ids = yield self.store.get_rooms_for_user(other_user_id) | ||||
|                 if room_ids.intersection(other_room_ids): | ||||
|                     user_ids_changed.add(other_user_id) | ||||
| 
 | ||||
|             defer.returnValue(user_ids_changed) | ||||
|  | @ -765,6 +764,21 @@ class SyncHandler(object): | |||
|             ) | ||||
|             sync_result_builder.now_token = now_token | ||||
| 
 | ||||
|         # We check up front if anything has changed, if it hasn't then there is | ||||
|         # no point in going futher. | ||||
|         since_token = sync_result_builder.since_token | ||||
|         if not sync_result_builder.full_state: | ||||
|             if since_token and not ephemeral_by_room and not account_data_by_room: | ||||
|                 have_changed = yield self._have_rooms_changed(sync_result_builder) | ||||
|                 if not have_changed: | ||||
|                     tags_by_room = yield self.store.get_updated_tags( | ||||
|                         user_id, | ||||
|                         since_token.account_data_key, | ||||
|                     ) | ||||
|                     if not tags_by_room: | ||||
|                         logger.debug("no-oping sync") | ||||
|                         defer.returnValue(([], [])) | ||||
| 
 | ||||
|         ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( | ||||
|             "m.ignored_user_list", user_id=user_id, | ||||
|         ) | ||||
|  | @ -774,13 +788,12 @@ class SyncHandler(object): | |||
|         else: | ||||
|             ignored_users = frozenset() | ||||
| 
 | ||||
|         if sync_result_builder.since_token: | ||||
|         if since_token: | ||||
|             res = yield self._get_rooms_changed(sync_result_builder, ignored_users) | ||||
|             room_entries, invited, newly_joined_rooms = res | ||||
| 
 | ||||
|             tags_by_room = yield self.store.get_updated_tags( | ||||
|                 user_id, | ||||
|                 sync_result_builder.since_token.account_data_key, | ||||
|                 user_id, since_token.account_data_key, | ||||
|             ) | ||||
|         else: | ||||
|             res = yield self._get_all_rooms(sync_result_builder, ignored_users) | ||||
|  | @ -805,7 +818,7 @@ class SyncHandler(object): | |||
| 
 | ||||
|         # Now we want to get any newly joined users | ||||
|         newly_joined_users = set() | ||||
|         if sync_result_builder.since_token: | ||||
|         if since_token: | ||||
|             for joined_sync in sync_result_builder.joined: | ||||
|                 it = itertools.chain( | ||||
|                     joined_sync.timeline.events, joined_sync.state.values() | ||||
|  | @ -817,6 +830,38 @@ class SyncHandler(object): | |||
| 
 | ||||
|         defer.returnValue((newly_joined_rooms, newly_joined_users)) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _have_rooms_changed(self, sync_result_builder): | ||||
|         """Returns whether there may be any new events that should be sent down | ||||
|         the sync. Returns True if there are. | ||||
|         """ | ||||
|         user_id = sync_result_builder.sync_config.user.to_string() | ||||
|         since_token = sync_result_builder.since_token | ||||
|         now_token = sync_result_builder.now_token | ||||
| 
 | ||||
|         assert since_token | ||||
| 
 | ||||
|         # Get a list of membership change events that have happened. | ||||
|         rooms_changed = yield self.store.get_membership_changes_for_user( | ||||
|             user_id, since_token.room_key, now_token.room_key | ||||
|         ) | ||||
| 
 | ||||
|         if rooms_changed: | ||||
|             defer.returnValue(True) | ||||
| 
 | ||||
|         app_service = self.store.get_app_service_by_user_id(user_id) | ||||
|         if app_service: | ||||
|             rooms = yield self.store.get_app_service_rooms(app_service) | ||||
|             joined_room_ids = set(r.room_id for r in rooms) | ||||
|         else: | ||||
|             joined_room_ids = yield self.store.get_rooms_for_user(user_id) | ||||
| 
 | ||||
|         stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream | ||||
|         for room_id in joined_room_ids: | ||||
|             if self.store.has_room_changed_since(room_id, stream_id): | ||||
|                 defer.returnValue(True) | ||||
|         defer.returnValue(False) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _get_rooms_changed(self, sync_result_builder, ignored_users): | ||||
|         """Gets the the changes that have happened since the last sync. | ||||
|  | @ -841,8 +886,7 @@ class SyncHandler(object): | |||
|             rooms = yield self.store.get_app_service_rooms(app_service) | ||||
|             joined_room_ids = set(r.room_id for r in rooms) | ||||
|         else: | ||||
|             rooms = yield self.store.get_rooms_for_user(user_id) | ||||
|             joined_room_ids = set(r.room_id for r in rooms) | ||||
|             joined_room_ids = yield self.store.get_rooms_for_user(user_id) | ||||
| 
 | ||||
|         # Get a list of membership change events that have happened. | ||||
|         rooms_changed = yield self.store.get_membership_changes_for_user( | ||||
|  |  | |||
|  | @ -304,8 +304,7 @@ class Notifier(object): | |||
|         if user_stream is None: | ||||
|             current_token = yield self.event_sources.get_current_token() | ||||
|             if room_ids is None: | ||||
|                 rooms = yield self.store.get_rooms_for_user(user_id) | ||||
|                 room_ids = [room.room_id for room in rooms] | ||||
|                 room_ids = yield self.store.get_rooms_for_user(user_id) | ||||
|             user_stream = _NotifierUserStream( | ||||
|                 user_id=user_id, | ||||
|                 rooms=room_ids, | ||||
|  | @ -454,8 +453,7 @@ class Notifier(object): | |||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _get_room_ids(self, user, explicit_room_id): | ||||
|         joined_rooms = yield self.store.get_rooms_for_user(user.to_string()) | ||||
|         joined_room_ids = map(lambda r: r.room_id, joined_rooms) | ||||
|         joined_room_ids = yield self.store.get_rooms_for_user(user.to_string()) | ||||
|         if explicit_room_id: | ||||
|             if explicit_room_id in joined_room_ids: | ||||
|                 defer.returnValue(([explicit_room_id], True)) | ||||
|  |  | |||
|  | @ -33,13 +33,13 @@ def get_badge_count(store, user_id): | |||
| 
 | ||||
|     badge = len(invites) | ||||
| 
 | ||||
|     for r in joins: | ||||
|         if r.room_id in my_receipts_by_room: | ||||
|             last_unread_event_id = my_receipts_by_room[r.room_id] | ||||
|     for room_id in joins: | ||||
|         if room_id in my_receipts_by_room: | ||||
|             last_unread_event_id = my_receipts_by_room[room_id] | ||||
| 
 | ||||
|             notifs = yield ( | ||||
|                 store.get_unread_event_push_actions_by_room_for_user( | ||||
|                     r.room_id, user_id, last_unread_event_id | ||||
|                     room_id, user_id, last_unread_event_id | ||||
|                 ) | ||||
|             ) | ||||
|             # return one badge count per conversation, as count per | ||||
|  |  | |||
|  | @ -748,8 +748,7 @@ class JoinedRoomsRestServlet(ClientV1RestServlet): | |||
|     def on_GET(self, request): | ||||
|         requester = yield self.auth.get_user_by_req(request, allow_guest=True) | ||||
| 
 | ||||
|         rooms = yield self.store.get_rooms_for_user(requester.user.to_string()) | ||||
|         room_ids = set(r.room_id for r in rooms)  # Ensure they're unique. | ||||
|         room_ids = yield self.store.get_rooms_for_user(requester.user.to_string()) | ||||
|         defer.returnValue((200, {"joined_rooms": list(room_ids)})) | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -274,24 +274,27 @@ class RoomMemberStore(SQLBaseStore): | |||
| 
 | ||||
|         return rows | ||||
| 
 | ||||
|     @cached(max_entries=500000, iterable=True) | ||||
|     @cachedInlineCallbacks(max_entries=500000, iterable=True) | ||||
|     def get_rooms_for_user(self, user_id): | ||||
|         return self.get_rooms_for_user_where_membership_is( | ||||
|         """Returns a set of room_ids the user is currently joined to | ||||
|         """ | ||||
|         rooms = yield self.get_rooms_for_user_where_membership_is( | ||||
|             user_id, membership_list=[Membership.JOIN], | ||||
|         ) | ||||
|         defer.returnValue(frozenset(r.room_id for r in rooms)) | ||||
| 
 | ||||
|     @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True) | ||||
|     def get_users_who_share_room_with_user(self, user_id, cache_context): | ||||
|         """Returns the set of users who share a room with `user_id` | ||||
|         """ | ||||
|         rooms = yield self.get_rooms_for_user( | ||||
|         room_ids = yield self.get_rooms_for_user( | ||||
|             user_id, on_invalidate=cache_context.invalidate, | ||||
|         ) | ||||
| 
 | ||||
|         user_who_share_room = set() | ||||
|         for room in rooms: | ||||
|         for room_id in room_ids: | ||||
|             user_ids = yield self.get_users_in_room( | ||||
|                 room.room_id, on_invalidate=cache_context.invalidate, | ||||
|                 room_id, on_invalidate=cache_context.invalidate, | ||||
|             ) | ||||
|             user_who_share_room.update(user_ids) | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Erik Johnston
						Erik Johnston