Faster joins: omit partial rooms from eager syncs until the resync completes (#14870)
* Allow `AbstractSet` in `StrCollection`

  Or else frozensets are excluded. This will be useful in an upcoming commit
  where I plan to change a function that accepts `List[str]` to accept
  `StrCollection` instead.

* `rooms_to_exclude` -> `rooms_to_exclude_globally`

  I am about to make use of this exclusion mechanism to exclude rooms for a
  specific user and a specific sync. This rename helps to clarify the
  distinction between the global config and the rooms to exclude for a
  specific sync.

* Better function names for internal sync methods

* Track a list of excluded rooms on SyncResultBuilder

  I plan to feed in a list of partially stated rooms for this sync to ignore.

* Exclude partial state rooms during eager sync

  Using the mechanism established in the previous commit.

* Track the un-partial-state stream in sync tokens

  So that we can work out which rooms have become fully stated during a given
  sync period.

* Fix mutation of `@cached` return value

  This was fouling up a complement test added alongside this PR. Excluding a
  room would mean the set of forgotten rooms in the cache would be extended,
  so that room could be erroneously considered forgotten in the future.
  Introduced in #12310, Synapse 1.57.0. I don't think this had any
  user-visible side effects (until now).

* SyncResultBuilder: track rooms to force as newly joined

  Similar plan as before. We've omitted rooms from certain sync responses;
  now we establish the mechanism to reintroduce them into future syncs.

* Read the new field, to present rooms as newly joined

* Force un-partial-stated rooms to be newly joined for eager incremental
  syncs only, provided they're still fully stated

* Notify user stream listeners to wake up long-polling syncs

* Changelog

* Typo fix

  Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Unnecessary list cast

  Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Rephrase comment

  Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Another comment

  Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Fixup merge(?)

* Poke notifier when receiving un-partial-stated msg over replication

* Fixup merge whoops

  Thanks MV :)

Co-authored-by: Mathieu Velten <mathieuv@matrix.org>
Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>
parent 5e75771ece
commit 80d44060c9
@@ -0,0 +1 @@
+Faster joins: allow non-lazy-loading ("eager") syncs to complete after a partial join by omitting partial state rooms until they become fully stated.
@@ -1868,22 +1868,17 @@ class FederationHandler:
         async with self._is_partial_state_room_linearizer.queue(room_id):
             logger.info("Clearing partial-state flag for %s", room_id)
-            success = await self.store.clear_partial_state_room(room_id)
+            new_stream_id = await self.store.clear_partial_state_room(room_id)
 
-            # Poke the notifier so that other workers see the write to
-            # the un-partial-stated rooms stream.
-            self._notifier.notify_replication()
-
-        if success:
-            logger.info("State resync complete for %s", room_id)
-            self._storage_controllers.state.notify_room_un_partial_stated(
-                room_id
-            )
+            if new_stream_id is not None:
+                logger.info("State resync complete for %s", room_id)
+                self._storage_controllers.state.notify_room_un_partial_stated(
+                    room_id
+                )
 
-            # Poke the notifier so that other workers see the write to
-            # the un-partial-stated rooms stream.
-            self._notifier.notify_replication()
+                await self._notifier.on_un_partial_stated_room(
+                    room_id, new_stream_id
+                )
 
-            return
+                return
 
         # we raced against more events arriving with partial state. Go round
@@ -290,7 +290,7 @@ class SyncHandler:
             expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
         )
 
-        self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync
+        self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
     async def wait_for_sync_for_user(
         self,
@@ -1340,7 +1340,10 @@ class SyncHandler:
         membership_change_events = []
         if since_token:
             membership_change_events = await self.store.get_membership_changes_for_user(
-                user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
+                user_id,
+                since_token.room_key,
+                now_token.room_key,
+                self.rooms_to_exclude_globally,
             )
 
         mem_last_change_by_room_id: Dict[str, EventBase] = {}
@@ -1375,12 +1378,39 @@
             else:
                 mutable_joined_room_ids.discard(room_id)
 
+        # Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
+        mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
+        if not sync_config.filter_collection.lazy_load_members():
+            # Non-lazy syncs should never include partially stated rooms.
+            # Exclude all partially stated rooms from this sync.
+            for room_id in mutable_joined_room_ids:
+                if await self.store.is_partial_state_room(room_id):
+                    mutable_rooms_to_exclude.add(room_id)
+
+        # Incremental eager syncs should additionally include rooms that
+        # - we are joined to
+        # - are full-stated
+        # - became fully-stated at some point during the sync period
+        # (These rooms will have been omitted during a previous eager sync.)
+        forced_newly_joined_room_ids = set()
+        if since_token and not sync_config.filter_collection.lazy_load_members():
+            un_partial_stated_rooms = (
+                await self.store.get_un_partial_stated_rooms_between(
+                    since_token.un_partial_stated_rooms_key,
+                    now_token.un_partial_stated_rooms_key,
+                    mutable_joined_room_ids,
+                )
+            )
+            for room_id in un_partial_stated_rooms:
+                if not await self.store.is_partial_state_room(room_id):
+                    forced_newly_joined_room_ids.add(room_id)
+
         # Now we have our list of joined room IDs, exclude as configured and freeze
         joined_room_ids = frozenset(
             (
                 room_id
                 for room_id in mutable_joined_room_ids
-                if room_id not in self.rooms_to_exclude
+                if room_id not in mutable_rooms_to_exclude
             )
         )
 
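For illustration, here is a distilled sketch of the bookkeeping the two new blocks above perform for an eager sync, using plain sets and made-up room IDs (the real code consults the store, the sync filter, and `rooms_to_exclude_globally` instead):

```python
# Hypothetical data: one fully stated room, one still-partial room, and one
# room whose resync finished during this sync period.
joined = {"!full:hs1", "!partial:hs1", "!done:hs1"}
partial = {"!partial:hs1"}                    # still resyncing
un_partial_stated_in_window = {"!done:hs1"}   # finished during the window

excluded = set()                              # seeded from the global config
excluded |= {room for room in joined if room in partial}

# Rooms that finished resyncing are forced to appear as newly joined, but
# only if they are still fully stated now.
forced_newly_joined = {
    room for room in un_partial_stated_in_window if room not in partial
}

visible = joined - excluded
assert visible == {"!full:hs1", "!done:hs1"}
assert forced_newly_joined == {"!done:hs1"}
```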
@@ -1397,6 +1427,8 @@
             since_token=since_token,
             now_token=now_token,
             joined_room_ids=joined_room_ids,
+            excluded_room_ids=frozenset(mutable_rooms_to_exclude),
+            forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
             membership_change_events=membership_change_events,
         )
 
@@ -1834,14 +1866,16 @@
         # 3. Work out which rooms need reporting in the sync response.
         ignored_users = await self.store.ignored_users(user_id)
         if since_token:
-            room_changes = await self._get_rooms_changed(
+            room_changes = await self._get_room_changes_for_incremental_sync(
                 sync_result_builder, ignored_users
             )
             tags_by_room = await self.store.get_updated_tags(
                 user_id, since_token.account_data_key
             )
         else:
-            room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
+            room_changes = await self._get_room_changes_for_initial_sync(
+                sync_result_builder, ignored_users
+            )
             tags_by_room = await self.store.get_tags_for_user(user_id)
 
         log_kv({"rooms_changed": len(room_changes.room_entries)})
@@ -1900,7 +1934,7 @@
 
         assert since_token
 
-        if membership_change_events:
+        if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
             return True
 
         stream_id = since_token.room_key.stream
@@ -1909,7 +1943,7 @@
                 return True
         return False
 
-    async def _get_rooms_changed(
+    async def _get_room_changes_for_incremental_sync(
         self,
         sync_result_builder: "SyncResultBuilder",
         ignored_users: FrozenSet[str],
@@ -1947,7 +1981,9 @@
         for event in membership_change_events:
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
 
-        newly_joined_rooms: List[str] = []
+        newly_joined_rooms: List[str] = list(
+            sync_result_builder.forced_newly_joined_room_ids
+        )
         newly_left_rooms: List[str] = []
         room_entries: List[RoomSyncResultBuilder] = []
         invited: List[InvitedSyncResult] = []
@@ -2153,7 +2189,7 @@
             newly_left_rooms,
         )
 
-    async def _get_all_rooms(
+    async def _get_room_changes_for_initial_sync(
         self,
         sync_result_builder: "SyncResultBuilder",
         ignored_users: FrozenSet[str],
@@ -2178,7 +2214,7 @@
         room_list = await self.store.get_rooms_for_local_user_where_membership_is(
             user_id=user_id,
             membership_list=Membership.LIST,
-            excluded_rooms=self.rooms_to_exclude,
+            excluded_rooms=sync_result_builder.excluded_room_ids,
         )
 
         room_entries = []
@@ -2549,6 +2585,13 @@ class SyncResultBuilder:
         since_token: The token supplied by user, or None.
         now_token: The token to sync up to.
         joined_room_ids: List of rooms the user is joined to
+        excluded_room_ids: Set of room ids we should omit from the /sync response.
+        forced_newly_joined_room_ids:
+            Rooms that should be presented in the /sync response as if they were
+            newly joined during the sync period, even if that's not the case.
+            (This is useful if the room was previously excluded from a /sync response,
+            and now the client should be made aware of it.)
+            Only used by incremental syncs.
 
         # The following mirror the fields in a sync response
         presence
@@ -2565,6 +2608,8 @@ class SyncResultBuilder:
     since_token: Optional[StreamToken]
     now_token: StreamToken
     joined_room_ids: FrozenSet[str]
+    excluded_room_ids: FrozenSet[str]
+    forced_newly_joined_room_ids: FrozenSet[str]
     membership_change_events: List[EventBase]
 
     presence: List[UserPresenceState] = attr.Factory(list)
@@ -314,6 +314,32 @@ class Notifier:
             event_entries.append((entry, event.event_id))
         await self.notify_new_room_events(event_entries, max_room_stream_token)
 
+    async def on_un_partial_stated_room(
+        self,
+        room_id: str,
+        new_token: int,
+    ) -> None:
+        """Used by the resync background processes to wake up all listeners
+        of this room when it is un-partial-stated.
+
+        It will also notify replication listeners of the change in stream.
+        """
+
+        # Wake up all related user stream notifiers
+        user_streams = self.room_to_user_streams.get(room_id, set())
+        time_now_ms = self.clock.time_msec()
+        for user_stream in user_streams:
+            try:
+                user_stream.notify(
+                    StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
+                )
+            except Exception:
+                logger.exception("Failed to notify listener")
+
+        # Poke the replication so that other workers also see the write to
+        # the un-partial-stated rooms stream.
+        self.notify_replication()
+
     async def notify_new_room_events(
         self,
         event_entries: List[Tuple[_PendingRoomEventEntry, str]],
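For context, here is a rough sketch of the listener side that `on_un_partial_stated_room` wakes up. The class below is a stand-in for illustration only, not Synapse's internal per-user stream; it shows the notify-and-wake pattern that lets a long-polling `/sync` request complete:

```python
import asyncio

class UserStreamSketch:
    """Stand-in for a per-user notifier stream (illustration only)."""

    def __init__(self) -> None:
        self.positions: dict[str, int] = {}
        self._wakeup = asyncio.Event()

    def notify(self, stream_key: str, new_token: int, time_now_ms: int) -> None:
        # Record the new stream position and wake any waiting /sync request.
        self.positions[stream_key] = new_token
        self._wakeup.set()

    async def wait_for_events(self) -> dict[str, int]:
        # A long-polling /sync parks here until notify() fires, then re-runs
        # the sync computation against the new positions.
        await self._wakeup.wait()
        return self.positions
```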
@@ -260,6 +260,7 @@ class ReplicationDataHandler:
                 self._state_storage_controller.notify_room_un_partial_stated(
                     row.room_id
                 )
+                await self.notifier.on_un_partial_stated_room(row.room_id, token)
         elif stream_name == UnPartialStatedEventStream.NAME:
             for row in rows:
                 assert isinstance(row, UnPartialStatedEventStreamRow)
@@ -292,6 +292,7 @@ class RelationsWorkerStore(SQLBaseStore):
             to_device_key=0,
             device_list_key=0,
             groups_key=0,
+            un_partial_stated_rooms_key=0,
         )
 
         return events[:limit], next_token
@@ -26,6 +26,7 @@ from typing import (
     Mapping,
     Optional,
     Sequence,
+    Set,
     Tuple,
     Union,
     cast,
@@ -1294,10 +1295,44 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             instance_name
         )
 
+    async def get_un_partial_stated_rooms_between(
+        self, last_id: int, current_id: int, room_ids: Collection[str]
+    ) -> Set[str]:
+        """Get all rooms that got un partial stated between `last_id` exclusive and
+        `current_id` inclusive.
+
+        Returns:
+            The list of room ids.
+        """
+
+        if last_id == current_id:
+            return set()
+
+        def _get_un_partial_stated_rooms_between_txn(
+            txn: LoggingTransaction,
+        ) -> Set[str]:
+            sql = """
+                SELECT DISTINCT room_id FROM un_partial_stated_room_stream
+                WHERE ? < stream_id AND stream_id <= ? AND
+            """
+
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", room_ids
+            )
+
+            txn.execute(sql + clause, [last_id, current_id] + args)
+
+            return {r[0] for r in txn}
+
+        return await self.db_pool.runInteraction(
+            "get_un_partial_stated_rooms_between",
+            _get_un_partial_stated_rooms_between_txn,
+        )
+
     async def get_un_partial_stated_rooms_from_stream(
         self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
-        """Get updates for caches replication stream.
+        """Get updates for un partial stated rooms replication stream.
 
         Args:
             instance_name: The writer we want to fetch updates from. Unused
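Note that the `sql` string above deliberately ends in `AND` so the clause produced by `make_in_list_sql_clause` can be appended. A simplified stand-in (the real helper also handles engines with native array support) shows the shape of the query that `txn.execute` ultimately receives:

```python
from typing import Collection, List, Tuple

def make_in_list_sql_clause_sketch(
    column: str, values: Collection[str]
) -> Tuple[str, List[str]]:
    # Simplified stand-in: build a parameterised IN clause.
    placeholders = ", ".join("?" for _ in values)
    return f"{column} IN ({placeholders})", list(values)

clause, args = make_in_list_sql_clause_sketch("room_id", ["!a:hs1", "!b:hs1"])
sql = (
    "SELECT DISTINCT room_id FROM un_partial_stated_room_stream "
    "WHERE ? < stream_id AND stream_id <= ? AND " + clause
)
last_id, current_id = 10, 42
params = [last_id, current_id] + args  # what txn.execute(sql, params) receives
```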
@@ -2304,16 +2339,16 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             (room_id,),
         )
 
-    async def clear_partial_state_room(self, room_id: str) -> bool:
+    async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
         """Clears the partial state flag for a room.
 
         Args:
             room_id: The room whose partial state flag is to be cleared.
 
         Returns:
-            `True` if the partial state flag has been cleared successfully.
+            The corresponding stream id for the un-partial-stated rooms stream.
 
-            `False` if the partial state flag could not be cleared because the room
+            `None` if the partial state flag could not be cleared because the room
             still contains events with partial state.
         """
         try:
@@ -2324,7 +2359,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
                 room_id,
                 un_partial_state_room_stream_id,
             )
-            return True
+            return un_partial_state_room_stream_id
         except self.db_pool.engine.module.IntegrityError as e:
             # Assume that any `IntegrityError`s are due to partial state events.
             logger.info(
@@ -2332,7 +2367,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
                 room_id,
                 e,
             )
-            return False
+            return None
 
     def _clear_partial_state_room_txn(
         self,
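Taken together with the federation handler hunk earlier, the new return type lets one call both signal success and carry the stream position. A hedged sketch of the calling convention (`store` and `notifier` stand in for the real objects):

```python
from typing import Optional

async def resync_step_sketch(store, notifier, room_id: str) -> bool:
    new_stream_id: Optional[int] = await store.clear_partial_state_room(room_id)
    if new_stream_id is None:
        # Events with partial state remain; the resync loop goes round again.
        return False
    # Success: tell local listeners and other workers about the new position.
    await notifier.on_un_partial_stated_room(room_id, new_stream_id)
    return True
```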
@@ -15,6 +15,7 @@
 import logging
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Collection,
     Dict,
     FrozenSet,
@@ -47,7 +48,13 @@ from synapse.storage.roommember import (
     ProfileInfo,
     RoomsForUser,
 )
-from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain_from_id
+from synapse.types import (
+    JsonDict,
+    PersistedEventPosition,
+    StateMap,
+    StrCollection,
+    get_domain_from_id,
+)
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
@@ -385,7 +392,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         self,
         user_id: str,
         membership_list: Collection[str],
-        excluded_rooms: Optional[List[str]] = None,
+        excluded_rooms: StrCollection = (),
     ) -> List[RoomsForUser]:
         """Get all the rooms for this *local* user where the membership for this user
         matches one in the membership list.
@@ -412,10 +419,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         )
 
         # Now we filter out forgotten and excluded rooms
-        rooms_to_exclude: Set[str] = await self.get_forgotten_rooms_for_user(user_id)
+        rooms_to_exclude = await self.get_forgotten_rooms_for_user(user_id)
 
         if excluded_rooms is not None:
-            rooms_to_exclude.update(set(excluded_rooms))
+            # Take a copy to avoid mutating the in-cache set
+            rooms_to_exclude = set(rooms_to_exclude)
+            rooms_to_exclude.update(excluded_rooms)
 
         return [room for room in rooms if room.room_id not in rooms_to_exclude]
 
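This is the `@cached` mutation fix called out in the commit message. A minimal sketch with a plain dict cache (not Synapse's `@cached` descriptor) of why the copy matters: the cache hands back the same set object every time, so updating it in place would make the excluded room look forgotten on every later call.

```python
# Minimal sketch of the bug: mutating a cached return value poisons the cache.
_cache: dict = {}

def get_forgotten_rooms_for_user(user_id: str) -> set:
    if user_id not in _cache:
        _cache[user_id] = {"!forgotten:hs1"}  # pretend this came from the DB
    return _cache[user_id]

rooms_to_exclude = get_forgotten_rooms_for_user("@alice:hs1")
rooms_to_exclude = set(rooms_to_exclude)    # take a copy, as the diff now does
rooms_to_exclude.update({"!excluded:hs1"})  # safe: the cached set is untouched
assert "!excluded:hs1" not in get_forgotten_rooms_for_user("@alice:hs1")
```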
@@ -1169,7 +1178,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         return count == 0
 
     @cached()
-    async def get_forgotten_rooms_for_user(self, user_id: str) -> Set[str]:
+    async def get_forgotten_rooms_for_user(self, user_id: str) -> AbstractSet[str]:
         """Gets all rooms the user has forgotten.
 
         Args:
@@ -53,11 +53,15 @@ class EventSources:
             *(attribute.type(hs) for attribute in attr.fields(_EventSourcesInner))
         )
         self.store = hs.get_datastores().main
+        self._instance_name = hs.get_instance_name()
 
     def get_current_token(self) -> StreamToken:
         push_rules_key = self.store.get_max_push_rules_stream_id()
         to_device_key = self.store.get_to_device_stream_token()
         device_list_key = self.store.get_device_stream_token()
+        un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token(
+            self._instance_name
+        )
 
         token = StreamToken(
             room_key=self.sources.room.get_current_key(),
@@ -70,6 +74,7 @@ class EventSources:
             device_list_key=device_list_key,
             # Groups key is unused.
             groups_key=0,
+            un_partial_stated_rooms_key=un_partial_stated_rooms_key,
         )
         return token
 
@@ -107,5 +112,6 @@ class EventSources:
             to_device_key=0,
             device_list_key=0,
             groups_key=0,
+            un_partial_stated_rooms_key=0,
         )
         return token
@@ -17,6 +17,7 @@ import re
 import string
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Any,
     ClassVar,
     Dict,
@@ -79,7 +80,7 @@ JsonSerializable = object
 
 # Collection[str] that does not include str itself; str being a Sequence[str]
 # is very misleading and results in bugs.
-StrCollection = Union[Tuple[str, ...], List[str], Set[str]]
+StrCollection = Union[Tuple[str, ...], List[str], AbstractSet[str]]
 
 
 # Note that this seems to require inheriting *directly* from Interface in order
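Why the `AbstractSet` change matters: `frozenset` is not a subtype of `typing.Set` (which means the mutable built-in `set`), but it is a subtype of `typing.AbstractSet`. A small sketch of the difference as a type checker sees it:

```python
from typing import AbstractSet, List, Set, Tuple, Union

OldStrCollection = Union[Tuple[str, ...], List[str], Set[str]]
NewStrCollection = Union[Tuple[str, ...], List[str], AbstractSet[str]]

def old_api(rooms: OldStrCollection) -> int:
    return len(rooms)

def new_api(rooms: NewStrCollection) -> int:
    return len(rooms)

rooms = frozenset({"!a:hs1"})
# old_api(rooms)  # mypy error: frozenset is not a Set[str] (i.e. not a set)
new_api(rooms)    # fine: frozenset implements AbstractSet[str]
```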
@@ -633,6 +634,7 @@ class StreamKeyType:
     PUSH_RULES: Final = "push_rules_key"
     TO_DEVICE: Final = "to_device_key"
     DEVICE_LIST: Final = "device_list_key"
+    UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key"
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -640,7 +642,7 @@ class StreamToken:
     """A collection of keys joined together by underscores in the following
     order and which represent the position in their respective streams.
 
-    ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1`
+    ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1_379`
     1. `room_key`: `s2633508` which is a `RoomStreamToken`
        - `RoomStreamToken`'s can also look like `t426-2633508` or `m56~2.58~3.59`
        - See the docstring for `RoomStreamToken` for more details.
@@ -652,12 +654,13 @@ class StreamToken:
     7. `to_device_key`: `274711`
     8. `device_list_key`: `265584`
     9. `groups_key`: `1` (note that this key is now unused)
+    10. `un_partial_stated_rooms_key`: `379`
 
     You can see how many of these keys correspond to the various
     fields in a "/sync" response:
     ```json
     {
-        "next_batch": "s12_4_0_1_1_1_1_4_1",
+        "next_batch": "s12_4_0_1_1_1_1_4_1_1",
         "presence": {
             "events": []
         },
@@ -669,7 +672,7 @@ class StreamToken:
         "!QrZlfIDQLNLdZHqTnt:hs1": {
             "timeline": {
                 "events": [],
-                "prev_batch": "s10_4_0_1_1_1_1_4_1",
+                "prev_batch": "s10_4_0_1_1_1_1_4_1_1",
                 "limited": false
             },
             "state": {
@@ -705,6 +708,7 @@ class StreamToken:
     device_list_key: int
     # Note that the groups key is no longer used and may have bogus values.
     groups_key: int
+    un_partial_stated_rooms_key: int
 
     _SEPARATOR = "_"
     START: ClassVar["StreamToken"]
@@ -743,6 +747,7 @@ class StreamToken:
                 # serialized so that there will not be confusion in the future
                 # if additional tokens are added.
                 str(self.groups_key),
+                str(self.un_partial_stated_rooms_key),
             ]
         )
 
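The serialization above joins the fields with `_` in declaration order, so the new key simply appears as a tenth component and older tokens are one component shorter. A quick sketch using the example values from the docstring:

```python
# The fields are joined with "_" in declaration order, so the new
# un_partial_stated_rooms_key becomes the tenth component.
fields = [
    "s2633508", "17", "338", "6732159", "1082514",
    "541479", "274711", "265584", "1", "379",
]
token = "_".join(fields)
assert token == "s2633508_17_338_6732159_1082514_541479_274711_265584_1_379"
```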
@@ -775,7 +780,7 @@ class StreamToken:
         return attr.evolve(self, **{key: new_value})
 
 
-StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)
+StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0)
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -1831,7 +1831,7 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
 
     def test_topo_token_is_accepted(self) -> None:
         """Test Topo Token is accepted."""
-        token = "t1-0_0_0_0_0_0_0_0_0"
+        token = "t1-0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
@@ -1845,7 +1845,7 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
 
     def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
         """Test that stream token is accepted for forward pagination."""
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
@@ -1987,7 +1987,7 @@ class RoomMessageListTestCase(RoomBase):
         self.room_id = self.helper.create_room_as(self.user_id)
 
     def test_topo_token_is_accepted(self) -> None:
-        token = "t1-0_0_0_0_0_0_0_0_0"
+        token = "t1-0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
         )
@@ -1998,7 +1998,7 @@ class RoomMessageListTestCase(RoomBase):
         self.assertTrue("end" in channel.json_body)
 
     def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
         )
@@ -2728,7 +2728,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         """Test that we can filter by a label on a /messages request."""
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2745,7 +2745,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         """Test that we can filter by the absence of a label on a /messages request."""
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2768,7 +2768,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         """
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -913,7 +913,9 @@ class ExcludeRoomTestCase(unittest.HomeserverTestCase):
 
         # We need to manually append the room ID, because we can't know the ID before
         # creating the room, and we can't set the config after starting the homeserver.
-        self.hs.get_sync_handler().rooms_to_exclude.append(self.excluded_room_id)
+        self.hs.get_sync_handler().rooms_to_exclude_globally.append(
+            self.excluded_room_id
+        )
 
     def test_join_leave(self) -> None:
         """Tests that rooms are correctly excluded from the 'join' and 'leave' sections of