From dd2d32dcdb3238735aeeeaff18e5c754b1d50be9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Apr 2021 11:07:47 +0100 Subject: [PATCH 01/21] Add type hints to presence handler (#9885) --- changelog.d/9885.misc | 1 + synapse/handlers/presence.py | 159 ++++++++++++++++++++--------------- 2 files changed, 90 insertions(+), 70 deletions(-) create mode 100644 changelog.d/9885.misc diff --git a/changelog.d/9885.misc b/changelog.d/9885.misc new file mode 100644 index 0000000000..492fccea46 --- /dev/null +++ b/changelog.d/9885.misc @@ -0,0 +1 @@ +Add type hints to presence handler. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 969c73c1e7..e9f618bb5a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -28,6 +28,7 @@ from bisect import bisect from contextlib import contextmanager from typing import ( TYPE_CHECKING, + Callable, Collection, Dict, FrozenSet, @@ -232,23 +233,23 @@ class BasePresenceHandler(abc.ABC): """ async def update_external_syncs_row( - self, process_id, user_id, is_syncing, sync_time_msec - ): + self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int + ) -> None: """Update the syncing users for an external process as a delta. This is a no-op when presence is handled by a different worker. Args: - process_id (str): An identifier for the process the users are + process_id: An identifier for the process the users are syncing against. This allows synapse to process updates as user start and stop syncing against a given process. - user_id (str): The user who has started or stopped syncing - is_syncing (bool): Whether or not the user is now syncing - sync_time_msec(int): Time in ms when the user was last syncing + user_id: The user who has started or stopped syncing + is_syncing: Whether or not the user is now syncing + sync_time_msec: Time in ms when the user was last syncing """ pass - async def update_external_syncs_clear(self, process_id): + async def update_external_syncs_clear(self, process_id: str) -> None: """Marks all users that had been marked as syncing by a given process as offline. @@ -304,7 +305,7 @@ class _NullContextManager(ContextManager[None]): class WorkerPresenceHandler(BasePresenceHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.hs = hs @@ -327,7 +328,7 @@ class WorkerPresenceHandler(BasePresenceHandler): # user_id -> last_sync_ms. Lists the users that have stopped syncing but # we haven't notified the presence writer of that yet - self.users_going_offline = {} + self.users_going_offline = {} # type: Dict[str, int] self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) self._set_state_client = ReplicationPresenceSetState.make_client(hs) @@ -346,24 +347,21 @@ class WorkerPresenceHandler(BasePresenceHandler): self._on_shutdown, ) - def _on_shutdown(self): + def _on_shutdown(self) -> None: if self._presence_enabled: self.hs.get_tcp_replication().send_command( ClearUserSyncsCommand(self.instance_id) ) - def send_user_sync(self, user_id, is_syncing, last_sync_ms): + def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None: if self._presence_enabled: self.hs.get_tcp_replication().send_user_sync( self.instance_id, user_id, is_syncing, last_sync_ms ) - def mark_as_coming_online(self, user_id): + def mark_as_coming_online(self, user_id: str) -> None: """A user has started syncing. Send a UserSync to the presence writer, unless they had recently stopped syncing. - - Args: - user_id (str) """ going_offline = self.users_going_offline.pop(user_id, None) if not going_offline: @@ -371,18 +369,15 @@ class WorkerPresenceHandler(BasePresenceHandler): # were offline self.send_user_sync(user_id, True, self.clock.time_msec()) - def mark_as_going_offline(self, user_id): + def mark_as_going_offline(self, user_id: str) -> None: """A user has stopped syncing. We wait before notifying the presence writer as its likely they'll come back soon. This allows us to avoid sending a stopped syncing immediately followed by a started syncing notification to the presence writer - - Args: - user_id (str) """ self.users_going_offline[user_id] = self.clock.time_msec() - def send_stop_syncing(self): + def send_stop_syncing(self) -> None: """Check if there are any users who have stopped syncing a while ago and haven't come back yet. If there are poke the presence writer about them. """ @@ -430,7 +425,9 @@ class WorkerPresenceHandler(BasePresenceHandler): return _user_syncing() - async def notify_from_replication(self, states, stream_id): + async def notify_from_replication( + self, states: List[UserPresenceState], stream_id: int + ) -> None: parties = await get_interested_parties(self.store, self.presence_router, states) room_ids_to_states, users_to_states = parties @@ -478,7 +475,12 @@ class WorkerPresenceHandler(BasePresenceHandler): if count > 0 ] - async def set_state(self, target_user, state, ignore_status_msg=False): + async def set_state( + self, + target_user: UserID, + state: JsonDict, + ignore_status_msg: bool = False, + ) -> None: """Set the presence state of the user.""" presence = state["presence"] @@ -508,7 +510,7 @@ class WorkerPresenceHandler(BasePresenceHandler): ignore_status_msg=ignore_status_msg, ) - async def bump_presence_active_time(self, user): + async def bump_presence_active_time(self, user: UserID) -> None: """We've seen the user do something that indicates they're interacting with the app. """ @@ -592,8 +594,8 @@ class PresenceHandler(BasePresenceHandler): # we assume that all the sync requests on that process have stopped. # Stored as a dict from process_id to set of user_id, and a dict of # process_id to millisecond timestamp last updated. - self.external_process_to_current_syncs = {} # type: Dict[int, Set[str]] - self.external_process_last_updated_ms = {} # type: Dict[int, int] + self.external_process_to_current_syncs = {} # type: Dict[str, Set[str]] + self.external_process_last_updated_ms = {} # type: Dict[str, int] self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") @@ -633,7 +635,7 @@ class PresenceHandler(BasePresenceHandler): self._event_pos = self.store.get_current_events_token() self._event_processing = False - async def _on_shutdown(self): + async def _on_shutdown(self) -> None: """Gets called when shutting down. This lets us persist any updates that we haven't yet persisted, e.g. updates that only changes some internal timers. This allows changes to persist across startup without having to @@ -662,7 +664,7 @@ class PresenceHandler(BasePresenceHandler): ) logger.info("Finished _on_shutdown") - async def _persist_unpersisted_changes(self): + async def _persist_unpersisted_changes(self) -> None: """We periodically persist the unpersisted changes, as otherwise they may stack up and slow down shutdown times. """ @@ -762,7 +764,7 @@ class PresenceHandler(BasePresenceHandler): states, destinations ) - async def _handle_timeouts(self): + async def _handle_timeouts(self) -> None: """Checks the presence of users that have timed out and updates as appropriate. """ @@ -814,7 +816,7 @@ class PresenceHandler(BasePresenceHandler): return await self._update_states(changes) - async def bump_presence_active_time(self, user): + async def bump_presence_active_time(self, user: UserID) -> None: """We've seen the user do something that indicates they're interacting with the app. """ @@ -911,17 +913,17 @@ class PresenceHandler(BasePresenceHandler): return [] async def update_external_syncs_row( - self, process_id, user_id, is_syncing, sync_time_msec - ): + self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int + ) -> None: """Update the syncing users for an external process as a delta. Args: - process_id (str): An identifier for the process the users are + process_id: An identifier for the process the users are syncing against. This allows synapse to process updates as user start and stop syncing against a given process. - user_id (str): The user who has started or stopped syncing - is_syncing (bool): Whether or not the user is now syncing - sync_time_msec(int): Time in ms when the user was last syncing + user_id: The user who has started or stopped syncing + is_syncing: Whether or not the user is now syncing + sync_time_msec: Time in ms when the user was last syncing """ with (await self.external_sync_linearizer.queue(process_id)): prev_state = await self.current_state_for_user(user_id) @@ -958,7 +960,7 @@ class PresenceHandler(BasePresenceHandler): self.external_process_last_updated_ms[process_id] = self.clock.time_msec() - async def update_external_syncs_clear(self, process_id): + async def update_external_syncs_clear(self, process_id: str) -> None: """Marks all users that had been marked as syncing by a given process as offline. @@ -979,12 +981,12 @@ class PresenceHandler(BasePresenceHandler): ) self.external_process_last_updated_ms.pop(process_id, None) - async def current_state_for_user(self, user_id): + async def current_state_for_user(self, user_id: str) -> UserPresenceState: """Get the current presence state for a user.""" res = await self.current_state_for_users([user_id]) return res[user_id] - async def _persist_and_notify(self, states): + async def _persist_and_notify(self, states: List[UserPresenceState]) -> None: """Persist states in the database, poke the notifier and send to interested remote servers """ @@ -1005,7 +1007,7 @@ class PresenceHandler(BasePresenceHandler): # stream (which is updated by `store.update_presence`). await self.maybe_send_presence_to_interested_destinations(states) - async def incoming_presence(self, origin, content): + async def incoming_presence(self, origin: str, content: JsonDict) -> None: """Called when we receive a `m.presence` EDU from a remote server.""" if not self._presence_enabled: return @@ -1055,7 +1057,9 @@ class PresenceHandler(BasePresenceHandler): federation_presence_counter.inc(len(updates)) await self._update_states(updates) - async def set_state(self, target_user, state, ignore_status_msg=False): + async def set_state( + self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False + ) -> None: """Set the presence state of the user.""" status_msg = state.get("status_msg", None) presence = state["presence"] @@ -1089,7 +1093,7 @@ class PresenceHandler(BasePresenceHandler): await self._update_states([prev_state.copy_and_replace(**new_fields)]) - async def is_visible(self, observed_user, observer_user): + async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool: """Returns whether a user can see another user's presence.""" observer_room_ids = await self.store.get_rooms_for_user( observer_user.to_string() @@ -1144,7 +1148,7 @@ class PresenceHandler(BasePresenceHandler): ) return rows - def notify_new_event(self): + def notify_new_event(self) -> None: """Called when new events have happened. Handles users and servers joining rooms and require being sent presence. """ @@ -1163,7 +1167,7 @@ class PresenceHandler(BasePresenceHandler): run_as_background_process("presence.notify_new_event", _process_presence) - async def _unsafe_process(self): + async def _unsafe_process(self) -> None: # Loop round handling deltas until we're up to date while True: with Measure(self.clock, "presence_delta"): @@ -1188,7 +1192,7 @@ class PresenceHandler(BasePresenceHandler): max_pos ) - async def _handle_state_delta(self, deltas): + async def _handle_state_delta(self, deltas: List[JsonDict]) -> None: """Process current state deltas to find new joins that need to be handled. """ @@ -1311,7 +1315,7 @@ class PresenceHandler(BasePresenceHandler): return [remote_host], states -def should_notify(old_state, new_state): +def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool: """Decides if a presence state change should be sent to interested parties.""" if old_state == new_state: return False @@ -1347,7 +1351,9 @@ def should_notify(old_state, new_state): return False -def format_user_presence_state(state, now, include_user_id=True): +def format_user_presence_state( + state: UserPresenceState, now: int, include_user_id: bool = True +) -> JsonDict: """Convert UserPresenceState to a format that can be sent down to clients and to other servers. @@ -1385,11 +1391,11 @@ class PresenceEventSource: @log_function async def get_new_events( self, - user, - from_key, - room_ids=None, - include_offline=True, - explicit_room_id=None, + user: UserID, + from_key: Optional[int], + room_ids: Optional[List[str]] = None, + include_offline: bool = True, + explicit_room_id: Optional[str] = None, **kwargs, ) -> Tuple[List[UserPresenceState], int]: # The process for getting presence events are: @@ -1594,7 +1600,7 @@ class PresenceEventSource: if update.state != PresenceState.OFFLINE ] - def get_current_key(self): + def get_current_key(self) -> int: return self.store.get_current_presence_token() @cached(num_args=2, cache_context=True) @@ -1654,15 +1660,20 @@ class PresenceEventSource: return users_interested_in -def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): +def handle_timeouts( + user_states: List[UserPresenceState], + is_mine_fn: Callable[[str], bool], + syncing_user_ids: Set[str], + now: int, +) -> List[UserPresenceState]: """Checks the presence of users that have timed out and updates as appropriate. Args: - user_states(list): List of UserPresenceState's to check. - is_mine_fn (fn): Function that returns if a user_id is ours - syncing_user_ids (set): Set of user_ids with active syncs. - now (int): Current time in ms. + user_states: List of UserPresenceState's to check. + is_mine_fn: Function that returns if a user_id is ours + syncing_user_ids: Set of user_ids with active syncs. + now: Current time in ms. Returns: List of UserPresenceState updates @@ -1679,14 +1690,16 @@ def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): return list(changes.values()) -def handle_timeout(state, is_mine, syncing_user_ids, now): +def handle_timeout( + state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int +) -> Optional[UserPresenceState]: """Checks the presence of the user to see if any of the timers have elapsed Args: - state (UserPresenceState) - is_mine (bool): Whether the user is ours - syncing_user_ids (set): Set of user_ids with active syncs. - now (int): Current time in ms. + state + is_mine: Whether the user is ours + syncing_user_ids: Set of user_ids with active syncs. + now: Current time in ms. Returns: A UserPresenceState update or None if no update. @@ -1738,23 +1751,29 @@ def handle_timeout(state, is_mine, syncing_user_ids, now): return state if changed else None -def handle_update(prev_state, new_state, is_mine, wheel_timer, now): +def handle_update( + prev_state: UserPresenceState, + new_state: UserPresenceState, + is_mine: bool, + wheel_timer: WheelTimer, + now: int, +) -> Tuple[UserPresenceState, bool, bool]: """Given a presence update: 1. Add any appropriate timers. 2. Check if we should notify anyone. Args: - prev_state (UserPresenceState) - new_state (UserPresenceState) - is_mine (bool): Whether the user is ours - wheel_timer (WheelTimer) - now (int): Time now in ms + prev_state + new_state + is_mine: Whether the user is ours + wheel_timer + now: Time now in ms Returns: 3-tuple: `(new_state, persist_and_notify, federation_ping)` where: - new_state: is the state to actually persist - - persist_and_notify (bool): whether to persist and notify people - - federation_ping (bool): whether we should send a ping over federation + - persist_and_notify: whether to persist and notify people + - federation_ping: whether we should send a ping over federation """ user_id = new_state.user_id From 391bfe9a7b7b22c3dbee9f9e02071fd5c1730ab5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Apr 2021 11:59:28 +0100 Subject: [PATCH 02/21] Reduce memory footprint of caches (#9886) --- changelog.d/9886.misc | 1 + synapse/util/caches/lrucache.py | 77 +++++++++++++++++++++++++-------- 2 files changed, 60 insertions(+), 18 deletions(-) create mode 100644 changelog.d/9886.misc diff --git a/changelog.d/9886.misc b/changelog.d/9886.misc new file mode 100644 index 0000000000..8ff869e659 --- /dev/null +++ b/changelog.d/9886.misc @@ -0,0 +1 @@ +Reduce memory usage of the LRU caches. diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index a21d34fcb4..10b0ec6b75 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -17,8 +17,10 @@ from functools import wraps from typing import ( Any, Callable, + Collection, Generic, Iterable, + List, Optional, Type, TypeVar, @@ -57,13 +59,56 @@ class _Node: __slots__ = ["prev_node", "next_node", "key", "value", "callbacks"] def __init__( - self, prev_node, next_node, key, value, callbacks: Optional[set] = None + self, + prev_node, + next_node, + key, + value, + callbacks: Collection[Callable[[], None]] = (), ): self.prev_node = prev_node self.next_node = next_node self.key = key self.value = value - self.callbacks = callbacks or set() + + # Set of callbacks to run when the node gets deleted. We store as a list + # rather than a set to keep memory usage down (and since we expect few + # entries per node, the performance of checking for duplication in a + # list vs using a set is negligible). + # + # Note that we store this as an optional list to keep the memory + # footprint down. Storing `None` is free as its a singleton, while empty + # lists are 56 bytes (and empty sets are 216 bytes, if we did the naive + # thing and used sets). + self.callbacks = None # type: Optional[List[Callable[[], None]]] + + self.add_callbacks(callbacks) + + def add_callbacks(self, callbacks: Collection[Callable[[], None]]) -> None: + """Add to stored list of callbacks, removing duplicates.""" + + if not callbacks: + return + + if not self.callbacks: + self.callbacks = [] + + for callback in callbacks: + if callback not in self.callbacks: + self.callbacks.append(callback) + + def run_and_clear_callbacks(self) -> None: + """Run all callbacks and clear the stored list of callbacks. Used when + the node is being deleted. + """ + + if not self.callbacks: + return + + for callback in self.callbacks: + callback() + + self.callbacks = None class LruCache(Generic[KT, VT]): @@ -177,10 +222,10 @@ class LruCache(Generic[KT, VT]): self.len = synchronized(cache_len) - def add_node(key, value, callbacks: Optional[set] = None): + def add_node(key, value, callbacks: Collection[Callable[[], None]] = ()): prev_node = list_root next_node = prev_node.next_node - node = _Node(prev_node, next_node, key, value, callbacks or set()) + node = _Node(prev_node, next_node, key, value, callbacks) prev_node.next_node = node next_node.prev_node = node cache[key] = node @@ -211,16 +256,15 @@ class LruCache(Generic[KT, VT]): deleted_len = size_callback(node.value) cached_cache_len[0] -= deleted_len - for cb in node.callbacks: - cb() - node.callbacks.clear() + node.run_and_clear_callbacks() + return deleted_len @overload def cache_get( key: KT, default: Literal[None] = None, - callbacks: Iterable[Callable[[], None]] = ..., + callbacks: Collection[Callable[[], None]] = ..., update_metrics: bool = ..., ) -> Optional[VT]: ... @@ -229,7 +273,7 @@ class LruCache(Generic[KT, VT]): def cache_get( key: KT, default: T, - callbacks: Iterable[Callable[[], None]] = ..., + callbacks: Collection[Callable[[], None]] = ..., update_metrics: bool = ..., ) -> Union[T, VT]: ... @@ -238,13 +282,13 @@ class LruCache(Generic[KT, VT]): def cache_get( key: KT, default: Optional[T] = None, - callbacks: Iterable[Callable[[], None]] = (), + callbacks: Collection[Callable[[], None]] = (), update_metrics: bool = True, ): node = cache.get(key, None) if node is not None: move_node_to_front(node) - node.callbacks.update(callbacks) + node.add_callbacks(callbacks) if update_metrics and metrics: metrics.inc_hits() return node.value @@ -260,10 +304,8 @@ class LruCache(Generic[KT, VT]): # We sometimes store large objects, e.g. dicts, which cause # the inequality check to take a long time. So let's only do # the check if we have some callbacks to call. - if node.callbacks and value != node.value: - for cb in node.callbacks: - cb() - node.callbacks.clear() + if value != node.value: + node.run_and_clear_callbacks() # We don't bother to protect this by value != node.value as # generally size_callback will be cheap compared with equality @@ -273,7 +315,7 @@ class LruCache(Generic[KT, VT]): cached_cache_len[0] -= size_callback(node.value) cached_cache_len[0] += size_callback(value) - node.callbacks.update(callbacks) + node.add_callbacks(callbacks) move_node_to_front(node) node.value = value @@ -326,8 +368,7 @@ class LruCache(Generic[KT, VT]): list_root.next_node = list_root list_root.prev_node = list_root for node in cache.values(): - for cb in node.callbacks: - cb() + node.run_and_clear_callbacks() cache.clear() if size_callback: cached_cache_len[0] = 0 From 10a08ab88ad423bfca86983808c47f34a601ec9c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 28 Apr 2021 07:44:52 -0400 Subject: [PATCH 03/21] Use the parent's logging context name for runWithConnection. (#9895) This fixes a regression where the logging context for runWithConnection was reported as runWithConnection instead of the connection name, e.g. "POST-XYZ". --- changelog.d/9895.bugfix | 1 + synapse/storage/database.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 changelog.d/9895.bugfix diff --git a/changelog.d/9895.bugfix b/changelog.d/9895.bugfix new file mode 100644 index 0000000000..1053f975bf --- /dev/null +++ b/changelog.d/9895.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.32.0 where the associated connection was improperly logged for SQL logging statements. diff --git a/synapse/storage/database.py b/synapse/storage/database.py index bd39c095af..a761ad603b 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -715,7 +715,9 @@ class DatabasePool: # pool). assert not self.engine.in_transaction(conn) - with LoggingContext("runWithConnection", parent_context) as context: + with LoggingContext( + str(curr_context), parent_context=parent_context + ) as context: sched_duration_sec = monotonic_time() - start_time sql_scheduling_timer.observe(sched_duration_sec) context.add_database_scheduled(sched_duration_sec) From 0085dc5abc614579f3adbd9e6d2cbdd41facef00 Mon Sep 17 00:00:00 2001 From: ThibF Date: Thu, 29 Apr 2021 09:31:45 +0000 Subject: [PATCH 04/21] Delete room endpoint (#9889) Support the delete of a room through DELETE request and mark previous request as deprecated through documentation. Signed-off-by: Thibault Ferrante --- changelog.d/9889.feature | 1 + changelog.d/9889.removal | 1 + docs/admin_api/rooms.md | 11 ++- synapse/rest/admin/rooms.py | 134 +++++++++++++++++++++++----------- tests/rest/admin/test_room.py | 45 +++++++----- 5 files changed, 128 insertions(+), 64 deletions(-) create mode 100644 changelog.d/9889.feature create mode 100644 changelog.d/9889.removal diff --git a/changelog.d/9889.feature b/changelog.d/9889.feature new file mode 100644 index 0000000000..74d46f222e --- /dev/null +++ b/changelog.d/9889.feature @@ -0,0 +1 @@ +Add support for `DELETE /_synapse/admin/v1/rooms/`. \ No newline at end of file diff --git a/changelog.d/9889.removal b/changelog.d/9889.removal new file mode 100644 index 0000000000..398b9e129b --- /dev/null +++ b/changelog.d/9889.removal @@ -0,0 +1 @@ +Mark as deprecated `POST /_synapse/admin/v1/rooms//delete`. \ No newline at end of file diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index bc737b30f5..01d3882426 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -427,7 +427,7 @@ the new room. Users on other servers will be unaffected. The API is: ``` -POST /_synapse/admin/v1/rooms//delete +DELETE /_synapse/admin/v1/rooms/ ``` with a body of: @@ -528,6 +528,15 @@ You will have to manually handle, if you so choose, the following: * Users that would have been booted from the room (and will have been force-joined to the Content Violation room). * Removal of the Content Violation room if desired. +## Deprecated endpoint + +The previous deprecated API will be removed in a future release, it was: + +``` +POST /_synapse/admin/v1/rooms//delete +``` + +It behaves the same way than the current endpoint except the path and the method. # Make Room Admin API diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index d0cf121743..f289ffe3d0 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -37,9 +37,11 @@ from synapse.types import JsonDict, RoomAlias, RoomID, UserID, create_requester from synapse.util import json_decoder if TYPE_CHECKING: + from synapse.api.auth import Auth + from synapse.handlers.pagination import PaginationHandler + from synapse.handlers.room import RoomShutdownHandler from synapse.server import HomeServer - logger = logging.getLogger(__name__) @@ -146,50 +148,14 @@ class DeleteRoomRestServlet(RestServlet): async def on_POST( self, request: SynapseRequest, room_id: str ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request) - await assert_user_is_admin(self.auth, requester.user) - - content = parse_json_object_from_request(request) - - block = content.get("block", False) - if not isinstance(block, bool): - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "Param 'block' must be a boolean, if given", - Codes.BAD_JSON, - ) - - purge = content.get("purge", True) - if not isinstance(purge, bool): - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "Param 'purge' must be a boolean, if given", - Codes.BAD_JSON, - ) - - force_purge = content.get("force_purge", False) - if not isinstance(force_purge, bool): - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "Param 'force_purge' must be a boolean, if given", - Codes.BAD_JSON, - ) - - ret = await self.room_shutdown_handler.shutdown_room( - room_id=room_id, - new_room_user_id=content.get("new_room_user_id"), - new_room_name=content.get("room_name"), - message=content.get("message"), - requester_user_id=requester.user.to_string(), - block=block, + return await _delete_room( + request, + room_id, + self.auth, + self.room_shutdown_handler, + self.pagination_handler, ) - # Purge room - if purge: - await self.pagination_handler.purge_room(room_id, force=force_purge) - - return (200, ret) - class ListRoomRestServlet(RestServlet): """ @@ -282,7 +248,22 @@ class ListRoomRestServlet(RestServlet): class RoomRestServlet(RestServlet): - """Get room details. + """Manage a room. + + On GET : Get details of a room. + + On DELETE : Delete a room from server. + + It is a combination and improvement of shutdown and purge room. + + Shuts down a room by removing all local users from the room. + Blocking all future invites and joins to the room is optional. + + If desired any local aliases will be repointed to a new room + created by `new_room_user_id` and kicked users will be auto- + joined to the new room. + + If 'purge' is true, it will remove all traces of a room from the database. TODO: Add on_POST to allow room creation without joining the room """ @@ -293,6 +274,8 @@ class RoomRestServlet(RestServlet): self.hs = hs self.auth = hs.get_auth() self.store = hs.get_datastore() + self.room_shutdown_handler = hs.get_room_shutdown_handler() + self.pagination_handler = hs.get_pagination_handler() async def on_GET( self, request: SynapseRequest, room_id: str @@ -308,6 +291,17 @@ class RoomRestServlet(RestServlet): return (200, ret) + async def on_DELETE( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + return await _delete_room( + request, + room_id, + self.auth, + self.room_shutdown_handler, + self.pagination_handler, + ) + class RoomMembersRestServlet(RestServlet): """ @@ -694,3 +688,55 @@ class RoomEventContextServlet(RestServlet): ) return 200, results + + +async def _delete_room( + request: SynapseRequest, + room_id: str, + auth: "Auth", + room_shutdown_handler: "RoomShutdownHandler", + pagination_handler: "PaginationHandler", +) -> Tuple[int, JsonDict]: + requester = await auth.get_user_by_req(request) + await assert_user_is_admin(auth, requester.user) + + content = parse_json_object_from_request(request) + + block = content.get("block", False) + if not isinstance(block, bool): + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Param 'block' must be a boolean, if given", + Codes.BAD_JSON, + ) + + purge = content.get("purge", True) + if not isinstance(purge, bool): + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Param 'purge' must be a boolean, if given", + Codes.BAD_JSON, + ) + + force_purge = content.get("force_purge", False) + if not isinstance(force_purge, bool): + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Param 'force_purge' must be a boolean, if given", + Codes.BAD_JSON, + ) + + ret = await room_shutdown_handler.shutdown_room( + room_id=room_id, + new_room_user_id=content.get("new_room_user_id"), + new_room_name=content.get("room_name"), + message=content.get("message"), + requester_user_id=requester.user.to_string(), + block=block, + ) + + # Purge room + if purge: + await pagination_handler.purge_room(room_id, force=force_purge) + + return (200, ret) diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index 6b84188120..ee071c2477 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -17,6 +17,8 @@ import urllib.parse from typing import List, Optional from unittest.mock import Mock +from parameterized import parameterized_class + import synapse.rest.admin from synapse.api.constants import EventTypes, Membership from synapse.api.errors import Codes @@ -144,6 +146,13 @@ class ShutdownRoomTestCase(unittest.HomeserverTestCase): ) +@parameterized_class( + ("method", "url_template"), + [ + ("POST", "/_synapse/admin/v1/rooms/%s/delete"), + ("DELETE", "/_synapse/admin/v1/rooms/%s"), + ], +) class DeleteRoomTestCase(unittest.HomeserverTestCase): servlets = [ synapse.rest.admin.register_servlets, @@ -175,7 +184,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): self.room_id = self.helper.create_room_as( self.other_user, tok=self.other_user_tok ) - self.url = "/_synapse/admin/v1/rooms/%s/delete" % self.room_id + self.url = self.url_template % self.room_id def test_requester_is_no_admin(self): """ @@ -183,7 +192,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): """ channel = self.make_request( - "POST", + self.method, self.url, json.dumps({}), access_token=self.other_user_tok, @@ -196,10 +205,10 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): """ Check that unknown rooms/server return error 404. """ - url = "/_synapse/admin/v1/rooms/!unknown:test/delete" + url = self.url_template % "!unknown:test" channel = self.make_request( - "POST", + self.method, url, json.dumps({}), access_token=self.admin_user_tok, @@ -212,10 +221,10 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): """ Check that invalid room names, return an error 400. """ - url = "/_synapse/admin/v1/rooms/invalidroom/delete" + url = self.url_template % "invalidroom" channel = self.make_request( - "POST", + self.method, url, json.dumps({}), access_token=self.admin_user_tok, @@ -234,7 +243,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"new_room_user_id": "@unknown:test"}) channel = self.make_request( - "POST", + self.method, self.url, content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -253,7 +262,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"new_room_user_id": "@not:exist.bla"}) channel = self.make_request( - "POST", + self.method, self.url, content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -272,7 +281,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"block": "NotBool"}) channel = self.make_request( - "POST", + self.method, self.url, content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -288,7 +297,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"purge": "NotBool"}) channel = self.make_request( - "POST", + self.method, self.url, content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -314,7 +323,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"block": True, "purge": True}) channel = self.make_request( - "POST", + self.method, self.url.encode("ascii"), content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -347,7 +356,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"block": False, "purge": True}) channel = self.make_request( - "POST", + self.method, self.url.encode("ascii"), content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -381,7 +390,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"block": False, "purge": False}) channel = self.make_request( - "POST", + self.method, self.url.encode("ascii"), content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -426,10 +435,9 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): self._is_member(room_id=self.room_id, user_id=self.other_user) # Test that the admin can still send shutdown - url = "/_synapse/admin/v1/rooms/%s/delete" % self.room_id channel = self.make_request( - "POST", - url.encode("ascii"), + self.method, + self.url, json.dumps({"new_room_user_id": self.admin_user}), access_token=self.admin_user_tok, ) @@ -473,10 +481,9 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): self._is_member(room_id=self.room_id, user_id=self.other_user) # Test that the admin can still send shutdown - url = "/_synapse/admin/v1/rooms/%s/delete" % self.room_id channel = self.make_request( - "POST", - url.encode("ascii"), + self.method, + self.url, json.dumps({"new_room_user_id": self.admin_user}), access_token=self.admin_user_tok, ) From e9444cc74d73f6367dedcfe406e3f1d9ff3d5414 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 29 Apr 2021 11:45:37 +0100 Subject: [PATCH 05/21] 1.33.0rc2 --- CHANGES.md | 9 +++++++++ changelog.d/9900.bugfix | 1 - synapse/__init__.py | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) delete mode 100644 changelog.d/9900.bugfix diff --git a/CHANGES.md b/CHANGES.md index 9a41607679..629d4a180d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,12 @@ +Synapse 1.33.0rc2 (2021-04-29) +============================== + +Bugfixes +-------- + +- Fix tight loop handling presence replication when using workers. Introduced in v1.33.0rc1. ([\#9900](https://github.com/matrix-org/synapse/issues/9900)) + + Synapse 1.33.0rc1 (2021-04-28) ============================== diff --git a/changelog.d/9900.bugfix b/changelog.d/9900.bugfix deleted file mode 100644 index a8470fca3f..0000000000 --- a/changelog.d/9900.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix tight loop handling presence replication when using workers. Introduced in v1.33.0rc1. diff --git a/synapse/__init__.py b/synapse/__init__.py index 5bbaa62de2..319c52be2c 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -47,7 +47,7 @@ try: except ImportError: pass -__version__ = "1.33.0rc1" +__version__ = "1.33.0rc2" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when From bb4b11846f3bdd539a1671eb8f1db8ee1a0bf57a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 29 Apr 2021 07:17:28 -0400 Subject: [PATCH 06/21] Add missing type hints to handlers and fix a Spam Checker type hint. (#9896) The user_may_create_room_alias method on spam checkers declared the room_alias parameter as a str when in reality it is passed a RoomAlias object. --- changelog.d/9896.bugfix | 1 + changelog.d/9896.misc | 1 + synapse/events/spamcheck.py | 5 ++- synapse/handlers/directory.py | 59 ++++++++++++++++------------ synapse/handlers/identity.py | 9 +++-- synapse/handlers/message.py | 24 +++++++---- synapse/handlers/room_member.py | 2 +- synapse/handlers/ui_auth/checkers.py | 35 +++++++++-------- 8 files changed, 82 insertions(+), 54 deletions(-) create mode 100644 changelog.d/9896.bugfix create mode 100644 changelog.d/9896.misc diff --git a/changelog.d/9896.bugfix b/changelog.d/9896.bugfix new file mode 100644 index 0000000000..07a8e87f9f --- /dev/null +++ b/changelog.d/9896.bugfix @@ -0,0 +1 @@ +Correct the type hint for the `user_may_create_room_alias` method of spam checkers. It is provided a `RoomAlias`, not a `str`. diff --git a/changelog.d/9896.misc b/changelog.d/9896.misc new file mode 100644 index 0000000000..e41c7d1f02 --- /dev/null +++ b/changelog.d/9896.misc @@ -0,0 +1 @@ +Add type hints to the `synapse.handlers` module. diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 7118d5f52d..d5fa195094 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Tuple, from synapse.rest.media.v1._base import FileInfo from synapse.rest.media.v1.media_storage import ReadableFileWrapper from synapse.spam_checker_api import RegistrationBehaviour +from synapse.types import RoomAlias from synapse.util.async_helpers import maybe_awaitable if TYPE_CHECKING: @@ -113,7 +114,9 @@ class SpamChecker: return True - async def user_may_create_room_alias(self, userid: str, room_alias: str) -> bool: + async def user_may_create_room_alias( + self, userid: str, room_alias: RoomAlias + ) -> bool: """Checks if a given user may create a room alias If this method returns false, the association request will be rejected. diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 90932316f3..de1b14cde3 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -14,7 +14,7 @@ import logging import string -from typing import Iterable, List, Optional +from typing import TYPE_CHECKING, Iterable, List, Optional from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes from synapse.api.errors import ( @@ -27,15 +27,19 @@ from synapse.api.errors import ( SynapseError, ) from synapse.appservice import ApplicationService -from synapse.types import Requester, RoomAlias, UserID, get_domain_from_id +from synapse.storage.databases.main.directory import RoomAliasMapping +from synapse.types import JsonDict, Requester, RoomAlias, UserID, get_domain_from_id from ._base import BaseHandler +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) class DirectoryHandler(BaseHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.state = hs.get_state_handler() @@ -60,7 +64,7 @@ class DirectoryHandler(BaseHandler): room_id: str, servers: Optional[Iterable[str]] = None, creator: Optional[str] = None, - ): + ) -> None: # general association creation for both human users and app services for wchar in string.whitespace: @@ -104,8 +108,9 @@ class DirectoryHandler(BaseHandler): """ user_id = requester.user.to_string() + room_alias_str = room_alias.to_string() - if len(room_alias.to_string()) > MAX_ALIAS_LENGTH: + if len(room_alias_str) > MAX_ALIAS_LENGTH: raise SynapseError( 400, "Can't create aliases longer than %s characters" % MAX_ALIAS_LENGTH, @@ -114,7 +119,7 @@ class DirectoryHandler(BaseHandler): service = requester.app_service if service: - if not service.is_interested_in_alias(room_alias.to_string()): + if not service.is_interested_in_alias(room_alias_str): raise SynapseError( 400, "This application service has not reserved this kind of alias.", @@ -138,7 +143,7 @@ class DirectoryHandler(BaseHandler): raise AuthError(403, "This user is not permitted to create this alias") if not self.config.is_alias_creation_allowed( - user_id, room_id, room_alias.to_string() + user_id, room_id, room_alias_str ): # Lets just return a generic message, as there may be all sorts of # reasons why we said no. TODO: Allow configurable error messages @@ -211,7 +216,7 @@ class DirectoryHandler(BaseHandler): async def delete_appservice_association( self, service: ApplicationService, room_alias: RoomAlias - ): + ) -> None: if not service.is_interested_in_alias(room_alias.to_string()): raise SynapseError( 400, @@ -220,7 +225,7 @@ class DirectoryHandler(BaseHandler): ) await self._delete_association(room_alias) - async def _delete_association(self, room_alias: RoomAlias): + async def _delete_association(self, room_alias: RoomAlias) -> str: if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room alias must be local") @@ -228,17 +233,19 @@ class DirectoryHandler(BaseHandler): return room_id - async def get_association(self, room_alias: RoomAlias): + async def get_association(self, room_alias: RoomAlias) -> JsonDict: room_id = None if self.hs.is_mine(room_alias): - result = await self.get_association_from_room_alias(room_alias) + result = await self.get_association_from_room_alias( + room_alias + ) # type: Optional[RoomAliasMapping] if result: room_id = result.room_id servers = result.servers else: try: - result = await self.federation.make_query( + fed_result = await self.federation.make_query( destination=room_alias.domain, query_type="directory", args={"room_alias": room_alias.to_string()}, @@ -248,13 +255,13 @@ class DirectoryHandler(BaseHandler): except CodeMessageException as e: logging.warning("Error retrieving alias") if e.code == 404: - result = None + fed_result = None else: raise - if result and "room_id" in result and "servers" in result: - room_id = result["room_id"] - servers = result["servers"] + if fed_result and "room_id" in fed_result and "servers" in fed_result: + room_id = fed_result["room_id"] + servers = fed_result["servers"] if not room_id: raise SynapseError( @@ -275,7 +282,7 @@ class DirectoryHandler(BaseHandler): return {"room_id": room_id, "servers": servers} - async def on_directory_query(self, args): + async def on_directory_query(self, args: JsonDict) -> JsonDict: room_alias = RoomAlias.from_string(args["room_alias"]) if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room Alias is not hosted on this homeserver") @@ -293,7 +300,7 @@ class DirectoryHandler(BaseHandler): async def _update_canonical_alias( self, requester: Requester, user_id: str, room_id: str, room_alias: RoomAlias - ): + ) -> None: """ Send an updated canonical alias event if the removed alias was set as the canonical alias or listed in the alt_aliases field. @@ -344,7 +351,9 @@ class DirectoryHandler(BaseHandler): ratelimit=False, ) - async def get_association_from_room_alias(self, room_alias: RoomAlias): + async def get_association_from_room_alias( + self, room_alias: RoomAlias + ) -> Optional[RoomAliasMapping]: result = await self.store.get_association_from_room_alias(room_alias) if not result: # Query AS to see if it exists @@ -372,7 +381,7 @@ class DirectoryHandler(BaseHandler): # either no interested services, or no service with an exclusive lock return True - async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str): + async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str) -> bool: """Determine whether a user can delete an alias. One of the following must be true: @@ -394,14 +403,13 @@ class DirectoryHandler(BaseHandler): if not room_id: return False - res = await self.auth.check_can_change_room_list( + return await self.auth.check_can_change_room_list( room_id, UserID.from_string(user_id) ) - return res async def edit_published_room_list( self, requester: Requester, room_id: str, visibility: str - ): + ) -> None: """Edit the entry of the room in the published room list. requester @@ -469,7 +477,7 @@ class DirectoryHandler(BaseHandler): async def edit_published_appservice_room_list( self, appservice_id: str, network_id: str, room_id: str, visibility: str - ): + ) -> None: """Add or remove a room from the appservice/network specific public room list. @@ -499,5 +507,4 @@ class DirectoryHandler(BaseHandler): room_id, requester.user.to_string() ) - aliases = await self.store.get_aliases_for_room(room_id) - return aliases + return await self.store.get_aliases_for_room(room_id) diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 0b3b1fadb5..33d16fbf9c 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -17,7 +17,7 @@ """Utilities for interacting with Identity Servers""" import logging import urllib.parse -from typing import Awaitable, Callable, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Tuple from synapse.api.errors import ( CodeMessageException, @@ -41,13 +41,16 @@ from synapse.util.stringutils import ( from ._base import BaseHandler +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) id_server_scheme = "https://" class IdentityHandler(BaseHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) # An HTTP client for contacting trusted URLs. @@ -80,7 +83,7 @@ class IdentityHandler(BaseHandler): request: SynapseRequest, medium: str, address: str, - ): + ) -> None: """Used to ratelimit requests to `/requestToken` by IP and address. Args: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ec8eb21674..49f8aa25ea 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -15,7 +15,7 @@ # limitations under the License. import logging import random -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple from canonicaljson import encode_canonical_json @@ -66,7 +66,7 @@ logger = logging.getLogger(__name__) class MessageHandler: """Contains some read only APIs to get state about a room""" - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.auth = hs.get_auth() self.clock = hs.get_clock() self.state = hs.get_state_handler() @@ -91,7 +91,7 @@ class MessageHandler: room_id: str, event_type: str, state_key: str, - ) -> dict: + ) -> Optional[EventBase]: """Get data from a room. Args: @@ -115,6 +115,10 @@ class MessageHandler: data = await self.state.get_current_state(room_id, event_type, state_key) elif membership == Membership.LEAVE: key = (event_type, state_key) + # If the membership is not JOIN, then the event ID should exist. + assert ( + membership_event_id is not None + ), "check_user_in_room_or_world_readable returned invalid data" room_state = await self.state_store.get_state_for_events( [membership_event_id], StateFilter.from_types([key]) ) @@ -186,10 +190,12 @@ class MessageHandler: event = last_events[0] if visible_events: - room_state = await self.state_store.get_state_for_events( + room_state_events = await self.state_store.get_state_for_events( [event.event_id], state_filter=state_filter ) - room_state = room_state[event.event_id] + room_state = room_state_events[ + event.event_id + ] # type: Mapping[Any, EventBase] else: raise AuthError( 403, @@ -210,10 +216,14 @@ class MessageHandler: ) room_state = await self.store.get_events(state_ids.values()) elif membership == Membership.LEAVE: - room_state = await self.state_store.get_state_for_events( + # If the membership is not JOIN, then the event ID should exist. + assert ( + membership_event_id is not None + ), "check_user_in_room_or_world_readable returned invalid data" + room_state_events = await self.state_store.get_state_for_events( [membership_event_id], state_filter=state_filter ) - room_state = room_state[membership_event_id] + room_state = room_state_events[membership_event_id] now = self.clock.time_msec() events = await self._event_serializer.serialize_events( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 2c5bada1d8..20700fc5a8 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -1044,7 +1044,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): class RoomMemberMasterHandler(RoomMemberHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.distributor = hs.get_distributor() diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py index 0eeb7c03f2..5414ce77d8 100644 --- a/synapse/handlers/ui_auth/checkers.py +++ b/synapse/handlers/ui_auth/checkers.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import Any +from typing import TYPE_CHECKING, Any from twisted.web.client import PartialDownloadError @@ -22,13 +22,16 @@ from synapse.api.errors import Codes, LoginError, SynapseError from synapse.config.emailconfig import ThreepidBehaviour from synapse.util import json_decoder +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) class UserInteractiveAuthChecker: """Abstract base class for an interactive auth checker""" - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): pass def is_enabled(self) -> bool: @@ -57,10 +60,10 @@ class UserInteractiveAuthChecker: class DummyAuthChecker(UserInteractiveAuthChecker): AUTH_TYPE = LoginType.DUMMY - def is_enabled(self): + def is_enabled(self) -> bool: return True - async def check_auth(self, authdict, clientip): + async def check_auth(self, authdict: dict, clientip: str) -> Any: return True @@ -70,24 +73,24 @@ class TermsAuthChecker(UserInteractiveAuthChecker): def is_enabled(self): return True - async def check_auth(self, authdict, clientip): + async def check_auth(self, authdict: dict, clientip: str) -> Any: return True class RecaptchaAuthChecker(UserInteractiveAuthChecker): AUTH_TYPE = LoginType.RECAPTCHA - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self._enabled = bool(hs.config.recaptcha_private_key) self._http_client = hs.get_proxied_http_client() self._url = hs.config.recaptcha_siteverify_api self._secret = hs.config.recaptcha_private_key - def is_enabled(self): + def is_enabled(self) -> bool: return self._enabled - async def check_auth(self, authdict, clientip): + async def check_auth(self, authdict: dict, clientip: str) -> Any: try: user_response = authdict["response"] except KeyError: @@ -132,11 +135,11 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker): class _BaseThreepidAuthChecker: - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastore() - async def _check_threepid(self, medium, authdict): + async def _check_threepid(self, medium: str, authdict: dict) -> dict: if "threepid_creds" not in authdict: raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM) @@ -206,31 +209,31 @@ class _BaseThreepidAuthChecker: class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker): AUTH_TYPE = LoginType.EMAIL_IDENTITY - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): UserInteractiveAuthChecker.__init__(self, hs) _BaseThreepidAuthChecker.__init__(self, hs) - def is_enabled(self): + def is_enabled(self) -> bool: return self.hs.config.threepid_behaviour_email in ( ThreepidBehaviour.REMOTE, ThreepidBehaviour.LOCAL, ) - async def check_auth(self, authdict, clientip): + async def check_auth(self, authdict: dict, clientip: str) -> Any: return await self._check_threepid("email", authdict) class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker): AUTH_TYPE = LoginType.MSISDN - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): UserInteractiveAuthChecker.__init__(self, hs) _BaseThreepidAuthChecker.__init__(self, hs) - def is_enabled(self): + def is_enabled(self) -> bool: return bool(self.hs.config.account_threepid_delegate_msisdn) - async def check_auth(self, authdict, clientip): + async def check_auth(self, authdict: dict, clientip: str) -> Any: return await self._check_threepid("msisdn", authdict) From d11f2dfee519a4136def4169cef0ef218ebf1e19 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 29 Apr 2021 14:31:14 +0100 Subject: [PATCH 07/21] typo in changelog --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 629d4a180d..bdeb614b9e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,7 +4,7 @@ Synapse 1.33.0rc2 (2021-04-29) Bugfixes -------- -- Fix tight loop handling presence replication when using workers. Introduced in v1.33.0rc1. ([\#9900](https://github.com/matrix-org/synapse/issues/9900)) +- Fix tight loop when handling presence replication when using workers. Introduced in v1.33.0rc1. ([\#9900](https://github.com/matrix-org/synapse/issues/9900)) Synapse 1.33.0rc1 (2021-04-28) From 56c4b47df36a99d2fc57a64013a4f482e25ee097 Mon Sep 17 00:00:00 2001 From: Dan Callahan Date: Fri, 30 Apr 2021 15:36:05 +0100 Subject: [PATCH 08/21] Build Debian packages for Ubuntu 21.04 Hirsute (#9909) Signed-off-by: Dan Callahan --- changelog.d/9909.feature | 1 + scripts-dev/build_debian_packages | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) create mode 100644 changelog.d/9909.feature diff --git a/changelog.d/9909.feature b/changelog.d/9909.feature new file mode 100644 index 0000000000..41aba5cca6 --- /dev/null +++ b/changelog.d/9909.feature @@ -0,0 +1 @@ +Build Debian packages for Ubuntu 21.04 (Hirsute Hippo). diff --git a/scripts-dev/build_debian_packages b/scripts-dev/build_debian_packages index 3bb6e2c7ea..07d018db99 100755 --- a/scripts-dev/build_debian_packages +++ b/scripts-dev/build_debian_packages @@ -21,9 +21,10 @@ DISTS = ( "debian:buster", "debian:bullseye", "debian:sid", - "ubuntu:bionic", - "ubuntu:focal", - "ubuntu:groovy", + "ubuntu:bionic", # 18.04 LTS (our EOL forced by Py36 on 2021-12-23) + "ubuntu:focal", # 20.04 LTS (our EOL forced by Py38 on 2024-10-14) + "ubuntu:groovy", # 20.10 (EOL 2021-07-07) + "ubuntu:hirsute", # 21.04 (EOL 2022-01-05) ) DESC = '''\ From b85821aca2c3cfc1c732dfcc0c1d6758a263487a Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 4 May 2021 13:28:59 +0100 Subject: [PATCH 09/21] Add port parameter to the sample config for psycopg2 args (#9911) Adds the `port` option with the default value to the sample config file. --- changelog.d/9911.doc | 1 + docs/sample_config.yaml | 1 + synapse/config/database.py | 1 + 3 files changed, 3 insertions(+) create mode 100644 changelog.d/9911.doc diff --git a/changelog.d/9911.doc b/changelog.d/9911.doc new file mode 100644 index 0000000000..f7fd9f1ba9 --- /dev/null +++ b/changelog.d/9911.doc @@ -0,0 +1 @@ +Add `port` argument to the Postgres database sample config section. \ No newline at end of file diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index e0350279ad..d013725cdc 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -810,6 +810,7 @@ caches: # password: secretpassword # database: synapse # host: localhost +# port: 5432 # cp_min: 5 # cp_max: 10 # diff --git a/synapse/config/database.py b/synapse/config/database.py index 79a02706b4..c76ef1e1de 100644 --- a/synapse/config/database.py +++ b/synapse/config/database.py @@ -58,6 +58,7 @@ DEFAULT_CONFIG = """\ # password: secretpassword # database: synapse # host: localhost +# port: 5432 # cp_min: 5 # cp_max: 10 # From e3bc4617fcd5c858ff02cf2d443b898db87ae8a5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 May 2021 15:14:22 +0100 Subject: [PATCH 10/21] Time external cache response time (#9904) --- changelog.d/9904.misc | 1 + synapse/replication/tcp/external_cache.py | 36 ++++++++++++++++------- 2 files changed, 27 insertions(+), 10 deletions(-) create mode 100644 changelog.d/9904.misc diff --git a/changelog.d/9904.misc b/changelog.d/9904.misc new file mode 100644 index 0000000000..3db1e625ae --- /dev/null +++ b/changelog.d/9904.misc @@ -0,0 +1 @@ +Time response time for external cache requests. diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py index 1a3b051e3c..b402f82810 100644 --- a/synapse/replication/tcp/external_cache.py +++ b/synapse/replication/tcp/external_cache.py @@ -15,7 +15,7 @@ import logging from typing import TYPE_CHECKING, Any, Optional -from prometheus_client import Counter +from prometheus_client import Counter, Histogram from synapse.logging.context import make_deferred_yieldable from synapse.util import json_decoder, json_encoder @@ -35,6 +35,20 @@ get_counter = Counter( labelnames=["cache_name", "hit"], ) +response_timer = Histogram( + "synapse_external_cache_response_time_seconds", + "Time taken to get a response from Redis for a cache get/set request", + labelnames=["method"], + buckets=( + 0.001, + 0.002, + 0.005, + 0.01, + 0.02, + 0.05, + ), +) + logger = logging.getLogger(__name__) @@ -72,13 +86,14 @@ class ExternalCache: logger.debug("Caching %s %s: %r", cache_name, key, encoded_value) - return await make_deferred_yieldable( - self._redis_connection.set( - self._get_redis_key(cache_name, key), - encoded_value, - pexpire=expiry_ms, + with response_timer.labels("set").time(): + return await make_deferred_yieldable( + self._redis_connection.set( + self._get_redis_key(cache_name, key), + encoded_value, + pexpire=expiry_ms, + ) ) - ) async def get(self, cache_name: str, key: str) -> Optional[Any]: """Look up a key/value in the named cache.""" @@ -86,9 +101,10 @@ class ExternalCache: if self._redis_connection is None: return None - result = await make_deferred_yieldable( - self._redis_connection.get(self._get_redis_key(cache_name, key)) - ) + with response_timer.labels("get").time(): + result = await make_deferred_yieldable( + self._redis_connection.get(self._get_redis_key(cache_name, key)) + ) logger.debug("Got cache result %s %s: %r", cache_name, key, result) From 0644ac0989d06fc0ba5d08a5412d65d75f7d8db2 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 5 May 2021 14:15:54 +0100 Subject: [PATCH 11/21] 1.33.0 --- CHANGES.md | 9 +++++++++ changelog.d/9909.feature | 1 - debian/changelog | 6 ++++++ synapse/__init__.py | 2 +- 4 files changed, 16 insertions(+), 2 deletions(-) delete mode 100644 changelog.d/9909.feature diff --git a/CHANGES.md b/CHANGES.md index bdeb614b9e..206859cff7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,12 @@ +Synapse 1.33.0 (2021-05-05) +=========================== + +Features +-------- + +- Build Debian packages for Ubuntu 21.04 (Hirsute Hippo). ([\#9909](https://github.com/matrix-org/synapse/issues/9909)) + + Synapse 1.33.0rc2 (2021-04-29) ============================== diff --git a/changelog.d/9909.feature b/changelog.d/9909.feature deleted file mode 100644 index 41aba5cca6..0000000000 --- a/changelog.d/9909.feature +++ /dev/null @@ -1 +0,0 @@ -Build Debian packages for Ubuntu 21.04 (Hirsute Hippo). diff --git a/debian/changelog b/debian/changelog index fd33bfda5c..b54d3f2bc6 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +matrix-synapse-py3 (1.33.0) stable; urgency=medium + + * New synapse release 1.33.0. + + -- Synapse Packaging team Wed, 05 May 2021 14:15:27 +0100 + matrix-synapse-py3 (1.32.2) stable; urgency=medium * New synapse release 1.32.2. diff --git a/synapse/__init__.py b/synapse/__init__.py index 319c52be2c..5eac40730a 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -47,7 +47,7 @@ try: except ImportError: pass -__version__ = "1.33.0rc2" +__version__ = "1.33.0" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when From e9eb3549d32a6f93d07de8dbd5e1ebe54c8d8278 Mon Sep 17 00:00:00 2001 From: "DeepBlueV7.X" Date: Wed, 5 May 2021 13:37:56 +0000 Subject: [PATCH 12/21] Leave out optional keys from /sync (#9919) This leaves out all optional keys from /sync. This should be fine for all clients tested against conduit already, but it may break some clients, as such we should check, that at least most of them don't break horribly and maybe back out some of the individual changes. (We can probably always leave out groups for example, while the others may cause more issues.) Signed-off-by: Nicolas Werner --- changelog.d/9919.feature | 1 + synapse/rest/client/v2_alpha/sync.py | 62 +++++++++++++------ tests/rest/client/v2_alpha/test_sync.py | 30 +-------- .../test_resource_limits_server_notices.py | 8 ++- 4 files changed, 51 insertions(+), 50 deletions(-) create mode 100644 changelog.d/9919.feature diff --git a/changelog.d/9919.feature b/changelog.d/9919.feature new file mode 100644 index 0000000000..07747505d2 --- /dev/null +++ b/changelog.d/9919.feature @@ -0,0 +1 @@ +Omit empty fields from the `/sync` response. Contributed by @deepbluev7. diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 95ee3f1b84..5f85653330 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -14,6 +14,7 @@ import itertools import logging +from collections import defaultdict from typing import TYPE_CHECKING, Tuple from synapse.api.constants import PresenceState @@ -229,24 +230,49 @@ class SyncRestServlet(RestServlet): ) logger.debug("building sync response dict") - return { - "account_data": {"events": sync_result.account_data}, - "to_device": {"events": sync_result.to_device}, - "device_lists": { - "changed": list(sync_result.device_lists.changed), - "left": list(sync_result.device_lists.left), - }, - "presence": SyncRestServlet.encode_presence(sync_result.presence, time_now), - "rooms": {"join": joined, "invite": invited, "leave": archived}, - "groups": { - "join": sync_result.groups.join, - "invite": sync_result.groups.invite, - "leave": sync_result.groups.leave, - }, - "device_one_time_keys_count": sync_result.device_one_time_keys_count, - "org.matrix.msc2732.device_unused_fallback_key_types": sync_result.device_unused_fallback_key_types, - "next_batch": await sync_result.next_batch.to_string(self.store), - } + + response: dict = defaultdict(dict) + response["next_batch"] = await sync_result.next_batch.to_string(self.store) + + if sync_result.account_data: + response["account_data"] = {"events": sync_result.account_data} + if sync_result.presence: + response["presence"] = SyncRestServlet.encode_presence( + sync_result.presence, time_now + ) + + if sync_result.to_device: + response["to_device"] = {"events": sync_result.to_device} + + if sync_result.device_lists.changed: + response["device_lists"]["changed"] = list(sync_result.device_lists.changed) + if sync_result.device_lists.left: + response["device_lists"]["left"] = list(sync_result.device_lists.left) + + if sync_result.device_one_time_keys_count: + response[ + "device_one_time_keys_count" + ] = sync_result.device_one_time_keys_count + if sync_result.device_unused_fallback_key_types: + response[ + "org.matrix.msc2732.device_unused_fallback_key_types" + ] = sync_result.device_unused_fallback_key_types + + if joined: + response["rooms"]["join"] = joined + if invited: + response["rooms"]["invite"] = invited + if archived: + response["rooms"]["leave"] = archived + + if sync_result.groups.join: + response["groups"]["join"] = sync_result.groups.join + if sync_result.groups.invite: + response["groups"]["invite"] = sync_result.groups.invite + if sync_result.groups.leave: + response["groups"]["leave"] = sync_result.groups.leave + + return response @staticmethod def encode_presence(events, time_now): diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index dbcbdf159a..74be5176d0 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -37,35 +37,7 @@ class FilterTestCase(unittest.HomeserverTestCase): channel = self.make_request("GET", "/sync") self.assertEqual(channel.code, 200) - self.assertTrue( - { - "next_batch", - "rooms", - "presence", - "account_data", - "to_device", - "device_lists", - }.issubset(set(channel.json_body.keys())) - ) - - def test_sync_presence_disabled(self): - """ - When presence is disabled, the key does not appear in /sync. - """ - self.hs.config.use_presence = False - - channel = self.make_request("GET", "/sync") - - self.assertEqual(channel.code, 200) - self.assertTrue( - { - "next_batch", - "rooms", - "account_data", - "to_device", - "device_lists", - }.issubset(set(channel.json_body.keys())) - ) + self.assertIn("next_batch", channel.json_body) class SyncFilterTestCase(unittest.HomeserverTestCase): diff --git a/tests/server_notices/test_resource_limits_server_notices.py b/tests/server_notices/test_resource_limits_server_notices.py index d46521ccdc..3245aa91ca 100644 --- a/tests/server_notices/test_resource_limits_server_notices.py +++ b/tests/server_notices/test_resource_limits_server_notices.py @@ -306,8 +306,9 @@ class TestResourceLimitsServerNoticesWithRealRooms(unittest.HomeserverTestCase): channel = self.make_request("GET", "/sync?timeout=0", access_token=tok) - invites = channel.json_body["rooms"]["invite"] - self.assertEqual(len(invites), 0, invites) + self.assertNotIn( + "rooms", channel.json_body, "Got invites without server notice" + ) def test_invite_with_notice(self): """Tests that, if the MAU limit is hit, the server notices user invites each user @@ -364,7 +365,8 @@ class TestResourceLimitsServerNoticesWithRealRooms(unittest.HomeserverTestCase): # We could also pick another user and sync with it, which would return an # invite to a system notices room, but it doesn't matter which user we're # using so we use the last one because it saves us an extra sync. - invites = channel.json_body["rooms"]["invite"] + if "rooms" in channel.json_body: + invites = channel.json_body["rooms"]["invite"] # Make sure we have an invite to process. self.assertEqual(len(invites), 1, invites) From d5305000f1c5799ffb6fcd64ad27e7bfd8ba2113 Mon Sep 17 00:00:00 2001 From: Christopher May-Townsend Date: Wed, 5 May 2021 16:33:04 +0100 Subject: [PATCH 13/21] Docker healthcheck timings - add startup delay and changed interval (#9913) * Add healthcheck startup delay by 5secs and reduced interval check to 15s to reduce waiting time for docker aware edge routers bringing an instance online --- changelog.d/9913.docker | 1 + docker/Dockerfile | 2 +- docker/README.md | 17 ++++++++++++++--- 3 files changed, 16 insertions(+), 4 deletions(-) create mode 100644 changelog.d/9913.docker diff --git a/changelog.d/9913.docker b/changelog.d/9913.docker new file mode 100644 index 0000000000..93835e14cb --- /dev/null +++ b/changelog.d/9913.docker @@ -0,0 +1 @@ +Added startup_delay to docker healthcheck to reduce waiting time for coming online, updated readme for extra options, contributed by @Maquis196. diff --git a/docker/Dockerfile b/docker/Dockerfile index 4f5cd06d72..2bdc607e66 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -88,5 +88,5 @@ EXPOSE 8008/tcp 8009/tcp 8448/tcp ENTRYPOINT ["/start.py"] -HEALTHCHECK --interval=1m --timeout=5s \ +HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \ CMD curl -fSs http://localhost:8008/health || exit 1 diff --git a/docker/README.md b/docker/README.md index a7d1e670fe..c8d3c4b3da 100644 --- a/docker/README.md +++ b/docker/README.md @@ -191,6 +191,16 @@ whilst running the above `docker run` commands. ``` --no-healthcheck ``` + +## Disabling the healthcheck in docker-compose file + +If you wish to disable the healthcheck via docker-compose, append the following to your service configuration. + +``` + healthcheck: + disable: true +``` + ## Setting custom healthcheck on docker run If you wish to point the healthcheck at a different port with docker command, add the following @@ -202,14 +212,15 @@ If you wish to point the healthcheck at a different port with docker command, ad ## Setting the healthcheck in docker-compose file You can add the following to set a custom healthcheck in a docker compose file. -You will need version >2.1 for this to work. +You will need docker-compose version >2.1 for this to work. ``` healthcheck: test: ["CMD", "curl", "-fSs", "http://localhost:8008/health"] - interval: 1m - timeout: 10s + interval: 15s + timeout: 5s retries: 3 + start_period: 5s ``` ## Using jemalloc From d0aee697ac0587c005bc1048f5036979331f1101 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 May 2021 16:49:34 +0100 Subject: [PATCH 14/21] Use get_current_users_in_room from store and not StateHandler (#9910) --- changelog.d/9910.bugfix | 1 + changelog.d/9910.feature | 1 + synapse/handlers/directory.py | 4 ++-- synapse/handlers/events.py | 2 +- synapse/handlers/message.py | 2 +- synapse/handlers/presence.py | 2 +- synapse/handlers/room.py | 2 +- synapse/handlers/sync.py | 6 +++--- synapse/state/__init__.py | 10 +++++++--- synapse/storage/_base.py | 1 + synapse/storage/databases/main/roommember.py | 8 ++++++-- synapse/storage/databases/main/user_directory.py | 4 +--- 12 files changed, 26 insertions(+), 17 deletions(-) create mode 100644 changelog.d/9910.bugfix create mode 100644 changelog.d/9910.feature diff --git a/changelog.d/9910.bugfix b/changelog.d/9910.bugfix new file mode 100644 index 0000000000..06d523fd46 --- /dev/null +++ b/changelog.d/9910.bugfix @@ -0,0 +1 @@ +Fix bug where user directory could get out of sync if room visibility and membership changed in quick succession. diff --git a/changelog.d/9910.feature b/changelog.d/9910.feature new file mode 100644 index 0000000000..54165cce18 --- /dev/null +++ b/changelog.d/9910.feature @@ -0,0 +1 @@ +Improve performance after joining a large room when presence is enabled. diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index de1b14cde3..4064a2b859 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -78,7 +78,7 @@ class DirectoryHandler(BaseHandler): # TODO(erikj): Add transactions. # TODO(erikj): Check if there is a current association. if not servers: - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) servers = {get_domain_from_id(u) for u in users} if not servers: @@ -270,7 +270,7 @@ class DirectoryHandler(BaseHandler): Codes.NOT_FOUND, ) - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) extra_servers = {get_domain_from_id(u) for u in users} servers = set(extra_servers) | set(servers) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index d82144d7fa..f134f1e234 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -103,7 +103,7 @@ class EventStreamHandler(BaseHandler): # Send down presence. if event.state_key == auth_user_id: # Send down presence for everyone in the room. - users = await self.state.get_current_users_in_room( + users = await self.store.get_users_in_room( event.room_id ) # type: Iterable[str] else: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 49f8aa25ea..393f17c3a3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -258,7 +258,7 @@ class MessageHandler: "Getting joined members after leaving is not implemented" ) - users_with_profile = await self.state.get_current_users_in_room(room_id) + users_with_profile = await self.store.get_users_in_room_with_profiles(room_id) # If this is an AS, double check that they are allowed to see the members. # This can either be because the AS user is in the room or because there diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ebbc234334..8e085dfbec 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1293,7 +1293,7 @@ class PresenceHandler(BasePresenceHandler): remote_host = get_domain_from_id(user_id) - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) user_ids = list(filter(self.is_mine_id, users)) states_d = await self.current_state_for_users(user_ids) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5a888b7941..fb4823a5cc 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1327,7 +1327,7 @@ class RoomShutdownHandler: new_room_id = None logger.info("Shutting down room %r", room_id) - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) kicked_users = [] failed_to_kick_users = [] for user_id in users: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a9a3ee05c3..0fcc1532da 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1190,7 +1190,7 @@ class SyncHandler: # Step 1b, check for newly joined rooms for room_id in newly_joined_rooms: - joined_users = await self.state.get_current_users_in_room(room_id) + joined_users = await self.store.get_users_in_room(room_id) newly_joined_or_invited_users.update(joined_users) # TODO: Check that these users are actually new, i.e. either they @@ -1206,7 +1206,7 @@ class SyncHandler: # Now find users that we no longer track for room_id in newly_left_rooms: - left_users = await self.state.get_current_users_in_room(room_id) + left_users = await self.store.get_users_in_room(room_id) newly_left_users.update(left_users) # Remove any users that we still share a room with. @@ -1361,7 +1361,7 @@ class SyncHandler: extra_users_ids = set(newly_joined_or_invited_users) for room_id in newly_joined_rooms: - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) extra_users_ids.update(users) extra_users_ids.discard(user.to_string()) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index b3bd92d37c..a1770f620e 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -213,19 +213,23 @@ class StateHandler: return ret.state async def get_current_users_in_room( - self, room_id: str, latest_event_ids: Optional[List[str]] = None + self, room_id: str, latest_event_ids: List[str] ) -> Dict[str, ProfileInfo]: """ Get the users who are currently in a room. + Note: This is much slower than using the equivalent method + `DataStore.get_users_in_room` or `DataStore.get_users_in_room_with_profiles`, + so this should only be used when wanting the users at a particular point + in the room. + Args: room_id: The ID of the room. latest_event_ids: Precomputed list of latest event IDs. Will be computed if None. Returns: Dictionary of user IDs to their profileinfo. """ - if not latest_event_ids: - latest_event_ids = await self.store.get_latest_event_ids_in_room(room_id) + assert latest_event_ids is not None logger.debug("calling resolve_state_groups from get_current_users_in_room") diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 6b68d8720c..3d98d3f5f8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -69,6 +69,7 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache("is_host_joined", (room_id, host)) self._attempt_to_invalidate_cache("get_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,)) self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,)) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 2a8532f8c1..5fc3bb5a7d 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -205,8 +205,12 @@ class RoomMemberWorkerStore(EventsWorkerStore): def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]: sql = """ - SELECT user_id, display_name, avatar_url FROM room_memberships - WHERE room_id = ? AND membership = ? + SELECT state_key, display_name, avatar_url FROM room_memberships as m + INNER JOIN current_state_events as c + ON m.event_id = c.event_id + AND m.room_id = c.room_id + AND m.user_id = c.state_key + WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? """ txn.execute(sql, (room_id, Membership.JOIN)) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 7a082fdd21..a6bfb4902a 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -142,8 +142,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): batch_size (int): Maximum number of state events to process per cycle. """ - state = self.hs.get_state_handler() - # If we don't have progress filed, delete everything. if not progress: await self.delete_all_from_user_dir() @@ -197,7 +195,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): room_id ) - users_with_profile = await state.get_current_users_in_room(room_id) + users_with_profile = await self.get_users_in_room_with_profiles(room_id) user_ids = set(users_with_profile) # Update each user in the user directory. From de8f0a03a3cc3a2327dfd3058c99e48067965079 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 May 2021 16:53:22 +0100 Subject: [PATCH 15/21] Don't set the external cache if its been done recently (#9905) --- changelog.d/9905.feature | 1 + synapse/handlers/federation.py | 4 +++- synapse/handlers/message.py | 34 ++++++++++++++++++++++++++++++---- 3 files changed, 34 insertions(+), 5 deletions(-) create mode 100644 changelog.d/9905.feature diff --git a/changelog.d/9905.feature b/changelog.d/9905.feature new file mode 100644 index 0000000000..96a0e7f09f --- /dev/null +++ b/changelog.d/9905.feature @@ -0,0 +1 @@ +Improve performance of sending events for worker-based deployments using Redis. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9d867aaf4d..e8330a2b50 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2446,7 +2446,9 @@ class FederationHandler(BaseHandler): # If we are going to send this event over federation we precaclculate # the joined hosts. if event.internal_metadata.get_send_on_behalf_of(): - await self.event_creation_handler.cache_joined_hosts_for_event(event) + await self.event_creation_handler.cache_joined_hosts_for_event( + event, context + ) return context diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 393f17c3a3..8729332d4b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -51,6 +51,7 @@ from synapse.storage.state import StateFilter from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester from synapse.util import json_decoder, json_encoder from synapse.util.async_helpers import Linearizer +from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client @@ -457,6 +458,19 @@ class EventCreationHandler: self._external_cache = hs.get_external_cache() + # Stores the state groups we've recently added to the joined hosts + # external cache. Note that the timeout must be significantly less than + # the TTL on the external cache. + self._external_cache_joined_hosts_updates = ( + None + ) # type: Optional[ExpiringCache] + if self._external_cache.is_enabled(): + self._external_cache_joined_hosts_updates = ExpiringCache( + "_external_cache_joined_hosts_updates", + self.clock, + expiry_ms=30 * 60 * 1000, + ) + async def create_event( self, requester: Requester, @@ -967,7 +981,7 @@ class EventCreationHandler: await self.action_generator.handle_push_actions_for_event(event, context) - await self.cache_joined_hosts_for_event(event) + await self.cache_joined_hosts_for_event(event, context) try: # If we're a worker we need to hit out to the master. @@ -1008,7 +1022,9 @@ class EventCreationHandler: await self.store.remove_push_actions_from_staging(event.event_id) raise - async def cache_joined_hosts_for_event(self, event: EventBase) -> None: + async def cache_joined_hosts_for_event( + self, event: EventBase, context: EventContext + ) -> None: """Precalculate the joined hosts at the event, when using Redis, so that external federation senders don't have to recalculate it themselves. """ @@ -1016,6 +1032,9 @@ class EventCreationHandler: if not self._external_cache.is_enabled(): return + # If external cache is enabled we should always have this. + assert self._external_cache_joined_hosts_updates is not None + # We actually store two mappings, event ID -> prev state group, # state group -> joined hosts, which is much more space efficient # than event ID -> joined hosts. @@ -1023,16 +1042,21 @@ class EventCreationHandler: # Note: We have to cache event ID -> prev state group, as we don't # store that in the DB. # - # Note: We always set the state group -> joined hosts cache, even if - # we already set it, so that the expiry time is reset. + # Note: We set the state group -> joined hosts cache if it hasn't been + # set for a while, so that the expiry time is reset. state_entry = await self.state.resolve_state_groups_for_events( event.room_id, event_ids=event.prev_event_ids() ) if state_entry.state_group: + if state_entry.state_group in self._external_cache_joined_hosts_updates: + return + joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry) + # Note that the expiry times must be larger than the expiry time in + # _external_cache_joined_hosts_updates. await self._external_cache.set( "event_to_prev_state_group", event.event_id, @@ -1046,6 +1070,8 @@ class EventCreationHandler: expiry_ms=60 * 60 * 1000, ) + self._external_cache_joined_hosts_updates[state_entry.state_group] = None + async def _validate_canonical_alias( self, directory_handler, room_alias_str: str, expected_room_id: str ) -> None: From 1fb9a2d0bf2506ca6e5343cb340a441585ca1c07 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 May 2021 16:53:45 +0100 Subject: [PATCH 16/21] Limit how often GC happens by time. (#9902) Synapse can be quite memory intensive, and unless care is taken to tune the GC thresholds it can end up thrashing, causing noticable performance problems for large servers. We fix this by limiting how often we GC a given generation, regardless of current counts/thresholds. This does not help with the reverse problem where the thresholds are set too high, but that should only happen in situations where they've been manually configured. Adds a `gc_min_seconds_between` config option to override the defaults. Fixes #9890. --- changelog.d/9902.feature | 1 + docs/sample_config.yaml | 10 ++++++++++ synapse/app/generic_worker.py | 3 +++ synapse/app/homeserver.py | 3 +++ synapse/config/server.py | 31 ++++++++++++++++++++++++++++++- synapse/metrics/__init__.py | 18 ++++++++++++++++-- 6 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 changelog.d/9902.feature diff --git a/changelog.d/9902.feature b/changelog.d/9902.feature new file mode 100644 index 0000000000..4d9f324d4e --- /dev/null +++ b/changelog.d/9902.feature @@ -0,0 +1 @@ +Add limits to how often Synapse will GC, ensuring that large servers do not end up GC thrashing if `gc_thresholds` has not been correctly set. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index d013725cdc..f469d6e54f 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -152,6 +152,16 @@ presence: # #gc_thresholds: [700, 10, 10] +# The minimum time in seconds between each GC for a generation, regardless of +# the GC thresholds. This ensures that we don't do GC too frequently. +# +# A value of `[1s, 10s, 30s]` indicates that a second must pass between consecutive +# generation 0 GCs, etc. +# +# Defaults to `[1s, 10s, 30s]`. +# +#gc_min_interval: [0.5s, 30s, 1m] + # Set the limit on the returned events in the timeline in the get # and sync operations. The default value is 100. -1 means no upper limit. # diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 1a15ceee81..a3fe9a3f38 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -455,6 +455,9 @@ def start(config_options): synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts + if config.server.gc_seconds: + synapse.metrics.MIN_TIME_BETWEEN_GCS = config.server.gc_seconds + hs = GenericWorkerServer( config.server_name, config=config, diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 8e78134bbe..6a823da10d 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -342,6 +342,9 @@ def setup(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts + if config.server.gc_seconds: + synapse.metrics.MIN_TIME_BETWEEN_GCS = config.server.gc_seconds + hs = SynapseHomeServer( config.server_name, config=config, diff --git a/synapse/config/server.py b/synapse/config/server.py index 21ca7b33e3..c290a35a92 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -19,7 +19,7 @@ import logging import os.path import re from textwrap import indent -from typing import Any, Dict, Iterable, List, Optional, Set +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple import attr import yaml @@ -572,6 +572,7 @@ class ServerConfig(Config): _warn_if_webclient_configured(self.listeners) self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None)) + self.gc_seconds = self.read_gc_intervals(config.get("gc_min_interval", None)) @attr.s class LimitRemoteRoomsConfig: @@ -917,6 +918,16 @@ class ServerConfig(Config): # #gc_thresholds: [700, 10, 10] + # The minimum time in seconds between each GC for a generation, regardless of + # the GC thresholds. This ensures that we don't do GC too frequently. + # + # A value of `[1s, 10s, 30s]` indicates that a second must pass between consecutive + # generation 0 GCs, etc. + # + # Defaults to `[1s, 10s, 30s]`. + # + #gc_min_interval: [0.5s, 30s, 1m] + # Set the limit on the returned events in the timeline in the get # and sync operations. The default value is 100. -1 means no upper limit. # @@ -1305,6 +1316,24 @@ class ServerConfig(Config): help="Turn on the twisted telnet manhole service on the given port.", ) + def read_gc_intervals(self, durations) -> Optional[Tuple[float, float, float]]: + """Reads the three durations for the GC min interval option, returning seconds.""" + if durations is None: + return None + + try: + if len(durations) != 3: + raise ValueError() + return ( + self.parse_duration(durations[0]) / 1000, + self.parse_duration(durations[1]) / 1000, + self.parse_duration(durations[2]) / 1000, + ) + except Exception: + raise ConfigError( + "Value of `gc_min_interval` must be a list of three durations if set" + ) + def is_threepid_reserved(reserved_threepids, threepid): """Check the threepid against the reserved threepid config diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 31b7b3c256..e671da26d5 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -535,6 +535,13 @@ class ReactorLastSeenMetric: REGISTRY.register(ReactorLastSeenMetric()) +# The minimum time in seconds between GCs for each generation, regardless of the current GC +# thresholds and counts. +MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0) + +# The time (in seconds since the epoch) of the last time we did a GC for each generation. +_last_gc = [0.0, 0.0, 0.0] + def runUntilCurrentTimer(reactor, func): @functools.wraps(func) @@ -575,11 +582,16 @@ def runUntilCurrentTimer(reactor, func): return ret # Check if we need to do a manual GC (since its been disabled), and do - # one if necessary. + # one if necessary. Note we go in reverse order as e.g. a gen 1 GC may + # promote an object into gen 2, and we don't want to handle the same + # object multiple times. threshold = gc.get_threshold() counts = gc.get_count() for i in (2, 1, 0): - if threshold[i] < counts[i]: + # We check if we need to do one based on a straightforward + # comparison between the threshold and count. We also do an extra + # check to make sure that we don't a GC too often. + if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]: if i == 0: logger.debug("Collecting gc %d", i) else: @@ -589,6 +601,8 @@ def runUntilCurrentTimer(reactor, func): unreachable = gc.collect(i) end = time.time() + _last_gc[i] = end + gc_time.labels(i).observe(end - start) gc_unreachable.labels(i).set(unreachable) From ef889c98a6cde0cfa95f7fdaf7f99ec3c1e9bb7f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 May 2021 16:54:36 +0100 Subject: [PATCH 17/21] Optionally track memory usage of each LruCache (#9881) This will double count slightly in the presence of interned strings. It's off by default as it can consume a lot of resources. --- changelog.d/9881.feature | 1 + mypy.ini | 3 +++ synapse/app/generic_worker.py | 1 + synapse/app/homeserver.py | 1 + synapse/config/cache.py | 11 ++++++++ synapse/python_dependencies.py | 2 ++ synapse/util/caches/__init__.py | 31 +++++++++++++++++++++ synapse/util/caches/lrucache.py | 48 ++++++++++++++++++++++++++++++++- 8 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 changelog.d/9881.feature diff --git a/changelog.d/9881.feature b/changelog.d/9881.feature new file mode 100644 index 0000000000..088a517e02 --- /dev/null +++ b/changelog.d/9881.feature @@ -0,0 +1 @@ +Add experimental option to track memory usage of the caches. diff --git a/mypy.ini b/mypy.ini index a40f705b76..ea655a0d4d 100644 --- a/mypy.ini +++ b/mypy.ini @@ -171,3 +171,6 @@ ignore_missing_imports = True [mypy-txacme.*] ignore_missing_imports = True + +[mypy-pympler.*] +ignore_missing_imports = True diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index a3fe9a3f38..f730cdbd78 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -454,6 +454,7 @@ def start(config_options): config.server.update_user_directory = False synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts + synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage if config.server.gc_seconds: synapse.metrics.MIN_TIME_BETWEEN_GCS = config.server.gc_seconds diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 6a823da10d..b2501ee4d7 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -341,6 +341,7 @@ def setup(config_options): sys.exit(0) events.USE_FROZEN_DICTS = config.use_frozen_dicts + synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage if config.server.gc_seconds: synapse.metrics.MIN_TIME_BETWEEN_GCS = config.server.gc_seconds diff --git a/synapse/config/cache.py b/synapse/config/cache.py index 41b9b3f51f..91165ee1ce 100644 --- a/synapse/config/cache.py +++ b/synapse/config/cache.py @@ -17,6 +17,8 @@ import re import threading from typing import Callable, Dict +from synapse.python_dependencies import DependencyException, check_requirements + from ._base import Config, ConfigError # The prefix for all cache factor-related environment variables @@ -189,6 +191,15 @@ class CacheConfig(Config): ) self.cache_factors[cache] = factor + self.track_memory_usage = cache_config.get("track_memory_usage", False) + if self.track_memory_usage: + try: + check_requirements("cache_memory") + except DependencyException as e: + raise ConfigError( + e.message # noqa: B306, DependencyException.message is a property + ) + # Resize all caches (if necessary) with the new factors we've loaded self.resize_all_caches() diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 2de946f464..d58eeeaa74 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -116,6 +116,8 @@ CONDITIONAL_REQUIREMENTS = { # hiredis is not a *strict* dependency, but it makes things much faster. # (if it is not installed, we fall back to slow code.) "redis": ["txredisapi>=1.4.7", "hiredis"], + # Required to use experimental `caches.track_memory_usage` config option. + "cache_memory": ["pympler"], } ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str] diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 46af7fa473..ca36f07c20 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -24,6 +24,11 @@ from synapse.config.cache import add_resizable_cache logger = logging.getLogger(__name__) + +# Whether to track estimated memory usage of the LruCaches. +TRACK_MEMORY_USAGE = False + + caches_by_name = {} # type: Dict[str, Sized] collectors_by_name = {} # type: Dict[str, CacheMetric] @@ -32,6 +37,11 @@ cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"]) cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"]) cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"]) cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"]) +cache_memory_usage = Gauge( + "synapse_util_caches_cache_size_bytes", + "Estimated memory usage of the caches", + ["name"], +) response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"]) response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"]) @@ -52,6 +62,7 @@ class CacheMetric: hits = attr.ib(default=0) misses = attr.ib(default=0) evicted_size = attr.ib(default=0) + memory_usage = attr.ib(default=None) def inc_hits(self): self.hits += 1 @@ -62,6 +73,19 @@ class CacheMetric: def inc_evictions(self, size=1): self.evicted_size += size + def inc_memory_usage(self, memory: int): + if self.memory_usage is None: + self.memory_usage = 0 + + self.memory_usage += memory + + def dec_memory_usage(self, memory: int): + self.memory_usage -= memory + + def clear_memory_usage(self): + if self.memory_usage is not None: + self.memory_usage = 0 + def describe(self): return [] @@ -81,6 +105,13 @@ class CacheMetric: cache_total.labels(self._cache_name).set(self.hits + self.misses) if getattr(self._cache, "max_size", None): cache_max_size.labels(self._cache_name).set(self._cache.max_size) + + if TRACK_MEMORY_USAGE: + # self.memory_usage can be None if nothing has been inserted + # into the cache yet. + cache_memory_usage.labels(self._cache_name).set( + self.memory_usage or 0 + ) if self._collect_callback: self._collect_callback() except Exception as e: diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 10b0ec6b75..1be675e014 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -32,9 +32,36 @@ from typing import ( from typing_extensions import Literal from synapse.config import cache as cache_config +from synapse.util import caches from synapse.util.caches import CacheMetric, register_cache from synapse.util.caches.treecache import TreeCache +try: + from pympler.asizeof import Asizer + + def _get_size_of(val: Any, *, recurse=True) -> int: + """Get an estimate of the size in bytes of the object. + + Args: + val: The object to size. + recurse: If true will include referenced values in the size, + otherwise only sizes the given object. + """ + # Ignore singleton values when calculating memory usage. + if val in ((), None, ""): + return 0 + + sizer = Asizer() + sizer.exclude_refs((), None, "") + return sizer.asizeof(val, limit=100 if recurse else 0) + + +except ImportError: + + def _get_size_of(val: Any, *, recurse=True) -> int: + return 0 + + # Function type: the type used for invalidation callbacks FT = TypeVar("FT", bound=Callable[..., Any]) @@ -56,7 +83,7 @@ def enumerate_leaves(node, depth): class _Node: - __slots__ = ["prev_node", "next_node", "key", "value", "callbacks"] + __slots__ = ["prev_node", "next_node", "key", "value", "callbacks", "memory"] def __init__( self, @@ -84,6 +111,16 @@ class _Node: self.add_callbacks(callbacks) + self.memory = 0 + if caches.TRACK_MEMORY_USAGE: + self.memory = ( + _get_size_of(key) + + _get_size_of(value) + + _get_size_of(self.callbacks, recurse=False) + + _get_size_of(self, recurse=False) + ) + self.memory += _get_size_of(self.memory, recurse=False) + def add_callbacks(self, callbacks: Collection[Callable[[], None]]) -> None: """Add to stored list of callbacks, removing duplicates.""" @@ -233,6 +270,9 @@ class LruCache(Generic[KT, VT]): if size_callback: cached_cache_len[0] += size_callback(node.value) + if caches.TRACK_MEMORY_USAGE and metrics: + metrics.inc_memory_usage(node.memory) + def move_node_to_front(node): prev_node = node.prev_node next_node = node.next_node @@ -258,6 +298,9 @@ class LruCache(Generic[KT, VT]): node.run_and_clear_callbacks() + if caches.TRACK_MEMORY_USAGE and metrics: + metrics.dec_memory_usage(node.memory) + return deleted_len @overload @@ -373,6 +416,9 @@ class LruCache(Generic[KT, VT]): if size_callback: cached_cache_len[0] = 0 + if caches.TRACK_MEMORY_USAGE and metrics: + metrics.clear_memory_usage() + @synchronized def cache_contains(key: KT) -> bool: return key in cache From e2a443550e7b47bf8fe1b5fbd76f9ca95e81cbad Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 5 May 2021 11:56:51 -0400 Subject: [PATCH 18/21] Support stable MSC1772 spaces identifiers. (#9915) Support both the unstable and stable identifiers. A future release will disable the unstable identifiers. --- changelog.d/9915.feature | 1 + synapse/api/constants.py | 3 +++ synapse/handlers/space_summary.py | 8 ++++++-- 3 files changed, 10 insertions(+), 2 deletions(-) create mode 100644 changelog.d/9915.feature diff --git a/changelog.d/9915.feature b/changelog.d/9915.feature new file mode 100644 index 0000000000..832916cb01 --- /dev/null +++ b/changelog.d/9915.feature @@ -0,0 +1 @@ +Support stable identifiers from [MSC1772](https://github.com/matrix-org/matrix-doc/pull/1772). diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 936b6534b4..bff750e5fb 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -110,6 +110,8 @@ class EventTypes: Dummy = "org.matrix.dummy_event" + SpaceChild = "m.space.child" + SpaceParent = "m.space.parent" MSC1772_SPACE_CHILD = "org.matrix.msc1772.space.child" MSC1772_SPACE_PARENT = "org.matrix.msc1772.space.parent" @@ -174,6 +176,7 @@ class EventContentFields: SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after" # cf https://github.com/matrix-org/matrix-doc/pull/1772 + ROOM_TYPE = "m.type" MSC1772_ROOM_TYPE = "org.matrix.msc1772.type" diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py index 01e3e050f9..d32452747c 100644 --- a/synapse/handlers/space_summary.py +++ b/synapse/handlers/space_summary.py @@ -288,6 +288,7 @@ class SpaceSummaryHandler: ev.data for ev in res.events if ev.event_type == EventTypes.MSC1772_SPACE_CHILD + or ev.event_type == EventTypes.SpaceChild ) async def _is_room_accessible(self, room_id: str, requester: Optional[str]) -> bool: @@ -331,7 +332,9 @@ class SpaceSummaryHandler: ) # TODO: update once MSC1772 lands - room_type = create_event.content.get(EventContentFields.MSC1772_ROOM_TYPE) + room_type = create_event.content.get(EventContentFields.ROOM_TYPE) + if not room_type: + room_type = create_event.content.get(EventContentFields.MSC1772_ROOM_TYPE) entry = { "room_id": stats["room_id"], @@ -360,8 +363,9 @@ class SpaceSummaryHandler: [ event_id for key, event_id in current_state_ids.items() - # TODO: update once MSC1772 lands + # TODO: update once MSC1772 has been FCP for a period of time. if key[0] == EventTypes.MSC1772_SPACE_CHILD + or key[0] == EventTypes.SpaceChild ] ) From 37623e33822d4e032ba6d1f523fb09b12fe27aab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 May 2021 17:27:05 +0100 Subject: [PATCH 19/21] Increase perf of handling presence when joining large rooms. (#9916) --- changelog.d/9916.feature | 1 + synapse/handlers/presence.py | 148 +++++++++++++++++--------------- tests/handlers/test_presence.py | 14 +-- 3 files changed, 84 insertions(+), 79 deletions(-) create mode 100644 changelog.d/9916.feature diff --git a/changelog.d/9916.feature b/changelog.d/9916.feature new file mode 100644 index 0000000000..54165cce18 --- /dev/null +++ b/changelog.d/9916.feature @@ -0,0 +1 @@ +Improve performance after joining a large room when presence is enabled. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 8e085dfbec..6fd1f34289 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1183,7 +1183,16 @@ class PresenceHandler(BasePresenceHandler): max_pos, deltas = await self.store.get_current_state_deltas( self._event_pos, room_max_stream_ordering ) - await self._handle_state_delta(deltas) + + # We may get multiple deltas for different rooms, but we want to + # handle them on a room by room basis, so we batch them up by + # room. + deltas_by_room: Dict[str, List[JsonDict]] = {} + for delta in deltas: + deltas_by_room.setdefault(delta["room_id"], []).append(delta) + + for room_id, deltas_for_room in deltas_by_room.items(): + await self._handle_state_delta(room_id, deltas_for_room) self._event_pos = max_pos @@ -1192,17 +1201,21 @@ class PresenceHandler(BasePresenceHandler): max_pos ) - async def _handle_state_delta(self, deltas: List[JsonDict]) -> None: - """Process current state deltas to find new joins that need to be - handled. + async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None: + """Process current state deltas for the room to find new joins that need + to be handled. """ - # A map of destination to a set of user state that they should receive - presence_destinations = {} # type: Dict[str, Set[UserPresenceState]] + + # Sets of newly joined users. Note that if the local server is + # joining a remote room for the first time we'll see both the joining + # user and all remote users as newly joined. + newly_joined_users = set() for delta in deltas: + assert room_id == delta["room_id"] + typ = delta["type"] state_key = delta["state_key"] - room_id = delta["room_id"] event_id = delta["event_id"] prev_event_id = delta["prev_event_id"] @@ -1231,72 +1244,55 @@ class PresenceHandler(BasePresenceHandler): # Ignore changes to join events. continue - # Retrieve any user presence state updates that need to be sent as a result, - # and the destinations that need to receive it - destinations, user_presence_states = await self._on_user_joined_room( - room_id, state_key - ) + newly_joined_users.add(state_key) - # Insert the destinations and respective updates into our destinations dict - for destination in destinations: - presence_destinations.setdefault(destination, set()).update( - user_presence_states - ) + if not newly_joined_users: + # If nobody has joined then there's nothing to do. + return - # Send out user presence updates for each destination - for destination, user_state_set in presence_destinations.items(): - self._federation_queue.send_presence_to_destinations( - destinations=[destination], states=user_state_set - ) + # We want to send: + # 1. presence states of all local users in the room to newly joined + # remote servers + # 2. presence states of newly joined users to all remote servers in + # the room. + # + # TODO: Only send presence states to remote hosts that don't already + # have them (because they already share rooms). - async def _on_user_joined_room( - self, room_id: str, user_id: str - ) -> Tuple[List[str], List[UserPresenceState]]: - """Called when we detect a user joining the room via the current state - delta stream. Returns the destinations that need to be updated and the - presence updates to send to them. + # Get all the users who were already in the room, by fetching the + # current users in the room and removing the newly joined users. + users = await self.store.get_users_in_room(room_id) + prev_users = set(users) - newly_joined_users - Args: - room_id: The ID of the room that the user has joined. - user_id: The ID of the user that has joined the room. + # Construct sets for all the local users and remote hosts that were + # already in the room + prev_local_users = [] + prev_remote_hosts = set() + for user_id in prev_users: + if self.is_mine_id(user_id): + prev_local_users.append(user_id) + else: + prev_remote_hosts.add(get_domain_from_id(user_id)) - Returns: - A tuple of destinations and presence updates to send to them. - """ - if self.is_mine_id(user_id): - # If this is a local user then we need to send their presence - # out to hosts in the room (who don't already have it) + # Similarly, construct sets for all the local users and remote hosts + # that were *not* already in the room. Care needs to be taken with the + # calculating the remote hosts, as a host may have already been in the + # room even if there is a newly joined user from that host. + newly_joined_local_users = [] + newly_joined_remote_hosts = set() + for user_id in newly_joined_users: + if self.is_mine_id(user_id): + newly_joined_local_users.append(user_id) + else: + host = get_domain_from_id(user_id) + if host not in prev_remote_hosts: + newly_joined_remote_hosts.add(host) - # TODO: We should be able to filter the hosts down to those that - # haven't previously seen the user - - remote_hosts = await self.state.get_current_hosts_in_room(room_id) - - # Filter out ourselves. - filtered_remote_hosts = [ - host for host in remote_hosts if host != self.server_name - ] - - state = await self.current_state_for_user(user_id) - return filtered_remote_hosts, [state] - else: - # A remote user has joined the room, so we need to: - # 1. Check if this is a new server in the room - # 2. If so send any presence they don't already have for - # local users in the room. - - # TODO: We should be able to filter the users down to those that - # the server hasn't previously seen - - # TODO: Check that this is actually a new server joining the - # room. - - remote_host = get_domain_from_id(user_id) - - users = await self.store.get_users_in_room(room_id) - user_ids = list(filter(self.is_mine_id, users)) - - states_d = await self.current_state_for_users(user_ids) + # Send presence states of all local users in the room to newly joined + # remote servers. (We actually only send states for local users already + # in the room, as we'll send states for newly joined local users below.) + if prev_local_users and newly_joined_remote_hosts: + local_states = await self.current_state_for_users(prev_local_users) # Filter out old presence, i.e. offline presence states where # the user hasn't been active for a week. We can change this @@ -1306,13 +1302,27 @@ class PresenceHandler(BasePresenceHandler): now = self.clock.time_msec() states = [ state - for state in states_d.values() + for state in local_states.values() if state.state != PresenceState.OFFLINE or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000 or state.status_msg is not None ] - return [remote_host], states + self._federation_queue.send_presence_to_destinations( + destinations=newly_joined_remote_hosts, + states=states, + ) + + # Send presence states of newly joined users to all remote servers in + # the room + if newly_joined_local_users and ( + prev_remote_hosts or newly_joined_remote_hosts + ): + local_states = await self.current_state_for_users(newly_joined_local_users) + self._federation_queue.send_presence_to_destinations( + destinations=prev_remote_hosts | newly_joined_remote_hosts, + states=list(local_states.values()), + ) def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool: diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index ce330e79cc..1ffab709fc 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -729,7 +729,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): ) self.assertEqual(expected_state.state, PresenceState.ONLINE) self.federation_sender.send_presence_to_destinations.assert_called_once_with( - destinations=["server2"], states={expected_state} + destinations={"server2"}, states=[expected_state] ) # @@ -740,7 +740,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): self._add_new_user(room_id, "@bob:server3") self.federation_sender.send_presence_to_destinations.assert_called_once_with( - destinations=["server3"], states={expected_state} + destinations={"server3"}, states=[expected_state] ) def test_remote_gets_presence_when_local_user_joins(self): @@ -788,14 +788,8 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): self.presence_handler.current_state_for_user("@test2:server") ) self.assertEqual(expected_state.state, PresenceState.ONLINE) - self.assertEqual( - self.federation_sender.send_presence_to_destinations.call_count, 2 - ) - self.federation_sender.send_presence_to_destinations.assert_any_call( - destinations=["server3"], states={expected_state} - ) - self.federation_sender.send_presence_to_destinations.assert_any_call( - destinations=["server2"], states={expected_state} + self.federation_sender.send_presence_to_destinations.assert_called_once_with( + destinations={"server2", "server3"}, states=[expected_state] ) def _add_new_user(self, room_id, user_id): From d783880083733a694ed4c7b15ca53be00e06f8a7 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 5 May 2021 13:33:05 -0400 Subject: [PATCH 20/21] Include the time of the create event in Spaces Summary. (#9928) This is an update based on changes to MSC2946. The origin_server_ts of the m.room.create event is copied into the creation_ts field for each room returned from the spaces summary. --- changelog.d/9928.bugfix | 1 + synapse/handlers/space_summary.py | 1 + 2 files changed, 2 insertions(+) create mode 100644 changelog.d/9928.bugfix diff --git a/changelog.d/9928.bugfix b/changelog.d/9928.bugfix new file mode 100644 index 0000000000..7b74cd9fb6 --- /dev/null +++ b/changelog.d/9928.bugfix @@ -0,0 +1 @@ +Include the `origin_server_ts` property in the experimental [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946) support to allow clients to properly sort rooms. diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py index d32452747c..2e997841f1 100644 --- a/synapse/handlers/space_summary.py +++ b/synapse/handlers/space_summary.py @@ -347,6 +347,7 @@ class SpaceSummaryHandler: stats["history_visibility"] == HistoryVisibility.WORLD_READABLE ), "guest_can_join": stats["guest_access"] == "can_join", + "creation_ts": create_event.origin_server_ts, "room_type": room_type, } From 70f0ffd2fcd815a065b4734ac606654a2e11dd28 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 5 May 2021 16:31:16 -0400 Subject: [PATCH 21/21] Follow-up to #9915 to correct the identifier for room types. --- synapse/api/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index bff750e5fb..ab628b2be7 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -176,7 +176,7 @@ class EventContentFields: SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after" # cf https://github.com/matrix-org/matrix-doc/pull/1772 - ROOM_TYPE = "m.type" + ROOM_TYPE = "type" MSC1772_ROOM_TYPE = "org.matrix.msc1772.type"