Fix 404 on `/sync` when the last event is a redaction of an unknown/purged event (#12905)
Currently, we try to pull the event corresponding to a sync token from the database. However, when we fetch redaction events, we check the target of that redaction (because we aren't allowed to send redactions to clients without validating them). So, if the sync token points to a redaction of an event that we don't have, we have a problem. It turns out we don't really need that event, and can just work with its ID and metadata, which sidesteps the whole problem.pull/12936/head
							parent
							
								
									5949ab86f8
								
							
						
					
					
						commit
						79dadf7216
					
				|  | @ -0,0 +1 @@ | |||
| Fix a bug introduced in Synapse 1.58.0 where `/sync` would fail if the most recent event in a room was a redaction of an event that has since been purged. | ||||
|  | @ -28,6 +28,7 @@ from synapse.api.constants import ( | |||
|     EventContentFields, | ||||
|     EventTypes, | ||||
|     GuestAccess, | ||||
|     HistoryVisibility, | ||||
|     Membership, | ||||
|     RelationTypes, | ||||
|     UserTypes, | ||||
|  | @ -66,7 +67,7 @@ from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstErr | |||
| from synapse.util.async_helpers import Linearizer, gather_results | ||||
| from synapse.util.caches.expiringcache import ExpiringCache | ||||
| from synapse.util.metrics import measure_func | ||||
| from synapse.visibility import filter_events_for_client | ||||
| from synapse.visibility import get_effective_room_visibility_from_state | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from synapse.events.third_party_rules import ThirdPartyEventRules | ||||
|  | @ -182,51 +183,31 @@ class MessageHandler: | |||
|         state_filter = state_filter or StateFilter.all() | ||||
| 
 | ||||
|         if at_token: | ||||
|             last_event = await self.store.get_last_event_in_room_before_stream_ordering( | ||||
|                 room_id, | ||||
|                 end_token=at_token.room_key, | ||||
|             last_event_id = ( | ||||
|                 await self.store.get_last_event_in_room_before_stream_ordering( | ||||
|                     room_id, | ||||
|                     end_token=at_token.room_key, | ||||
|                 ) | ||||
|             ) | ||||
| 
 | ||||
|             if not last_event: | ||||
|             if not last_event_id: | ||||
|                 raise NotFoundError("Can't find event for token %s" % (at_token,)) | ||||
| 
 | ||||
|             # check whether the user is in the room at that time to determine | ||||
|             # whether they should be treated as peeking. | ||||
|             state_map = await self._state_storage_controller.get_state_for_event( | ||||
|                 last_event.event_id, | ||||
|                 StateFilter.from_types([(EventTypes.Member, user_id)]), | ||||
|             ) | ||||
| 
 | ||||
|             joined = False | ||||
|             membership_event = state_map.get((EventTypes.Member, user_id)) | ||||
|             if membership_event: | ||||
|                 joined = membership_event.membership == Membership.JOIN | ||||
| 
 | ||||
|             is_peeking = not joined | ||||
| 
 | ||||
|             visible_events = await filter_events_for_client( | ||||
|                 self._storage_controllers, | ||||
|                 user_id, | ||||
|                 [last_event], | ||||
|                 filter_send_to_client=False, | ||||
|                 is_peeking=is_peeking, | ||||
|             ) | ||||
| 
 | ||||
|             if visible_events: | ||||
|                 room_state_events = ( | ||||
|                     await self._state_storage_controller.get_state_for_events( | ||||
|                         [last_event.event_id], state_filter=state_filter | ||||
|                     ) | ||||
|                 ) | ||||
|                 room_state: Mapping[Any, EventBase] = room_state_events[ | ||||
|                     last_event.event_id | ||||
|                 ] | ||||
|             else: | ||||
|             if not await self._user_can_see_state_at_event( | ||||
|                 user_id, room_id, last_event_id | ||||
|             ): | ||||
|                 raise AuthError( | ||||
|                     403, | ||||
|                     "User %s not allowed to view events in room %s at token %s" | ||||
|                     % (user_id, room_id, at_token), | ||||
|                 ) | ||||
| 
 | ||||
|             room_state_events = ( | ||||
|                 await self._state_storage_controller.get_state_for_events( | ||||
|                     [last_event_id], state_filter=state_filter | ||||
|                 ) | ||||
|             ) | ||||
|             room_state: Mapping[Any, EventBase] = room_state_events[last_event_id] | ||||
|         else: | ||||
|             ( | ||||
|                 membership, | ||||
|  | @ -256,6 +237,65 @@ class MessageHandler: | |||
|         events = self._event_serializer.serialize_events(room_state.values(), now) | ||||
|         return events | ||||
| 
 | ||||
|     async def _user_can_see_state_at_event( | ||||
|         self, user_id: str, room_id: str, event_id: str | ||||
|     ) -> bool: | ||||
|         # check whether the user was in the room, and the history visibility, | ||||
|         # at that time. | ||||
|         state_map = await self._state_storage_controller.get_state_for_event( | ||||
|             event_id, | ||||
|             StateFilter.from_types( | ||||
|                 [ | ||||
|                     (EventTypes.Member, user_id), | ||||
|                     (EventTypes.RoomHistoryVisibility, ""), | ||||
|                 ] | ||||
|             ), | ||||
|         ) | ||||
| 
 | ||||
|         membership = None | ||||
|         membership_event = state_map.get((EventTypes.Member, user_id)) | ||||
|         if membership_event: | ||||
|             membership = membership_event.membership | ||||
| 
 | ||||
|         # if the user was a member of the room at the time of the event, | ||||
|         # they can see it. | ||||
|         if membership == Membership.JOIN: | ||||
|             return True | ||||
| 
 | ||||
|         # otherwise, it depends on the history visibility. | ||||
|         visibility = get_effective_room_visibility_from_state(state_map) | ||||
| 
 | ||||
|         if visibility == HistoryVisibility.JOINED: | ||||
|             # we weren't a member at the time of the event, so we can't see this event. | ||||
|             return False | ||||
| 
 | ||||
|         # otherwise *invited* is good enough | ||||
|         if membership == Membership.INVITE: | ||||
|             return True | ||||
| 
 | ||||
|         if visibility == HistoryVisibility.INVITED: | ||||
|             # we weren't invited, so we can't see this event. | ||||
|             return False | ||||
| 
 | ||||
|         if visibility == HistoryVisibility.WORLD_READABLE: | ||||
|             return True | ||||
| 
 | ||||
|         # So it's SHARED, and the user was not a member at the time. The user cannot | ||||
|         # see history, unless they have *subsequently* joined the room. | ||||
|         # | ||||
|         # XXX: if the user has subsequently joined and then left again, | ||||
|         # ideally we would share history up to the point they left. But | ||||
|         # we don't know when they left. We just treat it as though they | ||||
|         # never joined, and restrict access. | ||||
| 
 | ||||
|         ( | ||||
|             current_membership, | ||||
|             _, | ||||
|         ) = await self.store.get_local_current_membership_for_user_in_room( | ||||
|             user_id, event_id | ||||
|         ) | ||||
|         return current_membership == Membership.JOIN | ||||
| 
 | ||||
|     async def get_joined_members(self, requester: Requester, room_id: str) -> dict: | ||||
|         """Get all the joined members in the room and their profile information. | ||||
| 
 | ||||
|  |  | |||
|  | @ -621,21 +621,32 @@ class SyncHandler: | |||
|         ) | ||||
| 
 | ||||
|     async def get_state_after_event( | ||||
|         self, event: EventBase, state_filter: Optional[StateFilter] = None | ||||
|         self, event_id: str, state_filter: Optional[StateFilter] = None | ||||
|     ) -> StateMap[str]: | ||||
|         """ | ||||
|         Get the room state after the given event | ||||
| 
 | ||||
|         Args: | ||||
|             event: event of interest | ||||
|             event_id: event of interest | ||||
|             state_filter: The state filter used to fetch state from the database. | ||||
|         """ | ||||
|         state_ids = await self._state_storage_controller.get_state_ids_for_event( | ||||
|             event.event_id, state_filter=state_filter or StateFilter.all() | ||||
|             event_id, state_filter=state_filter or StateFilter.all() | ||||
|         ) | ||||
|         if event.is_state(): | ||||
| 
 | ||||
|         # using get_metadata_for_events here (instead of get_event) sidesteps an issue | ||||
|         # with redactions: if `event_id` is a redaction event, and we don't have the | ||||
|         # original (possibly because it got purged), get_event will refuse to return | ||||
|         # the redaction event, which isn't terribly helpful here. | ||||
|         # | ||||
|         # (To be fair, in that case we could assume it's *not* a state event, and | ||||
|         # therefore we don't need to worry about it. But still, it seems cleaner just | ||||
|         # to pull the metadata.) | ||||
|         m = (await self.store.get_metadata_for_events([event_id]))[event_id] | ||||
|         if m.state_key is not None and m.rejection_reason is None: | ||||
|             state_ids = dict(state_ids) | ||||
|             state_ids[(event.type, event.state_key)] = event.event_id | ||||
|             state_ids[(m.event_type, m.state_key)] = event_id | ||||
| 
 | ||||
|         return state_ids | ||||
| 
 | ||||
|     async def get_state_at( | ||||
|  | @ -654,14 +665,14 @@ class SyncHandler: | |||
|         # FIXME: This gets the state at the latest event before the stream ordering, | ||||
|         # which might not be the same as the "current state" of the room at the time | ||||
|         # of the stream token if there were multiple forward extremities at the time. | ||||
|         last_event = await self.store.get_last_event_in_room_before_stream_ordering( | ||||
|         last_event_id = await self.store.get_last_event_in_room_before_stream_ordering( | ||||
|             room_id, | ||||
|             end_token=stream_position.room_key, | ||||
|         ) | ||||
| 
 | ||||
|         if last_event: | ||||
|         if last_event_id: | ||||
|             state = await self.get_state_after_event( | ||||
|                 last_event, state_filter=state_filter or StateFilter.all() | ||||
|                 last_event_id, state_filter=state_filter or StateFilter.all() | ||||
|             ) | ||||
| 
 | ||||
|         else: | ||||
|  |  | |||
|  | @ -54,6 +54,7 @@ class EventMetadata: | |||
|     room_id: str | ||||
|     event_type: str | ||||
|     state_key: Optional[str] | ||||
|     rejection_reason: Optional[str] | ||||
| 
 | ||||
| 
 | ||||
| def _retrieve_and_check_room_version(room_id: str, room_version_id: str) -> RoomVersion: | ||||
|  | @ -167,17 +168,22 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): | |||
|             ) | ||||
| 
 | ||||
|             sql = f""" | ||||
|                 SELECT e.event_id, e.room_id, e.type, se.state_key FROM events AS e | ||||
|                 SELECT e.event_id, e.room_id, e.type, se.state_key, r.reason | ||||
|                 FROM events AS e | ||||
|                 LEFT JOIN state_events se USING (event_id) | ||||
|                 LEFT JOIN rejections r USING (event_id) | ||||
|                 WHERE {clause} | ||||
|             """ | ||||
| 
 | ||||
|             txn.execute(sql, args) | ||||
|             return { | ||||
|                 event_id: EventMetadata( | ||||
|                     room_id=room_id, event_type=event_type, state_key=state_key | ||||
|                     room_id=room_id, | ||||
|                     event_type=event_type, | ||||
|                     state_key=state_key, | ||||
|                     rejection_reason=rejection_reason, | ||||
|                 ) | ||||
|                 for event_id, room_id, event_type, state_key in txn | ||||
|                 for event_id, room_id, event_type, state_key, rejection_reason in txn | ||||
|             } | ||||
| 
 | ||||
|         result_map: Dict[str, EventMetadata] = {} | ||||
|  |  | |||
|  | @ -765,15 +765,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): | |||
|         self, | ||||
|         room_id: str, | ||||
|         end_token: RoomStreamToken, | ||||
|     ) -> Optional[EventBase]: | ||||
|         """Returns the last event in a room at or before a stream ordering | ||||
|     ) -> Optional[str]: | ||||
|         """Returns the ID of the last event in a room at or before a stream ordering | ||||
| 
 | ||||
|         Args: | ||||
|             room_id | ||||
|             end_token: The token used to stream from | ||||
| 
 | ||||
|         Returns: | ||||
|             The most recent event. | ||||
|             The ID of the most recent event, or None if there are no events in the room | ||||
|             before this stream ordering. | ||||
|         """ | ||||
| 
 | ||||
|         last_row = await self.get_room_event_before_stream_ordering( | ||||
|  | @ -781,10 +782,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): | |||
|             stream_ordering=end_token.stream, | ||||
|         ) | ||||
|         if last_row: | ||||
|             _, _, event_id = last_row | ||||
|             event = await self.get_event(event_id, get_prev_content=True) | ||||
|             return event | ||||
| 
 | ||||
|             return last_row[2] | ||||
|         return None | ||||
| 
 | ||||
|     async def get_current_room_stream_token_for_room_id( | ||||
|  |  | |||
|  | @ -162,16 +162,7 @@ async def filter_events_for_client( | |||
|         state = event_id_to_state[event.event_id] | ||||
| 
 | ||||
|         # get the room_visibility at the time of the event. | ||||
|         visibility_event = state.get(_HISTORY_VIS_KEY, None) | ||||
|         if visibility_event: | ||||
|             visibility = visibility_event.content.get( | ||||
|                 "history_visibility", HistoryVisibility.SHARED | ||||
|             ) | ||||
|         else: | ||||
|             visibility = HistoryVisibility.SHARED | ||||
| 
 | ||||
|         if visibility not in VISIBILITY_PRIORITY: | ||||
|             visibility = HistoryVisibility.SHARED | ||||
|         visibility = get_effective_room_visibility_from_state(state) | ||||
| 
 | ||||
|         # Always allow history visibility events on boundaries. This is done | ||||
|         # by setting the effective visibility to the least restrictive | ||||
|  | @ -267,6 +258,23 @@ async def filter_events_for_client( | |||
|     return [ev for ev in filtered_events if ev] | ||||
| 
 | ||||
| 
 | ||||
| def get_effective_room_visibility_from_state(state: StateMap[EventBase]) -> str: | ||||
|     """Get the actual history vis, from a state map including the history_visibility event | ||||
| 
 | ||||
|     Handles missing and invalid history visibility events. | ||||
|     """ | ||||
|     visibility_event = state.get(_HISTORY_VIS_KEY, None) | ||||
|     if not visibility_event: | ||||
|         return HistoryVisibility.SHARED | ||||
| 
 | ||||
|     visibility = visibility_event.content.get( | ||||
|         "history_visibility", HistoryVisibility.SHARED | ||||
|     ) | ||||
|     if visibility not in VISIBILITY_PRIORITY: | ||||
|         visibility = HistoryVisibility.SHARED | ||||
|     return visibility | ||||
| 
 | ||||
| 
 | ||||
| async def filter_events_for_server( | ||||
|     storage: StorageControllers, | ||||
|     server_name: str, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Richard van der Hoff
						Richard van der Hoff