Fix sending presence over federation when using workers (#10163)
When using a federation sender we'd send out all local presence updates over federation even when they shouldn't be. Fixes #10153.pull/10164/head
parent
a15a046c93
commit
c8dd4db9eb
|
@ -0,0 +1 @@
|
||||||
|
Fix a bug when using federation sender worker where it would send out more presence updates than necessary, leading to high resource usage. Broke in v1.33.0.
|
|
@ -495,9 +495,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||||
users=users_to_states.keys(),
|
users=users_to_states.keys(),
|
||||||
)
|
)
|
||||||
|
|
||||||
# If this is a federation sender, notify about presence updates.
|
|
||||||
await self.maybe_send_presence_to_interested_destinations(states)
|
|
||||||
|
|
||||||
async def process_replication_rows(
|
async def process_replication_rows(
|
||||||
self, stream_name: str, instance_name: str, token: int, rows: list
|
self, stream_name: str, instance_name: str, token: int, rows: list
|
||||||
):
|
):
|
||||||
|
@ -519,11 +516,27 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||||
for row in rows
|
for row in rows
|
||||||
]
|
]
|
||||||
|
|
||||||
for state in states:
|
# The list of states to notify sync streams and remote servers about.
|
||||||
self.user_to_current_state[state.user_id] = state
|
# This is calculated by comparing the old and new states for each user
|
||||||
|
# using `should_notify(..)`.
|
||||||
|
#
|
||||||
|
# Note that this is necessary as the presence writer will periodically
|
||||||
|
# flush presence state changes that should not be notified about to the
|
||||||
|
# DB, and so will be sent over the replication stream.
|
||||||
|
state_to_notify = []
|
||||||
|
|
||||||
|
for new_state in states:
|
||||||
|
old_state = self.user_to_current_state.get(new_state.user_id)
|
||||||
|
self.user_to_current_state[new_state.user_id] = new_state
|
||||||
|
|
||||||
|
if not old_state or should_notify(old_state, new_state):
|
||||||
|
state_to_notify.append(new_state)
|
||||||
|
|
||||||
stream_id = token
|
stream_id = token
|
||||||
await self.notify_from_replication(states, stream_id)
|
await self.notify_from_replication(state_to_notify, stream_id)
|
||||||
|
|
||||||
|
# If this is a federation sender, notify about presence updates.
|
||||||
|
await self.maybe_send_presence_to_interested_destinations(state_to_notify)
|
||||||
|
|
||||||
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
|
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
|
||||||
return [
|
return [
|
||||||
|
|
Loading…
Reference in New Issue