User directory background update speedup (#15435)

c.f. #15264

The two changes are:
1. Add indexes so that the select / deletes don't do sequential scans
2. Don't repeatedly call `SELECT count(*)` each iteration, as that's slow
pull/15441/head
Erik Johnston 2023-04-14 16:10:32 +01:00 committed by GitHub
parent dabbb94faf
commit b5192355f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 45 deletions

1
changelog.d/15435.misc Normal file
View File

@ -0,0 +1 @@
Speed up the user directory background update.

View File

@ -102,44 +102,34 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
) -> int:
# Get all the rooms that we want to process.
def _make_staging_area(txn: LoggingTransaction) -> None:
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
)
txn.execute(sql)
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_position(position TEXT NOT NULL)"
)
txn.execute(sql)
# Get rooms we want to process from the database
sql = """
SELECT room_id, count(*) FROM current_state_events
sql = f"""
CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_rooms AS
SELECT room_id, count(*) AS events
FROM current_state_events
GROUP BY room_id
"""
txn.execute(sql)
rooms = list(txn.fetchall())
self.db_pool.simple_insert_many_txn(
txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms
txn.execute(
f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_rm ON {TEMP_TABLE}_rooms (room_id)"
)
txn.execute(
f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_evs ON {TEMP_TABLE}_rooms (events)"
)
del rooms
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_users(user_id TEXT NOT NULL)"
)
sql = f"""
CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_position (
position TEXT NOT NULL
)
"""
txn.execute(sql)
txn.execute("SELECT name FROM users")
users = list(txn.fetchall())
self.db_pool.simple_insert_many_txn(
txn, TEMP_TABLE + "_users", keys=("user_id",), values=users
sql = f"""
CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_users AS
SELECT name AS user_id FROM users
"""
txn.execute(sql)
txn.execute(
f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_users_idx ON {TEMP_TABLE}_users (user_id)"
)
new_pos = await self.get_max_stream_id_in_current_state_deltas()
@ -222,12 +212,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
if not rooms_to_work_on:
return None
# Get how many are left to process, so we can give status on how
# far we are in processing
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
result = txn.fetchone()
assert result is not None
progress["remaining"] = result[0]
if "remaining" not in progress:
# Get how many are left to process, so we can give status on how
# far we are in processing
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
result = txn.fetchone()
assert result is not None
progress["remaining"] = result[0]
return rooms_to_work_on
@ -332,7 +323,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
if processed_event_count > batch_size:
# Don't process any more rooms, we've hit our batch size.
return processed_event_count
break
await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_rooms",
progress,
)
return processed_event_count
@ -356,13 +354,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
users_to_work_on = [x[0] for x in user_result]
# Get how many are left to process, so we can give status on how
# far we are in processing
sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
txn.execute(sql)
count_result = txn.fetchone()
assert count_result is not None
progress["remaining"] = count_result[0]
if "remaining" not in progress:
# Get how many are left to process, so we can give status on how
# far we are in processing
sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
txn.execute(sql)
count_result = txn.fetchone()
assert count_result is not None
progress["remaining"] = count_result[0]
return users_to_work_on