Combine logic about not overriding BUSY presence. (#16170)
Simplify some of the presence code by reducing duplicated code between worker & non-worker modes. The main change is to push some of the logic from `user_syncing` into `set_state`. This is done by passing whether the user is setting the presence via a `/sync` with a new `is_sync` flag to `set_state`. If this is `true` some additional logic is performed: * Don't override `busy` presence. * Update the `last_user_sync_ts`. * Never update the status message.pull/16204/head
parent
501da8ecd8
commit
1bf143699c
|
@ -0,0 +1 @@
|
|||
Simplify presence code when using workers.
|
|
@ -151,15 +151,13 @@ class BasePresenceHandler(abc.ABC):
|
|||
|
||||
self._federation_queue = PresenceFederationQueue(hs, self)
|
||||
|
||||
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
|
||||
|
||||
self.VALID_PRESENCE: Tuple[str, ...] = (
|
||||
PresenceState.ONLINE,
|
||||
PresenceState.UNAVAILABLE,
|
||||
PresenceState.OFFLINE,
|
||||
)
|
||||
|
||||
if self._busy_presence_enabled:
|
||||
if hs.config.experimental.msc3026_enabled:
|
||||
self.VALID_PRESENCE += (PresenceState.BUSY,)
|
||||
|
||||
active_presence = self.store.take_presence_startup_info()
|
||||
|
@ -255,17 +253,19 @@ class BasePresenceHandler(abc.ABC):
|
|||
self,
|
||||
target_user: UserID,
|
||||
state: JsonDict,
|
||||
ignore_status_msg: bool = False,
|
||||
force_notify: bool = False,
|
||||
is_sync: bool = False,
|
||||
) -> None:
|
||||
"""Set the presence state of the user.
|
||||
|
||||
Args:
|
||||
target_user: The ID of the user to set the presence state of.
|
||||
state: The presence state as a JSON dictionary.
|
||||
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
|
||||
If False, the user's current status will be updated.
|
||||
force_notify: Whether to force notification of the update to clients.
|
||||
is_sync: True if this update was from a sync, which results in
|
||||
*not* overriding a previously set BUSY status, updating the
|
||||
user's last_user_sync_ts, and ignoring the "status_msg" field of
|
||||
the `state` dict.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
|
@ -491,23 +491,18 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|||
if not affect_presence or not self._presence_enabled:
|
||||
return _NullContextManager()
|
||||
|
||||
prev_state = await self.current_state_for_user(user_id)
|
||||
if prev_state.state != PresenceState.BUSY:
|
||||
# We set state here but pass ignore_status_msg = True as we don't want to
|
||||
# cause the status message to be cleared.
|
||||
# Note that this causes last_active_ts to be incremented which is not
|
||||
# what the spec wants: see comment in the BasePresenceHandler version
|
||||
# of this function.
|
||||
await self.set_state(
|
||||
UserID.from_string(user_id),
|
||||
{"presence": presence_state},
|
||||
ignore_status_msg=True,
|
||||
)
|
||||
# Note that this causes last_active_ts to be incremented which is not
|
||||
# what the spec wants.
|
||||
await self.set_state(
|
||||
UserID.from_string(user_id),
|
||||
state={"presence": presence_state},
|
||||
is_sync=True,
|
||||
)
|
||||
|
||||
curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
|
||||
self._user_to_num_current_syncs[user_id] = curr_sync + 1
|
||||
|
||||
# If we went from no in flight sync to some, notify replication
|
||||
# If this is the first in-flight sync, notify replication
|
||||
if self._user_to_num_current_syncs[user_id] == 1:
|
||||
self.mark_as_coming_online(user_id)
|
||||
|
||||
|
@ -518,7 +513,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|||
if user_id in self._user_to_num_current_syncs:
|
||||
self._user_to_num_current_syncs[user_id] -= 1
|
||||
|
||||
# If we went from one in flight sync to non, notify replication
|
||||
# If there are no more in-flight syncs, notify replication
|
||||
if self._user_to_num_current_syncs[user_id] == 0:
|
||||
self.mark_as_going_offline(user_id)
|
||||
|
||||
|
@ -598,17 +593,19 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|||
self,
|
||||
target_user: UserID,
|
||||
state: JsonDict,
|
||||
ignore_status_msg: bool = False,
|
||||
force_notify: bool = False,
|
||||
is_sync: bool = False,
|
||||
) -> None:
|
||||
"""Set the presence state of the user.
|
||||
|
||||
Args:
|
||||
target_user: The ID of the user to set the presence state of.
|
||||
state: The presence state as a JSON dictionary.
|
||||
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
|
||||
If False, the user's current status will be updated.
|
||||
force_notify: Whether to force notification of the update to clients.
|
||||
is_sync: True if this update was from a sync, which results in
|
||||
*not* overriding a previously set BUSY status, updating the
|
||||
user's last_user_sync_ts, and ignoring the "status_msg" field of
|
||||
the `state` dict.
|
||||
"""
|
||||
presence = state["presence"]
|
||||
|
||||
|
@ -626,8 +623,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|||
instance_name=self._presence_writer_instance,
|
||||
user_id=user_id,
|
||||
state=state,
|
||||
ignore_status_msg=ignore_status_msg,
|
||||
force_notify=force_notify,
|
||||
is_sync=is_sync,
|
||||
)
|
||||
|
||||
async def bump_presence_active_time(self, user: UserID) -> None:
|
||||
|
@ -992,45 +989,13 @@ class PresenceHandler(BasePresenceHandler):
|
|||
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
|
||||
self.user_to_num_current_syncs[user_id] = curr_sync + 1
|
||||
|
||||
prev_state = await self.current_state_for_user(user_id)
|
||||
|
||||
# If they're busy then they don't stop being busy just by syncing,
|
||||
# so just update the last sync time.
|
||||
if prev_state.state != PresenceState.BUSY:
|
||||
# XXX: We set_state separately here and just update the last_active_ts above
|
||||
# This keeps the logic as similar as possible between the worker and single
|
||||
# process modes. Using set_state will actually cause last_active_ts to be
|
||||
# updated always, which is not what the spec calls for, but synapse has done
|
||||
# this for... forever, I think.
|
||||
await self.set_state(
|
||||
UserID.from_string(user_id),
|
||||
{"presence": presence_state},
|
||||
ignore_status_msg=True,
|
||||
)
|
||||
# Retrieve the new state for the logic below. This should come from the
|
||||
# in-memory cache.
|
||||
prev_state = await self.current_state_for_user(user_id)
|
||||
|
||||
# To keep the single process behaviour consistent with worker mode, run the
|
||||
# same logic as `update_external_syncs_row`, even though it looks weird.
|
||||
if prev_state.state == PresenceState.OFFLINE:
|
||||
await self._update_states(
|
||||
[
|
||||
prev_state.copy_and_replace(
|
||||
state=PresenceState.ONLINE,
|
||||
last_active_ts=self.clock.time_msec(),
|
||||
last_user_sync_ts=self.clock.time_msec(),
|
||||
)
|
||||
]
|
||||
)
|
||||
# otherwise, set the new presence state & update the last sync time,
|
||||
# but don't update last_active_ts as this isn't an indication that
|
||||
# they've been active (even though it's probably been updated by
|
||||
# set_state above)
|
||||
else:
|
||||
await self._update_states(
|
||||
[prev_state.copy_and_replace(last_user_sync_ts=self.clock.time_msec())]
|
||||
)
|
||||
# Note that this causes last_active_ts to be incremented which is not
|
||||
# what the spec wants.
|
||||
await self.set_state(
|
||||
UserID.from_string(user_id),
|
||||
state={"presence": presence_state},
|
||||
is_sync=True,
|
||||
)
|
||||
|
||||
async def _end() -> None:
|
||||
try:
|
||||
|
@ -1080,32 +1045,27 @@ class PresenceHandler(BasePresenceHandler):
|
|||
process_id, set()
|
||||
)
|
||||
|
||||
updates = []
|
||||
# USER_SYNC is sent when a user starts or stops syncing on a remote
|
||||
# process. (But only for the initial and last device.)
|
||||
#
|
||||
# When a user *starts* syncing it also calls set_state(...) which
|
||||
# will update the state, last_active_ts, and last_user_sync_ts.
|
||||
# Simply ensure the user is tracked as syncing in this case.
|
||||
#
|
||||
# When a user *stops* syncing, update the last_user_sync_ts and mark
|
||||
# them as no longer syncing. Note this doesn't quite match the
|
||||
# monolith behaviour, which updates last_user_sync_ts at the end of
|
||||
# every sync, not just the last in-flight sync.
|
||||
if is_syncing and user_id not in process_presence:
|
||||
if prev_state.state == PresenceState.OFFLINE:
|
||||
updates.append(
|
||||
prev_state.copy_and_replace(
|
||||
state=PresenceState.ONLINE,
|
||||
last_active_ts=sync_time_msec,
|
||||
last_user_sync_ts=sync_time_msec,
|
||||
)
|
||||
)
|
||||
else:
|
||||
updates.append(
|
||||
prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
|
||||
)
|
||||
process_presence.add(user_id)
|
||||
elif user_id in process_presence:
|
||||
updates.append(
|
||||
prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
|
||||
elif not is_syncing and user_id in process_presence:
|
||||
new_state = prev_state.copy_and_replace(
|
||||
last_user_sync_ts=sync_time_msec
|
||||
)
|
||||
await self._update_states([new_state])
|
||||
|
||||
if not is_syncing:
|
||||
process_presence.discard(user_id)
|
||||
|
||||
if updates:
|
||||
await self._update_states(updates)
|
||||
|
||||
self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
|
||||
|
||||
async def update_external_syncs_clear(self, process_id: str) -> None:
|
||||
|
@ -1204,17 +1164,19 @@ class PresenceHandler(BasePresenceHandler):
|
|||
self,
|
||||
target_user: UserID,
|
||||
state: JsonDict,
|
||||
ignore_status_msg: bool = False,
|
||||
force_notify: bool = False,
|
||||
is_sync: bool = False,
|
||||
) -> None:
|
||||
"""Set the presence state of the user.
|
||||
|
||||
Args:
|
||||
target_user: The ID of the user to set the presence state of.
|
||||
state: The presence state as a JSON dictionary.
|
||||
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
|
||||
If False, the user's current status will be updated.
|
||||
force_notify: Whether to force notification of the update to clients.
|
||||
is_sync: True if this update was from a sync, which results in
|
||||
*not* overriding a previously set BUSY status, updating the
|
||||
user's last_user_sync_ts, and ignoring the "status_msg" field of
|
||||
the `state` dict.
|
||||
"""
|
||||
status_msg = state.get("status_msg", None)
|
||||
presence = state["presence"]
|
||||
|
@ -1227,18 +1189,27 @@ class PresenceHandler(BasePresenceHandler):
|
|||
return
|
||||
|
||||
user_id = target_user.to_string()
|
||||
now = self.clock.time_msec()
|
||||
|
||||
prev_state = await self.current_state_for_user(user_id)
|
||||
|
||||
# Syncs do not override a previous presence of busy.
|
||||
#
|
||||
# TODO: This is a hack for lack of multi-device support. Unfortunately
|
||||
# removing this requires coordination with clients.
|
||||
if prev_state.state == PresenceState.BUSY and is_sync:
|
||||
presence = PresenceState.BUSY
|
||||
|
||||
new_fields = {"state": presence}
|
||||
|
||||
if not ignore_status_msg:
|
||||
new_fields["status_msg"] = status_msg
|
||||
if presence == PresenceState.ONLINE or presence == PresenceState.BUSY:
|
||||
new_fields["last_active_ts"] = now
|
||||
|
||||
if presence == PresenceState.ONLINE or (
|
||||
presence == PresenceState.BUSY and self._busy_presence_enabled
|
||||
):
|
||||
new_fields["last_active_ts"] = self.clock.time_msec()
|
||||
if is_sync:
|
||||
new_fields["last_user_sync_ts"] = now
|
||||
else:
|
||||
# Syncs do not override the status message.
|
||||
new_fields["status_msg"] = status_msg
|
||||
|
||||
await self._update_states(
|
||||
[prev_state.copy_and_replace(**new_fields)], force_notify=force_notify
|
||||
|
|
|
@ -73,8 +73,8 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
|
|||
|
||||
{
|
||||
"state": { ... },
|
||||
"ignore_status_msg": false,
|
||||
"force_notify": false
|
||||
"force_notify": false,
|
||||
"is_sync": false
|
||||
}
|
||||
|
||||
200 OK
|
||||
|
@ -96,13 +96,13 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
|
|||
async def _serialize_payload( # type: ignore[override]
|
||||
user_id: str,
|
||||
state: JsonDict,
|
||||
ignore_status_msg: bool = False,
|
||||
force_notify: bool = False,
|
||||
is_sync: bool = False,
|
||||
) -> JsonDict:
|
||||
return {
|
||||
"state": state,
|
||||
"ignore_status_msg": ignore_status_msg,
|
||||
"force_notify": force_notify,
|
||||
"is_sync": is_sync,
|
||||
}
|
||||
|
||||
async def _handle_request( # type: ignore[override]
|
||||
|
@ -111,8 +111,8 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
|
|||
await self._presence_handler.set_state(
|
||||
UserID.from_string(user_id),
|
||||
content["state"],
|
||||
content["ignore_status_msg"],
|
||||
content["force_notify"],
|
||||
content.get("is_sync", False),
|
||||
)
|
||||
|
||||
return (200, {})
|
||||
|
|
|
@ -641,13 +641,20 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
|
|||
"""Test that if an external process doesn't update the records for a while
|
||||
we time out their syncing users presence.
|
||||
"""
|
||||
process_id = "1"
|
||||
|
||||
# Notify handler that a user is now syncing.
|
||||
# Create a worker and use it to handle /sync traffic instead.
|
||||
# This is used to test that presence changes get replicated from workers
|
||||
# to the main process correctly.
|
||||
worker_to_sync_against = self.make_worker_hs(
|
||||
"synapse.app.generic_worker", {"worker_name": "synchrotron"}
|
||||
)
|
||||
worker_presence_handler = worker_to_sync_against.get_presence_handler()
|
||||
|
||||
self.get_success(
|
||||
self.presence_handler.update_external_syncs_row(
|
||||
process_id, self.user_id, True, self.clock.time_msec()
|
||||
)
|
||||
worker_presence_handler.user_syncing(
|
||||
self.user_id, True, PresenceState.ONLINE
|
||||
),
|
||||
by=0.1,
|
||||
)
|
||||
|
||||
# Check that if we wait a while without telling the handler the user has
|
||||
|
@ -820,7 +827,7 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
|
|||
# This is used to test that presence changes get replicated from workers
|
||||
# to the main process correctly.
|
||||
worker_to_sync_against = self.make_worker_hs(
|
||||
"synapse.app.generic_worker", {"worker_name": "presence_writer"}
|
||||
"synapse.app.generic_worker", {"worker_name": "synchrotron"}
|
||||
)
|
||||
|
||||
# Set presence to BUSY
|
||||
|
@ -832,7 +839,8 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
|
|||
self.get_success(
|
||||
worker_to_sync_against.get_presence_handler().user_syncing(
|
||||
self.user_id, True, PresenceState.ONLINE
|
||||
)
|
||||
),
|
||||
by=0.1,
|
||||
)
|
||||
|
||||
# Check against the main process that the user's presence did not change.
|
||||
|
@ -840,6 +848,21 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
|
|||
# we should still be busy
|
||||
self.assertEqual(state.state, PresenceState.BUSY)
|
||||
|
||||
# Advance such that the device would be discarded if it was not busy,
|
||||
# then pump so _handle_timeouts function to called.
|
||||
self.reactor.advance(IDLE_TIMER / 1000)
|
||||
self.reactor.pump([5])
|
||||
|
||||
# The account should still be busy.
|
||||
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
|
||||
self.assertEqual(state.state, PresenceState.BUSY)
|
||||
|
||||
# Ensure that a /presence call can set the user *off* busy.
|
||||
self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
|
||||
|
||||
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
|
||||
self.assertEqual(state.state, PresenceState.ONLINE)
|
||||
|
||||
def _set_presencestate_with_status_msg(
|
||||
self, state: str, status_msg: Optional[str]
|
||||
) -> None:
|
||||
|
|
Loading…
Reference in New Issue