Fix presence timeouts when synchrotron restarts. (#6212)
* Fix presence timeouts when synchrotron restarts. Handling timeouts would fail if there was an external process that had timed out, e.g. a synchrotron restarting. This was due to a couple of variable name typoes. Fixes #3715.pull/6217/head
parent
6fb0a3da07
commit
5859a5c569
|
@ -0,0 +1 @@
|
||||||
|
Fix bug where presence would not get timed out correctly if a synchrotron worker is used and restarted.
|
|
@ -24,6 +24,7 @@ The methods that define policy are:
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
from typing import Dict, Set
|
||||||
|
|
||||||
from six import iteritems, itervalues
|
from six import iteritems, itervalues
|
||||||
|
|
||||||
|
@ -179,8 +180,9 @@ class PresenceHandler(object):
|
||||||
# we assume that all the sync requests on that process have stopped.
|
# we assume that all the sync requests on that process have stopped.
|
||||||
# Stored as a dict from process_id to set of user_id, and a dict of
|
# Stored as a dict from process_id to set of user_id, and a dict of
|
||||||
# process_id to millisecond timestamp last updated.
|
# process_id to millisecond timestamp last updated.
|
||||||
self.external_process_to_current_syncs = {}
|
self.external_process_to_current_syncs = {} # type: Dict[int, Set[str]]
|
||||||
self.external_process_last_updated_ms = {}
|
self.external_process_last_updated_ms = {} # type: Dict[int, int]
|
||||||
|
|
||||||
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
|
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
|
||||||
|
|
||||||
# Start a LoopingCall in 30s that fires every 5s.
|
# Start a LoopingCall in 30s that fires every 5s.
|
||||||
|
@ -349,10 +351,13 @@ class PresenceHandler(object):
|
||||||
if now - last_update > EXTERNAL_PROCESS_EXPIRY
|
if now - last_update > EXTERNAL_PROCESS_EXPIRY
|
||||||
]
|
]
|
||||||
for process_id in expired_process_ids:
|
for process_id in expired_process_ids:
|
||||||
|
# For each expired process drop tracking info and check the users
|
||||||
|
# that were syncing on that process to see if they need to be timed
|
||||||
|
# out.
|
||||||
users_to_check.update(
|
users_to_check.update(
|
||||||
self.external_process_last_updated_ms.pop(process_id, ())
|
self.external_process_to_current_syncs.pop(process_id, ())
|
||||||
)
|
)
|
||||||
self.external_process_last_update.pop(process_id)
|
self.external_process_last_updated_ms.pop(process_id)
|
||||||
|
|
||||||
states = [
|
states = [
|
||||||
self.user_to_current_state.get(user_id, UserPresenceState.default(user_id))
|
self.user_to_current_state.get(user_id, UserPresenceState.default(user_id))
|
||||||
|
|
|
@ -22,6 +22,7 @@ from synapse.api.constants import EventTypes, Membership, PresenceState
|
||||||
from synapse.events import room_version_to_event_format
|
from synapse.events import room_version_to_event_format
|
||||||
from synapse.events.builder import EventBuilder
|
from synapse.events.builder import EventBuilder
|
||||||
from synapse.handlers.presence import (
|
from synapse.handlers.presence import (
|
||||||
|
EXTERNAL_PROCESS_EXPIRY,
|
||||||
FEDERATION_PING_INTERVAL,
|
FEDERATION_PING_INTERVAL,
|
||||||
FEDERATION_TIMEOUT,
|
FEDERATION_TIMEOUT,
|
||||||
IDLE_TIMER,
|
IDLE_TIMER,
|
||||||
|
@ -413,6 +414,44 @@ class PresenceTimeoutTestCase(unittest.TestCase):
|
||||||
self.assertEquals(state, new_state)
|
self.assertEquals(state, new_state)
|
||||||
|
|
||||||
|
|
||||||
|
class PresenceHandlerTestCase(unittest.HomeserverTestCase):
|
||||||
|
def prepare(self, reactor, clock, hs):
|
||||||
|
self.presence_handler = hs.get_presence_handler()
|
||||||
|
self.clock = hs.get_clock()
|
||||||
|
|
||||||
|
def test_external_process_timeout(self):
|
||||||
|
"""Test that if an external process doesn't update the records for a while
|
||||||
|
we time out their syncing users presence.
|
||||||
|
"""
|
||||||
|
process_id = 1
|
||||||
|
user_id = "@test:server"
|
||||||
|
|
||||||
|
# Notify handler that a user is now syncing.
|
||||||
|
self.get_success(
|
||||||
|
self.presence_handler.update_external_syncs_row(
|
||||||
|
process_id, user_id, True, self.clock.time_msec()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check that if we wait a while without telling the handler the user has
|
||||||
|
# stopped syncing that their presence state doesn't get timed out.
|
||||||
|
self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2)
|
||||||
|
|
||||||
|
state = self.get_success(
|
||||||
|
self.presence_handler.get_state(UserID.from_string(user_id))
|
||||||
|
)
|
||||||
|
self.assertEqual(state.state, PresenceState.ONLINE)
|
||||||
|
|
||||||
|
# Check that if the external process timeout fires, then the syncing
|
||||||
|
# user gets timed out
|
||||||
|
self.reactor.advance(EXTERNAL_PROCESS_EXPIRY)
|
||||||
|
|
||||||
|
state = self.get_success(
|
||||||
|
self.presence_handler.get_state(UserID.from_string(user_id))
|
||||||
|
)
|
||||||
|
self.assertEqual(state.state, PresenceState.OFFLINE)
|
||||||
|
|
||||||
|
|
||||||
class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
||||||
"""Tests remote servers get told about presence of users in the room when
|
"""Tests remote servers get told about presence of users in the room when
|
||||||
they join and when new local users join.
|
they join and when new local users join.
|
||||||
|
|
Loading…
Reference in New Issue