Use _invalidate_cache_and_stream_bulk in more places. (#16616)

This takes advantage of the new bulk method in more places to
invalidate caches for many keys at once (and then to stream that
over replication).
pull/16622/head
Patrick Cloke 2023-11-09 14:40:30 -05:00 committed by GitHub
parent ab3f1b3b53
commit 9f514dd0fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 72 additions and 47 deletions

View File

@ -1 +1 @@
Improve the performance of claiming encryption keys in multi-worker deployments.
Improve the performance of some operations in multi-worker deployments.

View File

@ -0,0 +1 @@
Improve the performance of some operations in multi-worker deployments.

View File

@ -747,8 +747,16 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
)
# Invalidate the cache for any ignored users which were added or removed.
for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
self._invalidate_cache_and_stream_bulk(
txn,
self.ignored_by,
[
(ignored_user_id,)
for ignored_user_id in (
previously_ignored_users ^ currently_ignored_users
)
],
)
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
async def remove_account_data_for_user(
@ -824,10 +832,14 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
)
# Invalidate the cache for ignored users which were removed.
for ignored_user_id in previously_ignored_users:
self._invalidate_cache_and_stream(
txn, self.ignored_by, (ignored_user_id,)
)
self._invalidate_cache_and_stream_bulk(
txn,
self.ignored_by,
[
(ignored_user_id,)
for ignored_user_id in previously_ignored_users
],
)
# Invalidate for this user the cache tracking ignored users.
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))

View File

@ -1222,14 +1222,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
# Iterate the parent IDs and invalidate caches.
for parent_id in {r[1] for r in relations_to_insert}:
cache_tuple = (parent_id,)
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined]
)
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined]
)
cache_tuples = {(r[1],) for r in relations_to_insert}
self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
txn, self.get_relations_for_event, cache_tuples # type: ignore[attr-defined]
)
self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
txn, self.get_thread_summary, cache_tuples # type: ignore[attr-defined]
)
if results:
latest_event_id = results[-1][0]

View File

@ -107,13 +107,16 @@ class KeyStore(CacheInvalidationWorkerStore):
# invalidate takes a tuple corresponding to the params of
# _get_server_keys_json. _get_server_keys_json only takes one
# param, which is itself the 2-tuple (server_name, key_id).
for key_id in verify_keys:
self._invalidate_cache_and_stream(
txn, self._get_server_keys_json, ((server_name, key_id),)
)
self._invalidate_cache_and_stream(
txn, self.get_server_key_json_for_remote, (server_name, key_id)
)
self._invalidate_cache_and_stream_bulk(
txn,
self._get_server_keys_json,
[((server_name, key_id),) for key_id in verify_keys],
)
self._invalidate_cache_and_stream_bulk(
txn,
self.get_server_key_json_for_remote,
[(server_name, key_id) for key_id in verify_keys],
)
await self.db_pool.runInteraction(
"store_server_keys_response", store_server_keys_response_txn

View File

@ -363,10 +363,11 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
# for their user ID.
value_values=[(presence_stream_id,) for _ in user_ids],
)
for user_id in user_ids:
self._invalidate_cache_and_stream(
txn, self._get_full_presence_stream_token_for_user, (user_id,)
)
self._invalidate_cache_and_stream_bulk(
txn,
self._get_full_presence_stream_token_for_user,
[(user_id,) for user_id in user_ids],
)
return await self.db_pool.runInteraction(
"add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to

View File

@ -295,19 +295,28 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# so make sure to keep this actually last.
txn.execute("DROP TABLE events_to_purge")
for event_id, should_delete in event_rows:
self._invalidate_cache_and_stream(
txn, self._get_state_group_for_event, (event_id,)
)
self._invalidate_cache_and_stream_bulk(
txn,
self._get_state_group_for_event,
[(event_id,) for event_id, _ in event_rows],
)
# XXX: This is racy, since have_seen_events could be called between the
# transaction completing and the invalidation running. On the other hand,
# that's no different to calling `have_seen_events` just before the
# event is deleted from the database.
# XXX: This is racy, since have_seen_events could be called between the
# transaction completing and the invalidation running. On the other hand,
# that's no different to calling `have_seen_events` just before the
# event is deleted from the database.
self._invalidate_cache_and_stream_bulk(
txn,
self.have_seen_event,
[
(room_id, event_id)
for event_id, should_delete in event_rows
if should_delete
],
)
for event_id, should_delete in event_rows:
if should_delete:
self._invalidate_cache_and_stream(
txn, self.have_seen_event, (room_id, event_id)
)
self.invalidate_get_event_cache_after_txn(txn, event_id)
logger.info("[purge] done")

View File

@ -561,16 +561,15 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
updatevalues={"shadow_banned": shadow_banned},
)
# In order for this to apply immediately, clear the cache for this user.
tokens = self.db_pool.simple_select_onecol_txn(
tokens = self.db_pool.simple_select_list_txn(
txn,
table="access_tokens",
keyvalues={"user_id": user_id},
retcol="token",
retcols=("token",),
)
self._invalidate_cache_and_stream_bulk(
txn, self.get_user_by_access_token, tokens
)
for token in tokens:
self._invalidate_cache_and_stream(
txn, self.get_user_by_access_token, (token,)
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
@ -2683,10 +2682,11 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
)
tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
for token, _, _ in tokens_and_devices:
self._invalidate_cache_and_stream(
txn, self.get_user_by_access_token, (token,)
)
self._invalidate_cache_and_stream_bulk(
txn,
self.get_user_by_access_token,
[(token,) for token, _, _ in tokens_and_devices],
)
txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values)