From c8dd4db9eba5335924046a27c1156f0e18862bdd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Jun 2021 13:08:30 +0100 Subject: [PATCH] Fix sending presence over federation when using workers (#10163) When using a federation sender we'd send out all local presence updates over federation even when they shouldn't be. Fixes #10153. --- changelog.d/10163.bugfix | 1 + synapse/handlers/presence.py | 25 +++++++++++++++++++------ 2 files changed, 20 insertions(+), 6 deletions(-) create mode 100644 changelog.d/10163.bugfix diff --git a/changelog.d/10163.bugfix b/changelog.d/10163.bugfix new file mode 100644 index 0000000000..7ccde66743 --- /dev/null +++ b/changelog.d/10163.bugfix @@ -0,0 +1 @@ +Fix a bug when using federation sender worker where it would send out more presence updates than necessary, leading to high resource usage. Broke in v1.33.0. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index f5a049d754..79508580ac 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -495,9 +495,6 @@ class WorkerPresenceHandler(BasePresenceHandler): users=users_to_states.keys(), ) - # If this is a federation sender, notify about presence updates. - await self.maybe_send_presence_to_interested_destinations(states) - async def process_replication_rows( self, stream_name: str, instance_name: str, token: int, rows: list ): @@ -519,11 +516,27 @@ class WorkerPresenceHandler(BasePresenceHandler): for row in rows ] - for state in states: - self.user_to_current_state[state.user_id] = state + # The list of states to notify sync streams and remote servers about. + # This is calculated by comparing the old and new states for each user + # using `should_notify(..)`. + # + # Note that this is necessary as the presence writer will periodically + # flush presence state changes that should not be notified about to the + # DB, and so will be sent over the replication stream. + state_to_notify = [] + + for new_state in states: + old_state = self.user_to_current_state.get(new_state.user_id) + self.user_to_current_state[new_state.user_id] = new_state + + if not old_state or should_notify(old_state, new_state): + state_to_notify.append(new_state) stream_id = token - await self.notify_from_replication(states, stream_id) + await self.notify_from_replication(state_to_notify, stream_id) + + # If this is a federation sender, notify about presence updates. + await self.maybe_send_presence_to_interested_destinations(state_to_notify) def get_currently_syncing_users_for_replication(self) -> Iterable[str]: return [