Faster room joins: Try other destinations when resyncing the state of a partial-state room (#12812)
Signed-off-by: Sean Quah <seanq@matrix.org>pull/12895/head
							parent
							
								
									3594f6c1f3
								
							
						
					
					
						commit
						2fba1076c5
					
				|  | @ -0,0 +1 @@ | |||
| Try other homeservers when re-syncing state for rooms with partial state. | ||||
|  | @ -405,6 +405,9 @@ class FederationClient(FederationBase): | |||
| 
 | ||||
|         Returns: | ||||
|             a tuple of (state event_ids, auth event_ids) | ||||
| 
 | ||||
|         Raises: | ||||
|             InvalidResponseError: if fields in the response have the wrong type. | ||||
|         """ | ||||
|         result = await self.transport_layer.get_room_state_ids( | ||||
|             destination, room_id, event_id=event_id | ||||
|  | @ -416,7 +419,7 @@ class FederationClient(FederationBase): | |||
|         if not isinstance(state_event_ids, list) or not isinstance( | ||||
|             auth_event_ids, list | ||||
|         ): | ||||
|             raise Exception("invalid response from /state_ids") | ||||
|             raise InvalidResponseError("invalid response from /state_ids") | ||||
| 
 | ||||
|         return state_event_ids, auth_event_ids | ||||
| 
 | ||||
|  |  | |||
|  | @ -20,7 +20,16 @@ import itertools | |||
| import logging | ||||
| from enum import Enum | ||||
| from http import HTTPStatus | ||||
| from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union | ||||
| from typing import ( | ||||
|     TYPE_CHECKING, | ||||
|     Collection, | ||||
|     Dict, | ||||
|     Iterable, | ||||
|     List, | ||||
|     Optional, | ||||
|     Tuple, | ||||
|     Union, | ||||
| ) | ||||
| 
 | ||||
| import attr | ||||
| from signedjson.key import decode_verify_key_bytes | ||||
|  | @ -34,6 +43,7 @@ from synapse.api.errors import ( | |||
|     CodeMessageException, | ||||
|     Codes, | ||||
|     FederationDeniedError, | ||||
|     FederationError, | ||||
|     HttpResponseException, | ||||
|     NotFoundError, | ||||
|     RequestSendFailed, | ||||
|  | @ -545,7 +555,8 @@ class FederationHandler: | |||
|                 run_as_background_process( | ||||
|                     desc="sync_partial_state_room", | ||||
|                     func=self._sync_partial_state_room, | ||||
|                     destination=origin, | ||||
|                     initial_destination=origin, | ||||
|                     other_destinations=ret.servers_in_room, | ||||
|                     room_id=room_id, | ||||
|                 ) | ||||
| 
 | ||||
|  | @ -1454,13 +1465,16 @@ class FederationHandler: | |||
| 
 | ||||
|     async def _sync_partial_state_room( | ||||
|         self, | ||||
|         destination: str, | ||||
|         initial_destination: Optional[str], | ||||
|         other_destinations: Collection[str], | ||||
|         room_id: str, | ||||
|     ) -> None: | ||||
|         """Background process to resync the state of a partial-state room | ||||
| 
 | ||||
|         Args: | ||||
|             destination: homeserver to pull the state from | ||||
|             initial_destination: the initial homeserver to pull the state from | ||||
|             other_destinations: other homeservers to try to pull the state from, if | ||||
|                 `initial_destination` is unavailable | ||||
|             room_id: room to be resynced | ||||
|         """ | ||||
| 
 | ||||
|  | @ -1472,8 +1486,29 @@ class FederationHandler: | |||
|         #   really leave, that might mean we have difficulty getting the room state over | ||||
|         #   federation. | ||||
|         # | ||||
|         # TODO(faster_joins): try other destinations if the one we have fails | ||||
|         # TODO(faster_joins): we need some way of prioritising which homeservers in | ||||
|         #   `other_destinations` to try first, otherwise we'll spend ages trying dead | ||||
|         #   homeservers for large rooms. | ||||
| 
 | ||||
|         if initial_destination is None and len(other_destinations) == 0: | ||||
|             raise ValueError( | ||||
|                 f"Cannot resync state of {room_id}: no destinations provided" | ||||
|             ) | ||||
| 
 | ||||
|         # Make an infinite iterator of destinations to try. Once we find a working | ||||
|         # destination, we'll stick with it until it flakes. | ||||
|         if initial_destination is not None: | ||||
|             # Move `initial_destination` to the front of the list. | ||||
|             destinations = list(other_destinations) | ||||
|             if initial_destination in destinations: | ||||
|                 destinations.remove(initial_destination) | ||||
|             destinations = [initial_destination] + destinations | ||||
|             destination_iter = itertools.cycle(destinations) | ||||
|         else: | ||||
|             destination_iter = itertools.cycle(other_destinations) | ||||
| 
 | ||||
|         # `destination` is the current remote homeserver we're pulling from. | ||||
|         destination = next(destination_iter) | ||||
|         logger.info("Syncing state for room %s via %s", room_id, destination) | ||||
| 
 | ||||
|         # we work through the queue in order of increasing stream ordering. | ||||
|  | @ -1511,6 +1546,41 @@ class FederationHandler: | |||
|                 allow_rejected=True, | ||||
|             ) | ||||
|             for event in events: | ||||
|                 await self._federation_event_handler.update_state_for_partial_state_event( | ||||
|                     destination, event | ||||
|                 ) | ||||
|                 for attempt in itertools.count(): | ||||
|                     try: | ||||
|                         await self._federation_event_handler.update_state_for_partial_state_event( | ||||
|                             destination, event | ||||
|                         ) | ||||
|                         break | ||||
|                     except FederationError as e: | ||||
|                         if attempt == len(destinations) - 1: | ||||
|                             # We have tried every remote server for this event. Give up. | ||||
|                             # TODO(faster_joins) giving up isn't the right thing to do | ||||
|                             #   if there's a temporary network outage. retrying | ||||
|                             #   indefinitely is also not the right thing to do if we can | ||||
|                             #   reach all homeservers and they all claim they don't have | ||||
|                             #   the state we want. | ||||
|                             logger.error( | ||||
|                                 "Failed to get state for %s at %s from %s because %s, " | ||||
|                                 "giving up!", | ||||
|                                 room_id, | ||||
|                                 event, | ||||
|                                 destination, | ||||
|                                 e, | ||||
|                             ) | ||||
|                             raise | ||||
| 
 | ||||
|                         # Try the next remote server. | ||||
|                         logger.info( | ||||
|                             "Failed to get state for %s at %s from %s because %s", | ||||
|                             room_id, | ||||
|                             event, | ||||
|                             destination, | ||||
|                             e, | ||||
|                         ) | ||||
|                         destination = next(destination_iter) | ||||
|                         logger.info( | ||||
|                             "Syncing state for room %s via %s instead", | ||||
|                             room_id, | ||||
|                             destination, | ||||
|                         ) | ||||
|  |  | |||
|  | @ -505,6 +505,9 @@ class FederationEventHandler: | |||
|         Args: | ||||
|             destination: server to request full state from | ||||
|             event: partial-state event to be de-partial-stated | ||||
| 
 | ||||
|         Raises: | ||||
|             FederationError if we fail to request state from the remote server. | ||||
|         """ | ||||
|         logger.info("Updating state for %s", event.event_id) | ||||
|         with nested_logging_context(suffix=event.event_id): | ||||
|  | @ -815,6 +818,10 @@ class FederationEventHandler: | |||
|         Returns: | ||||
|             if we already had all the prev events, `None`. Otherwise, returns | ||||
|             the event ids of the state at `event`. | ||||
| 
 | ||||
|         Raises: | ||||
|             FederationError if we fail to get the state from the remote server after any | ||||
|                 missing `prev_event`s. | ||||
|         """ | ||||
|         room_id = event.room_id | ||||
|         event_id = event.event_id | ||||
|  | @ -901,6 +908,10 @@ class FederationEventHandler: | |||
| 
 | ||||
|         Returns: | ||||
|             The event ids of the state *after* the given event. | ||||
| 
 | ||||
|         Raises: | ||||
|             InvalidResponseError: if the remote homeserver's response contains fields | ||||
|                 of the wrong type. | ||||
|         """ | ||||
|         ( | ||||
|             state_event_ids, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Sean Quah
						Sean Quah