Merge pull request #5738 from matrix-org/erikj/faster_update
Speed up current state background update.pull/5740/head
						commit
						4f6984aa88
					
				|  | @ -0,0 +1 @@ | |||
| Reduce database IO usage by optimising queries for current membership. | ||||
|  | @ -852,22 +852,25 @@ class RoomMemberStore(RoomMemberWorkerStore): | |||
|     @defer.inlineCallbacks | ||||
|     def _background_current_state_membership(self, progress, batch_size): | ||||
|         """Update the new membership column on current_state_events. | ||||
| 
 | ||||
|         This works by iterating over all rooms in alphebetical order. | ||||
|         """ | ||||
| 
 | ||||
|         if "rooms" not in progress: | ||||
|             rooms = yield self._simple_select_onecol( | ||||
|                 table="current_state_events", | ||||
|                 keyvalues={}, | ||||
|                 retcol="DISTINCT room_id", | ||||
|                 desc="_background_current_state_membership_get_rooms", | ||||
|             ) | ||||
|             progress["rooms"] = rooms | ||||
| 
 | ||||
|         rooms = progress["rooms"] | ||||
| 
 | ||||
|         def _background_current_state_membership_txn(txn): | ||||
|         def _background_current_state_membership_txn(txn, last_processed_room): | ||||
|             processed = 0 | ||||
|             while rooms and processed < batch_size: | ||||
|             while processed < batch_size: | ||||
|                 txn.execute( | ||||
|                     """ | ||||
|                         SELECT MIN(room_id) FROM rooms WHERE room_id > ? | ||||
|                     """, | ||||
|                     (last_processed_room,), | ||||
|                 ) | ||||
|                 row = txn.fetchone() | ||||
|                 if not row or not row[0]: | ||||
|                     return processed, True | ||||
| 
 | ||||
|                 next_room, = row | ||||
| 
 | ||||
|                 sql = """ | ||||
|                     UPDATE current_state_events AS c | ||||
|                     SET membership = ( | ||||
|  | @ -876,24 +879,33 @@ class RoomMemberStore(RoomMemberWorkerStore): | |||
|                     ) | ||||
|                     WHERE room_id = ? | ||||
|                 """ | ||||
|                 txn.execute(sql, (rooms.pop(),)) | ||||
|                 txn.execute(sql, (next_room,)) | ||||
|                 processed += txn.rowcount | ||||
| 
 | ||||
|                 last_processed_room = next_room | ||||
| 
 | ||||
|             self._background_update_progress_txn( | ||||
|                 txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress | ||||
|                 txn, | ||||
|                 _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, | ||||
|                 {"last_processed_room": last_processed_room}, | ||||
|             ) | ||||
| 
 | ||||
|             return processed | ||||
|             return processed, False | ||||
| 
 | ||||
|         result = yield self.runInteraction( | ||||
|         # If we haven't got a last processed room then just use the empty | ||||
|         # string, which will compare before all room IDs correctly. | ||||
|         last_processed_room = progress.get("last_processed_room", "") | ||||
| 
 | ||||
|         row_count, finished = yield self.runInteraction( | ||||
|             "_background_current_state_membership_update", | ||||
|             _background_current_state_membership_txn, | ||||
|             last_processed_room, | ||||
|         ) | ||||
| 
 | ||||
|         if not rooms: | ||||
|         if finished: | ||||
|             yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME) | ||||
| 
 | ||||
|         defer.returnValue(result) | ||||
|         defer.returnValue(row_count) | ||||
| 
 | ||||
| 
 | ||||
| class _JoinedHostsCache(object): | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Erik Johnston
						Erik Johnston