Speed up rebuilding of the user directory for local users (#15529)

The idea here is to batch up the work.
pull/15533/head
Erik Johnston 2023-05-03 14:41:37 +01:00 committed by GitHub
parent 9890f23469
commit fc3a878220
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 171 additions and 76 deletions

1
changelog.d/15529.misc Normal file
View File

@ -0,0 +1 @@
Speed up rebuilding of the user directory for local users.

View File

@ -386,13 +386,20 @@ class LoggingTransaction:
self.executemany(sql, args)
def execute_values(
self, sql: str, values: Iterable[Iterable[Any]], fetch: bool = True
self,
sql: str,
values: Iterable[Iterable[Any]],
template: Optional[str] = None,
fetch: bool = True,
) -> List[Tuple]:
"""Corresponds to psycopg2.extras.execute_values. Only available when
using postgres.
The `fetch` parameter must be set to False if the query does not return
rows (e.g. INSERTs).
The `template` is the snippet that every item in `values` is merged into to
compose the query.
"""
assert isinstance(self.database_engine, PostgresEngine)
from psycopg2.extras import execute_values
@ -400,7 +407,9 @@ class LoggingTransaction:
return self._do_execute(
# TODO: is it safe for values to be Iterable[Iterable[Any]] here?
# https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_values says values should be Sequence[Sequence]
lambda the_sql: execute_values(self.txn, the_sql, values, fetch=fetch),
lambda the_sql: execute_values(
self.txn, the_sql, values, template=template, fetch=fetch
),
sql,
)

View File

@ -27,6 +27,8 @@ from typing import (
cast,
)
import attr
try:
# Figure out if ICU support is available for searching users.
import icu
@ -66,6 +68,19 @@ logger = logging.getLogger(__name__)
TEMP_TABLE = "_temp_populate_user_directory"
@attr.s(frozen=True, auto_attribs=True)
class _UserDirProfile:
    """An immutable record describing one entry to be written into the user
    directory.

    Only `user_id` is required; the display name and avatar URL default to
    None when not supplied.
    """

    user_id: str

    # Both optional fields are passed through `non_null_str_or_none` on
    # construction, so values of an unexpected type are replaced with None.
    display_name: Optional[str] = attr.ib(converter=non_null_str_or_none, default=None)
    avatar_url: Optional[str] = attr.ib(converter=non_null_str_or_none, default=None)
class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
# How many records do we calculate before sending it to
# add_users_who_share_private_rooms?
@ -381,25 +396,65 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
% (len(users_to_work_on), progress["remaining"])
)
for user_id in users_to_work_on:
if await self.should_include_local_user_in_dir(user_id):
profile = await self.get_profileinfo(get_localpart_from_id(user_id)) # type: ignore[attr-defined]
await self.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
# First filter down to users we want to insert into the user directory.
users_to_insert = [
user_id
for user_id in users_to_work_on
if await self.should_include_local_user_in_dir(user_id)
]
# We've finished processing a user. Delete it from the table.
await self.db_pool.simple_delete_one(
TEMP_TABLE + "_users", {"user_id": user_id}
)
# Update the remaining counter.
progress["remaining"] -= 1
await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_users",
progress,
# Next fetch their profiles. Note that the `user_id` here is the
# *localpart*, and that not all users have profiles.
profile_rows = await self.db_pool.simple_select_many_batch(
table="profiles",
column="user_id",
iterable=[get_localpart_from_id(u) for u in users_to_insert],
retcols=(
"user_id",
"displayname",
"avatar_url",
),
keyvalues={},
desc="populate_user_directory_process_users_get_profiles",
)
profiles = {
f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
f"@{row['user_id']}:{self.server_name}",
row["displayname"],
row["avatar_url"],
)
for row in profile_rows
}
profiles_to_insert = [
profiles.get(user_id) or _UserDirProfile(user_id)
for user_id in users_to_insert
]
# Actually insert the users with their profiles into the directory.
await self.db_pool.runInteraction(
"populate_user_directory_process_users_insertion",
self._update_profiles_in_user_dir_txn,
profiles_to_insert,
)
# We've finished processing the users. Delete them from the table.
await self.db_pool.simple_delete_many(
table=TEMP_TABLE + "_users",
column="user_id",
iterable=users_to_work_on,
keyvalues={},
desc="populate_user_directory_process_users_delete",
)
# Update the remaining counter.
progress["remaining"] -= len(users_to_work_on)
await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_users",
progress,
)
return len(users_to_work_on)
@ -584,72 +639,102 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
Update or add a user's profile in the user directory.
If the user is remote, the profile will be marked as not stale.
"""
# If the display name or avatar URL are unexpected types, replace with None.
display_name = non_null_str_or_none(display_name)
avatar_url = non_null_str_or_none(avatar_url)
await self.db_pool.runInteraction(
"update_profiles_in_user_dir",
self._update_profiles_in_user_dir_txn,
[_UserDirProfile(user_id, display_name, avatar_url)],
)
def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
self.db_pool.simple_upsert_txn(
def _update_profiles_in_user_dir_txn(
self,
txn: LoggingTransaction,
profiles: Sequence[_UserDirProfile],
) -> None:
self.db_pool.simple_upsert_many_txn(
txn,
table="user_directory",
key_names=("user_id",),
key_values=[(p.user_id,) for p in profiles],
value_names=("display_name", "avatar_url"),
value_values=[
(
p.display_name,
p.avatar_url,
)
for p in profiles
],
)
# Remote users: Make sure the profile is not marked as stale anymore.
remote_users = [
p.user_id for p in profiles if not self.hs.is_mine_id(p.user_id)
]
if remote_users:
self.db_pool.simple_delete_many_txn(
txn,
table="user_directory",
keyvalues={"user_id": user_id},
values={"display_name": display_name, "avatar_url": avatar_url},
table="user_directory_stale_remote_users",
column="user_id",
values=remote_users,
keyvalues={},
)
if not self.hs.is_mine_id(user_id):
# Remote users: Make sure the profile is not marked as stale anymore.
self.db_pool.simple_delete_txn(
txn,
table="user_directory_stale_remote_users",
keyvalues={"user_id": user_id},
if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
template = """
(
%s,
setweight(to_tsvector('simple', %s), 'A')
|| setweight(to_tsvector('simple', %s), 'D')
|| setweight(to_tsvector('simple', COALESCE(%s, '')), 'B')
)
"""
# The display name that goes into the database index.
index_display_name = display_name
if index_display_name is not None:
index_display_name = _filter_text_for_index(index_display_name)
if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('simple', ?), 'A')
|| setweight(to_tsvector('simple', ?), 'D')
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute(
sql,
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES ? ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute_values(
sql,
[
(
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
index_display_name,
),
)
elif isinstance(self.database_engine, Sqlite3Engine):
value = (
"%s %s" % (user_id, index_display_name)
if index_display_name
else user_id
)
self.db_pool.simple_upsert_txn(
txn,
table="user_directory_search",
keyvalues={"user_id": user_id},
values={"value": value},
)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
p.user_id,
get_localpart_from_id(p.user_id),
get_domain_from_id(p.user_id),
_filter_text_for_index(p.display_name)
if p.display_name
else None,
)
for p in profiles
],
template=template,
fetch=False,
)
elif isinstance(self.database_engine, Sqlite3Engine):
values = []
for p in profiles:
if p.display_name is not None:
index_display_name = _filter_text_for_index(p.display_name)
value = f"{p.user_id} {index_display_name}"
else:
value = p.user_id
txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
values.append((value,))
await self.db_pool.runInteraction(
"update_profile_in_user_dir", _update_profile_in_user_dir_txn
)
self.db_pool.simple_upsert_many_txn(
txn,
table="user_directory_search",
key_names=("user_id",),
key_values=[(p.user_id,) for p in profiles],
value_names=("value",),
value_values=values,
)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
for p in profiles:
txn.call_after(self.get_user_in_directory.invalidate, (p.user_id,))
async def add_users_who_share_private_room(
self, room_id: str, user_id_tuples: Iterable[Tuple[str, str]]