From b5192355f6ac11eec4781d73a59b14cfc8732d1f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Apr 2023 16:10:32 +0100 Subject: [PATCH] User directory background update speedup (#15435) c.f. #15264 The two changes are: 1. Add indexes so that the select / deletes don't do sequential scans 2. Don't repeatedly call `SELECT count(*)` each iteration, as that's slow --- changelog.d/15435.misc | 1 + .../storage/databases/main/user_directory.py | 89 +++++++++---------- 2 files changed, 45 insertions(+), 45 deletions(-) create mode 100644 changelog.d/15435.misc diff --git a/changelog.d/15435.misc b/changelog.d/15435.misc new file mode 100644 index 0000000000..e0f591b6d1 --- /dev/null +++ b/changelog.d/15435.misc @@ -0,0 +1 @@ +Speed up the user directory background update. diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 9fced4b997..5d65faed16 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -102,44 +102,34 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): ) -> int: # Get all the rooms that we want to process. def _make_staging_area(txn: LoggingTransaction) -> None: - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)" - ) - txn.execute(sql) - - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_position(position TEXT NOT NULL)" - ) - txn.execute(sql) - - # Get rooms we want to process from the database - sql = """ - SELECT room_id, count(*) FROM current_state_events + sql = f""" + CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_rooms AS + SELECT room_id, count(*) AS events + FROM current_state_events GROUP BY room_id """ txn.execute(sql) - rooms = list(txn.fetchall()) - self.db_pool.simple_insert_many_txn( - txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms + txn.execute( + f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_rm ON {TEMP_TABLE}_rooms (room_id)" + ) + txn.execute( + f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_evs ON {TEMP_TABLE}_rooms (events)" ) - del rooms - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_users(user_id TEXT NOT NULL)" - ) + sql = f""" + CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_position ( + position TEXT NOT NULL + ) + """ txn.execute(sql) - txn.execute("SELECT name FROM users") - users = list(txn.fetchall()) - - self.db_pool.simple_insert_many_txn( - txn, TEMP_TABLE + "_users", keys=("user_id",), values=users + sql = f""" + CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_users AS + SELECT name AS user_id FROM users + """ + txn.execute(sql) + txn.execute( + f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_users_idx ON {TEMP_TABLE}_users (user_id)" ) new_pos = await self.get_max_stream_id_in_current_state_deltas() @@ -222,12 +212,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): if not rooms_to_work_on: return None - # Get how many are left to process, so we can give status on how - # far we are in processing - txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") - result = txn.fetchone() - assert result is not None - progress["remaining"] = result[0] + if "remaining" not in progress: + # Get how many are left to process, so we can give status on how + # far we are in processing + txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") + result = txn.fetchone() + assert result is not None + progress["remaining"] = result[0] return rooms_to_work_on @@ -332,7 +323,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): if processed_event_count > batch_size: # Don't process any more rooms, we've hit our batch size. - return processed_event_count + break + + await self.db_pool.runInteraction( + "populate_user_directory", + self.db_pool.updates._background_update_progress_txn, + "populate_user_directory_process_rooms", + progress, + ) return processed_event_count @@ -356,13 +354,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): users_to_work_on = [x[0] for x in user_result] - # Get how many are left to process, so we can give status on how - # far we are in processing - sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users" - txn.execute(sql) - count_result = txn.fetchone() - assert count_result is not None - progress["remaining"] = count_result[0] + if "remaining" not in progress: + # Get how many are left to process, so we can give status on how + # far we are in processing + sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users" + txn.execute(sql) + count_result = txn.fetchone() + assert count_result is not None + progress["remaining"] = count_result[0] return users_to_work_on