From b2a629ef498df2b0585c9474613dda778bec7be0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Jul 2019 09:49:26 +0100 Subject: [PATCH] Speed up current state background update. Turns out that storing huge JSON arrays in the progress JSON isn't something that postgres particularly likes. --- synapse/storage/roommember.py | 50 ++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 257bcdb2f8..b3c002b9eb 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -852,22 +852,25 @@ class RoomMemberStore(RoomMemberWorkerStore): @defer.inlineCallbacks def _background_current_state_membership(self, progress, batch_size): """Update the new membership column on current_state_events. + + This works by iterating over all rooms in alphebetical order. """ - if "rooms" not in progress: - rooms = yield self._simple_select_onecol( - table="current_state_events", - keyvalues={}, - retcol="DISTINCT room_id", - desc="_background_current_state_membership_get_rooms", - ) - progress["rooms"] = rooms - - rooms = progress["rooms"] - - def _background_current_state_membership_txn(txn): + def _background_current_state_membership_txn(txn, last_processed_room): processed = 0 - while rooms and processed < batch_size: + while processed < batch_size: + txn.execute( + """ + SELECT MIN(room_id) FROM rooms WHERE room_id > ? + """, + (last_processed_room,), + ) + row = txn.fetchone() + if not row or not row[0]: + return processed, True + + next_room, = row + sql = """ UPDATE current_state_events AS c SET membership = ( @@ -876,24 +879,33 @@ class RoomMemberStore(RoomMemberWorkerStore): ) WHERE room_id = ? """ - txn.execute(sql, (rooms.pop(),)) + txn.execute(sql, (next_room,)) processed += txn.rowcount + last_processed_room = next_room + self._background_update_progress_txn( - txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress + txn, + _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, + {"last_processed_room": last_processed_room}, ) - return processed + return processed, False - result = yield self.runInteraction( + # If we haven't got a last processed room then just use the empty + # string, which will compare before all room IDs correctly. + last_processed_room = progress.get("last_processed_room", "") + + row_count, finished = yield self.runInteraction( "_background_current_state_membership_update", _background_current_state_membership_txn, + last_processed_room, ) - if not rooms: + if finished: yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME) - defer.returnValue(result) + defer.returnValue(row_count) class _JoinedHostsCache(object):