Faster remote room joins: unblock tasks waiting for full room state when the un-partial-stating of that room is received over the replication stream. [rei:frrj/streams/unpsr] (#14474)
							parent
							
								
									66d47b44cd
								
							
						
					
					
						commit
						9e82caac45
					
				|  | @ -0,0 +1 @@ | |||
| Faster remote room joins: stream the un-partial-stating of rooms over replication. | ||||
|  | @ -36,12 +36,14 @@ from synapse.replication.tcp.streams import ( | |||
|     TagAccountDataStream, | ||||
|     ToDeviceStream, | ||||
|     TypingStream, | ||||
|     UnPartialStatedRoomStream, | ||||
| ) | ||||
| from synapse.replication.tcp.streams.events import ( | ||||
|     EventsStream, | ||||
|     EventsStreamEventRow, | ||||
|     EventsStreamRow, | ||||
| ) | ||||
| from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow | ||||
| from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID | ||||
| from synapse.util.async_helpers import Linearizer, timeout_deferred | ||||
| from synapse.util.metrics import Measure | ||||
|  | @ -117,6 +119,7 @@ class ReplicationDataHandler: | |||
|         self._streams = hs.get_replication_streams() | ||||
|         self._instance_name = hs.get_instance_name() | ||||
|         self._typing_handler = hs.get_typing_handler() | ||||
|         self._state_storage_controller = hs.get_storage_controllers().state | ||||
| 
 | ||||
|         self._notify_pushers = hs.config.worker.start_pushers | ||||
|         self._pusher_pool = hs.get_pusherpool() | ||||
|  | @ -236,6 +239,14 @@ class ReplicationDataHandler: | |||
|                     self.notifier.notify_user_joined_room( | ||||
|                         row.data.event_id, row.data.room_id | ||||
|                     ) | ||||
|         elif stream_name == UnPartialStatedRoomStream.NAME: | ||||
|             for row in rows: | ||||
|                 assert isinstance(row, UnPartialStatedRoomStreamRow) | ||||
| 
 | ||||
|                 # Wake up any tasks waiting for the room to be un-partial-stated. | ||||
|                 self._state_storage_controller.notify_room_un_partial_stated( | ||||
|                     row.room_id | ||||
|                 ) | ||||
| 
 | ||||
|         await self._presence_handler.process_replication_rows( | ||||
|             stream_name, instance_name, token, rows | ||||
|  |  | |||
|  | @ -0,0 +1,65 @@ | |||
| # Copyright 2022 The Matrix.org Foundation C.I.C. | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| from twisted.internet.defer import ensureDeferred | ||||
| 
 | ||||
| from synapse.rest.client import room | ||||
| 
 | ||||
| from tests.replication._base import BaseMultiWorkerStreamTestCase | ||||
| 
 | ||||
| 
 | ||||
| class PartialStateStreamsTestCase(BaseMultiWorkerStreamTestCase): | ||||
|     servlets = [room.register_servlets] | ||||
|     hijack_auth = True | ||||
|     user_id = "@bob:test" | ||||
| 
 | ||||
|     def setUp(self): | ||||
|         super().setUp() | ||||
|         self.store = self.hs.get_datastores().main | ||||
| 
 | ||||
|     def test_un_partial_stated_room_unblocks_over_replication(self) -> None: | ||||
|         """ | ||||
|         Tests that, when a room is un-partial-stated on another worker, | ||||
|         pending calls to `await_full_state` get unblocked. | ||||
|         """ | ||||
| 
 | ||||
|         # Make a room. | ||||
|         room_id = self.helper.create_room_as("@bob:test") | ||||
|         # Mark the room as partial-stated. | ||||
|         self.get_success( | ||||
|             self.store.store_partial_state_room(room_id, ["serv1", "serv2"], 0, "serv1") | ||||
|         ) | ||||
| 
 | ||||
|         worker = self.make_worker_hs("synapse.app.generic_worker") | ||||
| 
 | ||||
|         # On the worker, attempt to get the current hosts in the room | ||||
|         d = ensureDeferred( | ||||
|             worker.get_storage_controllers().state.get_current_hosts_in_room(room_id) | ||||
|         ) | ||||
| 
 | ||||
|         self.reactor.advance(0.1) | ||||
| 
 | ||||
|         # This should block | ||||
|         self.assertFalse( | ||||
|             d.called, "get_current_hosts_in_room/await_full_state did not block" | ||||
|         ) | ||||
| 
 | ||||
|         # On the master, clear the partial state flag. | ||||
|         self.get_success(self.store.clear_partial_state_room(room_id)) | ||||
| 
 | ||||
|         self.reactor.advance(0.1) | ||||
| 
 | ||||
|         # The worker should have unblocked | ||||
|         self.assertTrue( | ||||
|             d.called, "get_current_hosts_in_room/await_full_state did not unblock" | ||||
|         ) | ||||
		Loading…
	
		Reference in New Issue
	
	 reivilibre
						reivilibre