Merge pull request #2106 from matrix-org/erikj/reduce_user_sync
Reduce rate of USER_SYNC repl commandspull/2108/head
commit
944692ef69
|
@ -120,12 +120,53 @@ class SynchrotronPresence(object):
|
||||||
for state in active_presence
|
for state in active_presence
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# user_id -> last_sync_ms. Lists the users that have stopped syncing
|
||||||
|
# but we haven't notified the master of that yet
|
||||||
|
self.users_going_offline = {}
|
||||||
|
|
||||||
|
self._send_stop_syncing_loop = self.clock.looping_call(
|
||||||
|
self.send_stop_syncing, 10 * 1000
|
||||||
|
)
|
||||||
|
|
||||||
self.process_id = random_string(16)
|
self.process_id = random_string(16)
|
||||||
logger.info("Presence process_id is %r", self.process_id)
|
logger.info("Presence process_id is %r", self.process_id)
|
||||||
|
|
||||||
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
|
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
|
||||||
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
|
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
|
||||||
|
|
||||||
|
def mark_as_coming_online(self, user_id):
|
||||||
|
"""A user has started syncing. Send a UserSync to the master, unless they
|
||||||
|
had recently stopped syncing.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id (str)
|
||||||
|
"""
|
||||||
|
going_offline = self.users_going_offline.pop(user_id, None)
|
||||||
|
if not going_offline:
|
||||||
|
# Safe to skip because we haven't yet told the master they were offline
|
||||||
|
self.send_user_sync(user_id, True, self.clock.time_msec())
|
||||||
|
|
||||||
|
def mark_as_going_offline(self, user_id):
|
||||||
|
"""A user has stopped syncing. We wait before notifying the master as
|
||||||
|
its likely they'll come back soon. This allows us to avoid sending
|
||||||
|
a stopped syncing immediately followed by a started syncing notification
|
||||||
|
to the master
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id (str)
|
||||||
|
"""
|
||||||
|
self.users_going_offline[user_id] = self.clock.time_msec()
|
||||||
|
|
||||||
|
def send_stop_syncing(self):
|
||||||
|
"""Check if there are any users who have stopped syncing a while ago
|
||||||
|
and haven't come back yet. If there are poke the master about them.
|
||||||
|
"""
|
||||||
|
now = self.clock.time_msec()
|
||||||
|
for user_id, last_sync_ms in self.users_going_offline.items():
|
||||||
|
if now - last_sync_ms > 10 * 1000:
|
||||||
|
self.users_going_offline.pop(user_id, None)
|
||||||
|
self.send_user_sync(user_id, False, last_sync_ms)
|
||||||
|
|
||||||
def set_state(self, user, state, ignore_status_msg=False):
|
def set_state(self, user, state, ignore_status_msg=False):
|
||||||
# TODO Hows this supposed to work?
|
# TODO Hows this supposed to work?
|
||||||
pass
|
pass
|
||||||
|
@ -142,8 +183,7 @@ class SynchrotronPresence(object):
|
||||||
|
|
||||||
# If we went from no in flight sync to some, notify replication
|
# If we went from no in flight sync to some, notify replication
|
||||||
if self.user_to_num_current_syncs[user_id] == 1:
|
if self.user_to_num_current_syncs[user_id] == 1:
|
||||||
now = self.clock.time_msec()
|
self.mark_as_coming_online(user_id)
|
||||||
self.send_user_sync(user_id, True, now)
|
|
||||||
|
|
||||||
def _end():
|
def _end():
|
||||||
# We check that the user_id is in user_to_num_current_syncs because
|
# We check that the user_id is in user_to_num_current_syncs because
|
||||||
|
@ -154,8 +194,7 @@ class SynchrotronPresence(object):
|
||||||
|
|
||||||
# If we went from one in flight sync to non, notify replication
|
# If we went from one in flight sync to non, notify replication
|
||||||
if self.user_to_num_current_syncs[user_id] == 0:
|
if self.user_to_num_current_syncs[user_id] == 0:
|
||||||
now = self.clock.time_msec()
|
self.mark_as_going_offline(user_id)
|
||||||
self.send_user_sync(user_id, False, now)
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def _user_syncing():
|
def _user_syncing():
|
||||||
|
|
Loading…
Reference in New Issue