Add infrastructure to the presence handler to track sync requests in external processes

pull/812/head
Mark Haines 2016-06-01 15:57:46 +01:00
parent 00c487a8db
commit b06b10c8e3
2 changed files with 72 additions and 20 deletions

View File

@ -158,10 +158,21 @@ class PresenceHandler(object):
self.serial_to_user = {} self.serial_to_user = {}
self._next_serial = 1 self._next_serial = 1
# Keeps track of the number of *ongoing* syncs. While this is non zero # Keeps track of the number of *ongoing* syncs on this process. While
# a user will never go offline. # this is non zero a user will never go offline.
self.user_to_num_current_syncs = {} self.user_to_num_current_syncs = {}
# Keeps track of the number of *ongoing* syncs on other processes.
# While any sync is ongoing on another process the user will never
# go offline.
# Each process has a unique identifier and an update frequency. If
# no update is received from that process within the update period then
# we assume that all the sync requests on that process have stopped.
# Stored as a dict from process_id to set of user_id, and a dict of
# process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs = {}
self.external_process_last_updated_ms = []
# Start a LoopingCall in 30s that fires every 5s. # Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to # The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline. # reconnect before we treat them as offline.
@ -286,7 +297,7 @@ class PresenceHandler(object):
changes = handle_timeouts( changes = handle_timeouts(
states, states,
is_mine_fn=self.is_mine_id, is_mine_fn=self.is_mine_id,
user_to_num_current_syncs=self.user_to_num_current_syncs, syncing_users=self.get_syncing_users(),
now=now, now=now,
) )
@ -363,6 +374,51 @@ class PresenceHandler(object):
defer.returnValue(_user_syncing()) defer.returnValue(_user_syncing())
def get_currently_syncing_users(self):
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
syncing_user_ids.update(self.external_process_to_current_syncs.values())
return syncing_user_ids
@defer.inlineCallbacks
def update_external_syncs(self, process_id, syncing_user_ids):
time_now_ms = self.clock.time_msec()
prev_syncing_user_ids = (
self.external_process_to_current_syncs.get(process_id, set())
)
prev_states = yield self.current_state_for_users(
syncing_user_ids + prev_syncing_user_ids
)
updates = []
for new_user_id in syncing_user_ids - prev_syncing_user_ids:
prev_state = prev_states[new_user_id]
if prev_state.state == PresenceState.OFFLINE:
updates.append(prev_state.copy_and_replace(
state=PresenceState.ONLINE,
last_active_ts=time_now_ms,
last_user_sync_ts=time_now_ms,
))
else:
updates.append(prev_state.copy_and_replace(
last_user_sync_ts=time_now_ms,
))
for old_user_id in prev_syncing_user_ids:
prev_state = prev_states[old_user_id]
updates.append(prev_state.copy_and_replace(
last_user_sync_ts=time_now_ms,
))
yield self._update_states(updates)
self.external_process_last_updated_ms = time_now_ms
self.external_process_to_current_syncs[process_id] = syncing_user_ids
if not syncing_user_ids:
del self.external_process_to_current_syncs[process_id]
@defer.inlineCallbacks @defer.inlineCallbacks
def current_state_for_user(self, user_id): def current_state_for_user(self, user_id):
"""Get the current presence state for a user. """Get the current presence state for a user.
@ -935,15 +991,14 @@ class PresenceEventSource(object):
return self.get_new_events(user, from_key=None, include_offline=False) return self.get_new_events(user, from_key=None, include_offline=False)
def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
"""Checks the presence of users that have timed out and updates as """Checks the presence of users that have timed out and updates as
appropriate. appropriate.
Args: Args:
user_states(list): List of UserPresenceState's to check. user_states(list): List of UserPresenceState's to check.
is_mine_fn (fn): Function that returns if a user_id is ours is_mine_fn (fn): Function that returns if a user_id is ours
user_to_num_current_syncs (dict): Mapping of user_id to number of currently syncing_user_ids (set): Set of user_ids with active syncs.
active syncs.
now (int): Current time in ms. now (int): Current time in ms.
Returns: Returns:
@ -954,21 +1009,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
for state in user_states: for state in user_states:
is_mine = is_mine_fn(state.user_id) is_mine = is_mine_fn(state.user_id)
new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now) new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
if new_state: if new_state:
changes[state.user_id] = new_state changes[state.user_id] = new_state
return changes.values() return changes.values()
def handle_timeout(state, is_mine, user_to_num_current_syncs, now): def handle_timeout(state, is_mine, syncing_user_ids, now):
"""Checks the presence of the user to see if any of the timers have elapsed """Checks the presence of the user to see if any of the timers have elapsed
Args: Args:
state (UserPresenceState) state (UserPresenceState)
is_mine (bool): Whether the user is ours is_mine (bool): Whether the user is ours
user_to_num_current_syncs (dict): Mapping of user_id to number of currently syncing_user_ids (set): Set of user_ids with active syncs.
active syncs.
now (int): Current time in ms. now (int): Current time in ms.
Returns: Returns:
@ -1002,7 +1056,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
# If there are have been no sync for a while (and none ongoing), # If there are have been no sync for a while (and none ongoing),
# set presence to offline # set presence to offline
if not user_to_num_current_syncs.get(user_id, 0): if user_id not in syncing_user_ids:
if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT: if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace( state = state.copy_and_replace(
state=PresenceState.OFFLINE, state=PresenceState.OFFLINE,

View File

@ -264,7 +264,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=True, user_to_num_current_syncs={}, now=now state, is_mine=True, syncing_user_ids=set(), now=now
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
@ -282,7 +282,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=True, user_to_num_current_syncs={}, now=now state, is_mine=True, syncing_user_ids=set(), now=now
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
@ -300,9 +300,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=True, user_to_num_current_syncs={ state, is_mine=True, syncing_user_ids=set([user_id]), now=now
user_id: 1,
}, now=now
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
@ -321,7 +319,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=True, user_to_num_current_syncs={}, now=now state, is_mine=True, syncing_user_ids=set(), now=now
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
@ -340,7 +338,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=True, user_to_num_current_syncs={}, now=now state, is_mine=True, syncing_user_ids=set(), now=now
) )
self.assertIsNone(new_state) self.assertIsNone(new_state)
@ -358,7 +356,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=False, user_to_num_current_syncs={}, now=now state, is_mine=False, syncing_user_ids=set(), now=now
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
@ -377,7 +375,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=True, user_to_num_current_syncs={}, now=now state, is_mine=True, syncing_user_ids=set(), now=now
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)