Refactor presence so we can prune user in room caches (#13313)

See #10826 and #10786 for context as to why we had to disable pruning on
those caches.

Now that `get_users_who_share_room_with_user` is called frequently only
for presence, we just need to make calls to it less frequent and then we
can remove the various levels of caching that is going on.
pull/13365/head
Erik Johnston 2022-07-25 10:21:06 +01:00 committed by GitHub
parent 357561c1a2
commit 43adf2521c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 109 additions and 91 deletions

1
changelog.d/13313.misc Normal file
View File

@ -0,0 +1 @@
Change `get_users_in_room` and `get_rooms_for_user` caches to enable pruning of old entries.

View File

@ -34,7 +34,6 @@ from typing import (
Callable, Callable,
Collection, Collection,
Dict, Dict,
FrozenSet,
Generator, Generator,
Iterable, Iterable,
List, List,
@ -42,7 +41,6 @@ from typing import (
Set, Set,
Tuple, Tuple,
Type, Type,
Union,
) )
from prometheus_client import Counter from prometheus_client import Counter
@ -68,7 +66,6 @@ from synapse.storage.databases.main import DataStore
from synapse.streams import EventSource from synapse.streams import EventSource
from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer from synapse.util.wheel_timer import WheelTimer
@ -1656,15 +1653,18 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
# doesn't return. C.f. #5503. # doesn't return. C.f. #5503.
return [], max_token return [], max_token
# Figure out which other users this user should receive updates for # Figure out which other users this user should explicitly receive
users_interested_in = await self._get_interested_in(user, explicit_room_id) # updates for
additional_users_interested_in = (
await self.get_presence_router().get_interested_users(user.to_string())
)
# We have a set of users that we're interested in the presence of. We want to # We have a set of users that we're interested in the presence of. We want to
# cross-reference that with the users that have actually changed their presence. # cross-reference that with the users that have actually changed their presence.
# Check whether this user should see all user updates # Check whether this user should see all user updates
if users_interested_in == PresenceRouter.ALL_USERS: if additional_users_interested_in == PresenceRouter.ALL_USERS:
# Provide presence state for all users # Provide presence state for all users
presence_updates = await self._filter_all_presence_updates_for_user( presence_updates = await self._filter_all_presence_updates_for_user(
user_id, include_offline, from_key user_id, include_offline, from_key
@ -1673,34 +1673,47 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
return presence_updates, max_token return presence_updates, max_token
# Make mypy happy. users_interested_in should now be a set # Make mypy happy. users_interested_in should now be a set
assert not isinstance(users_interested_in, str) assert not isinstance(additional_users_interested_in, str)
# We always care about our own presence.
additional_users_interested_in.add(user_id)
if explicit_room_id:
user_ids = await self.store.get_users_in_room(explicit_room_id)
additional_users_interested_in.update(user_ids)
# The set of users that we're interested in and that have had a presence update. # The set of users that we're interested in and that have had a presence update.
# We'll actually pull the presence updates for these users at the end. # We'll actually pull the presence updates for these users at the end.
interested_and_updated_users: Union[Set[str], FrozenSet[str]] = set() interested_and_updated_users: Collection[str]
if from_key is not None: if from_key is not None:
# First get all users that have had a presence update # First get all users that have had a presence update
updated_users = stream_change_cache.get_all_entities_changed(from_key) updated_users = stream_change_cache.get_all_entities_changed(from_key)
# Cross-reference users we're interested in with those that have had updates. # Cross-reference users we're interested in with those that have had updates.
# Use a slightly-optimised method for processing smaller sets of updates. if updated_users is not None:
if updated_users is not None and len(updated_users) < 500: # If we have the full list of changes for presence we can
# For small deltas, it's quicker to get all changes and then # simply check which ones share a room with the user.
# cross-reference with the users we're interested in
get_updates_counter.labels("stream").inc() get_updates_counter.labels("stream").inc()
for other_user_id in updated_users:
if other_user_id in users_interested_in: sharing_users = await self.store.do_users_share_a_room(
# mypy thinks this variable could be a FrozenSet as it's possibly set user_id, updated_users
# to one in the `get_entities_changed` call below, and `add()` is not )
# method on a FrozenSet. That doesn't affect us here though, as
# `interested_and_updated_users` is clearly a set() above. interested_and_updated_users = (
interested_and_updated_users.add(other_user_id) # type: ignore sharing_users.union(additional_users_interested_in)
).intersection(updated_users)
else: else:
# Too many possible updates. Find all users we can see and check # Too many possible updates. Find all users we can see and check
# if any of them have changed. # if any of them have changed.
get_updates_counter.labels("full").inc() get_updates_counter.labels("full").inc()
users_interested_in = (
await self.store.get_users_who_share_room_with_user(user_id)
)
users_interested_in.update(additional_users_interested_in)
interested_and_updated_users = ( interested_and_updated_users = (
stream_change_cache.get_entities_changed( stream_change_cache.get_entities_changed(
users_interested_in, from_key users_interested_in, from_key
@ -1709,7 +1722,10 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
else: else:
# No from_key has been specified. Return the presence for all users # No from_key has been specified. Return the presence for all users
# this user is interested in # this user is interested in
interested_and_updated_users = users_interested_in interested_and_updated_users = (
await self.store.get_users_who_share_room_with_user(user_id)
)
interested_and_updated_users.update(additional_users_interested_in)
# Retrieve the current presence state for each user # Retrieve the current presence state for each user
users_to_state = await self.get_presence_handler().current_state_for_users( users_to_state = await self.get_presence_handler().current_state_for_users(
@ -1804,62 +1820,6 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
def get_current_key(self) -> int: def get_current_key(self) -> int:
return self.store.get_current_presence_token() return self.store.get_current_presence_token()
@cached(num_args=2, cache_context=True)
async def _get_interested_in(
self,
user: UserID,
explicit_room_id: Optional[str] = None,
cache_context: Optional[_CacheContext] = None,
) -> Union[Set[str], str]:
"""Returns the set of users that the given user should see presence
updates for.
Args:
user: The user to retrieve presence updates for.
explicit_room_id: The users that are in the room will be returned.
Returns:
A set of user IDs to return presence updates for, or "ALL" to return all
known updates.
"""
user_id = user.to_string()
users_interested_in = set()
users_interested_in.add(user_id) # So that we receive our own presence
# cache_context isn't likely to ever be None due to the @cached decorator,
# but we can't have a non-optional argument after the optional argument
# explicit_room_id either. Assert cache_context is not None so we can use it
# without mypy complaining.
assert cache_context
# Check with the presence router whether we should poll additional users for
# their presence information
additional_users = await self.get_presence_router().get_interested_users(
user.to_string()
)
if additional_users == PresenceRouter.ALL_USERS:
# If the module requested that this user see the presence updates of *all*
# users, then simply return that instead of calculating what rooms this
# user shares
return PresenceRouter.ALL_USERS
# Add the additional users from the router
users_interested_in.update(additional_users)
# Find the users who share a room with this user
users_who_share_room = await self.store.get_users_who_share_room_with_user(
user_id, on_invalidate=cache_context.invalidate
)
users_interested_in.update(users_who_share_room)
if explicit_room_id:
user_ids = await self.store.get_users_in_room(
explicit_room_id, on_invalidate=cache_context.invalidate
)
users_interested_in.update(user_ids)
return users_interested_in
def handle_timeouts( def handle_timeouts(
user_states: List[UserPresenceState], user_states: List[UserPresenceState],

View File

@ -80,6 +80,10 @@ class SQLBaseStore(metaclass=ABCMeta):
) )
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
# There's no easy way of invalidating this cache for just the users
# that have changed, so we just clear the entire thing.
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
for user_id in members_changed: for user_id in members_changed:
self._attempt_to_invalidate_cache( self._attempt_to_invalidate_cache(
"get_user_in_room_with_profile", (room_id, user_id) "get_user_in_room_with_profile", (room_id, user_id)

View File

@ -21,6 +21,7 @@ from typing import (
FrozenSet, FrozenSet,
Iterable, Iterable,
List, List,
Mapping,
Optional, Optional,
Set, Set,
Tuple, Tuple,
@ -55,6 +56,7 @@ from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
if TYPE_CHECKING: if TYPE_CHECKING:
@ -183,7 +185,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
self._check_safe_current_state_events_membership_updated_txn, self._check_safe_current_state_events_membership_updated_txn,
) )
@cached(max_entries=100000, iterable=True, prune_unread_entries=False) @cached(max_entries=100000, iterable=True)
async def get_users_in_room(self, room_id: str) -> List[str]: async def get_users_in_room(self, room_id: str) -> List[str]:
return await self.db_pool.runInteraction( return await self.db_pool.runInteraction(
"get_users_in_room", self.get_users_in_room_txn, room_id "get_users_in_room", self.get_users_in_room_txn, room_id
@ -561,7 +563,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return results_dict.get("membership"), results_dict.get("event_id") return results_dict.get("membership"), results_dict.get("event_id")
@cached(max_entries=500000, iterable=True, prune_unread_entries=False) @cached(max_entries=500000, iterable=True)
async def get_rooms_for_user_with_stream_ordering( async def get_rooms_for_user_with_stream_ordering(
self, user_id: str self, user_id: str
) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
@ -732,25 +734,76 @@ class RoomMemberWorkerStore(EventsWorkerStore):
) )
return frozenset(r.room_id for r in rooms) return frozenset(r.room_id for r in rooms)
@cached( @cached(max_entries=10000)
max_entries=500000, async def does_pair_of_users_share_a_room(
cache_context=True, self, user_id: str, other_user_id: str
iterable=True, ) -> bool:
prune_unread_entries=False, raise NotImplementedError()
@cachedList(
cached_method_name="does_pair_of_users_share_a_room", list_name="other_user_ids"
) )
async def get_users_who_share_room_with_user( async def _do_users_share_a_room(
self, user_id: str, cache_context: _CacheContext self, user_id: str, other_user_ids: Collection[str]
) -> Mapping[str, Optional[bool]]:
"""Return mapping from user ID to whether they share a room with the
given user.
Note: `None` and `False` are equivalent and mean they don't share a
room.
"""
def do_users_share_a_room_txn(
txn: LoggingTransaction, user_ids: Collection[str]
) -> Dict[str, bool]:
clause, args = make_in_list_sql_clause(
self.database_engine, "state_key", user_ids
)
# This query works by fetching both the list of rooms for the target
# user and the set of other users, and then checking if there is any
# overlap.
sql = f"""
SELECT b.state_key
FROM (
SELECT room_id FROM current_state_events
WHERE type = 'm.room.member' AND membership = 'join' AND state_key = ?
) AS a
INNER JOIN (
SELECT room_id, state_key FROM current_state_events
WHERE type = 'm.room.member' AND membership = 'join' AND {clause}
) AS b using (room_id)
LIMIT 1
"""
txn.execute(sql, (user_id, *args))
return {u: True for u, in txn}
to_return = {}
for batch_user_ids in batch_iter(other_user_ids, 1000):
res = await self.db_pool.runInteraction(
"do_users_share_a_room", do_users_share_a_room_txn, batch_user_ids
)
to_return.update(res)
return to_return
async def do_users_share_a_room(
self, user_id: str, other_user_ids: Collection[str]
) -> Set[str]: ) -> Set[str]:
"""Return the set of users who share a room with the first users"""
user_dict = await self._do_users_share_a_room(user_id, other_user_ids)
return {u for u, share_room in user_dict.items() if share_room}
async def get_users_who_share_room_with_user(self, user_id: str) -> Set[str]:
"""Returns the set of users who share a room with `user_id`""" """Returns the set of users who share a room with `user_id`"""
room_ids = await self.get_rooms_for_user( room_ids = await self.get_rooms_for_user(user_id)
user_id, on_invalidate=cache_context.invalidate
)
user_who_share_room = set() user_who_share_room = set()
for room_id in room_ids: for room_id in room_ids:
user_ids = await self.get_users_in_room( user_ids = await self.get_users_in_room(room_id)
room_id, on_invalidate=cache_context.invalidate
)
user_who_share_room.update(user_ids) user_who_share_room.update(user_ids)
return user_who_share_room return user_who_share_room