Fix "db txn 'update_presence' from sentinel context" log messages (#5275)
Fixes #4414.pull/5281/head
parent
a97d4e218a
commit
5726378ece
|
@ -0,0 +1 @@
|
|||
Fix "db txn 'update_presence' from sentinel context" log messages.
|
|
@ -182,17 +182,27 @@ class PresenceHandler(object):
|
|||
# Start a LoopingCall in 30s that fires every 5s.
|
||||
# The initial delay is to allow disconnected clients a chance to
|
||||
# reconnect before we treat them as offline.
|
||||
def run_timeout_handler():
|
||||
return run_as_background_process(
|
||||
"handle_presence_timeouts", self._handle_timeouts
|
||||
)
|
||||
|
||||
self.clock.call_later(
|
||||
30,
|
||||
self.clock.looping_call,
|
||||
self._handle_timeouts,
|
||||
run_timeout_handler,
|
||||
5000,
|
||||
)
|
||||
|
||||
def run_persister():
|
||||
return run_as_background_process(
|
||||
"persist_presence_changes", self._persist_unpersisted_changes
|
||||
)
|
||||
|
||||
self.clock.call_later(
|
||||
60,
|
||||
self.clock.looping_call,
|
||||
self._persist_unpersisted_changes,
|
||||
run_persister,
|
||||
60 * 1000,
|
||||
)
|
||||
|
||||
|
@ -229,6 +239,7 @@ class PresenceHandler(object):
|
|||
)
|
||||
|
||||
if self.unpersisted_users_changes:
|
||||
|
||||
yield self.store.update_presence([
|
||||
self.user_to_current_state[user_id]
|
||||
for user_id in self.unpersisted_users_changes
|
||||
|
@ -240,30 +251,18 @@ class PresenceHandler(object):
|
|||
"""We periodically persist the unpersisted changes, as otherwise they
|
||||
may stack up and slow down shutdown times.
|
||||
"""
|
||||
logger.info(
|
||||
"Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
|
||||
len(self.unpersisted_users_changes)
|
||||
)
|
||||
|
||||
unpersisted = self.unpersisted_users_changes
|
||||
self.unpersisted_users_changes = set()
|
||||
|
||||
if unpersisted:
|
||||
logger.info(
|
||||
"Persisting %d upersisted presence updates", len(unpersisted)
|
||||
)
|
||||
yield self.store.update_presence([
|
||||
self.user_to_current_state[user_id]
|
||||
for user_id in unpersisted
|
||||
])
|
||||
|
||||
logger.info("Finished _persist_unpersisted_changes")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _update_states_and_catch_exception(self, new_states):
|
||||
try:
|
||||
res = yield self._update_states(new_states)
|
||||
defer.returnValue(res)
|
||||
except Exception:
|
||||
logger.exception("Error updating presence")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _update_states(self, new_states):
|
||||
"""Updates presence of users. Sets the appropriate timeouts. Pokes
|
||||
|
@ -338,45 +337,41 @@ class PresenceHandler(object):
|
|||
logger.info("Handling presence timeouts")
|
||||
now = self.clock.time_msec()
|
||||
|
||||
try:
|
||||
with Measure(self.clock, "presence_handle_timeouts"):
|
||||
# Fetch the list of users that *may* have timed out. Things may have
|
||||
# changed since the timeout was set, so we won't necessarily have to
|
||||
# take any action.
|
||||
users_to_check = set(self.wheel_timer.fetch(now))
|
||||
# Fetch the list of users that *may* have timed out. Things may have
|
||||
# changed since the timeout was set, so we won't necessarily have to
|
||||
# take any action.
|
||||
users_to_check = set(self.wheel_timer.fetch(now))
|
||||
|
||||
# Check whether the lists of syncing processes from an external
|
||||
# process have expired.
|
||||
expired_process_ids = [
|
||||
process_id for process_id, last_update
|
||||
in self.external_process_last_updated_ms.items()
|
||||
if now - last_update > EXTERNAL_PROCESS_EXPIRY
|
||||
]
|
||||
for process_id in expired_process_ids:
|
||||
users_to_check.update(
|
||||
self.external_process_last_updated_ms.pop(process_id, ())
|
||||
)
|
||||
self.external_process_last_update.pop(process_id)
|
||||
# Check whether the lists of syncing processes from an external
|
||||
# process have expired.
|
||||
expired_process_ids = [
|
||||
process_id for process_id, last_update
|
||||
in self.external_process_last_updated_ms.items()
|
||||
if now - last_update > EXTERNAL_PROCESS_EXPIRY
|
||||
]
|
||||
for process_id in expired_process_ids:
|
||||
users_to_check.update(
|
||||
self.external_process_last_updated_ms.pop(process_id, ())
|
||||
)
|
||||
self.external_process_last_update.pop(process_id)
|
||||
|
||||
states = [
|
||||
self.user_to_current_state.get(
|
||||
user_id, UserPresenceState.default(user_id)
|
||||
)
|
||||
for user_id in users_to_check
|
||||
]
|
||||
states = [
|
||||
self.user_to_current_state.get(
|
||||
user_id, UserPresenceState.default(user_id)
|
||||
)
|
||||
for user_id in users_to_check
|
||||
]
|
||||
|
||||
timers_fired_counter.inc(len(states))
|
||||
timers_fired_counter.inc(len(states))
|
||||
|
||||
changes = handle_timeouts(
|
||||
states,
|
||||
is_mine_fn=self.is_mine_id,
|
||||
syncing_user_ids=self.get_currently_syncing_users(),
|
||||
now=now,
|
||||
)
|
||||
changes = handle_timeouts(
|
||||
states,
|
||||
is_mine_fn=self.is_mine_id,
|
||||
syncing_user_ids=self.get_currently_syncing_users(),
|
||||
now=now,
|
||||
)
|
||||
|
||||
run_in_background(self._update_states_and_catch_exception, changes)
|
||||
except Exception:
|
||||
logger.exception("Exception in _handle_timeouts loop")
|
||||
return self._update_states(changes)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def bump_presence_active_time(self, user):
|
||||
|
|
Loading…
Reference in New Issue