Prod the main synapse with the list of syncing users
parent
b161fae864
commit
40c7c81da9
|
@ -41,6 +41,7 @@ from synapse.util.httpresourcetree import create_resource_tree
|
|||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.rlimit import change_resource_limit
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
from twisted.internet import reactor, defer
|
||||
|
@ -137,19 +138,56 @@ class SynchrotronSlavedStore(
|
|||
|
||||
|
||||
class SynchrotronPresence(object):
|
||||
def __init__(self, hs):
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.user_to_num_current_syncs = {}
|
||||
self.process_id = random_string(16)
|
||||
self.syncing_users_url = hs.config.replication_url + "/syncing_users"
|
||||
logger.info("Presence process_id is %r", self.process_id)
|
||||
|
||||
def set_state(self, user, state):
|
||||
pass
|
||||
|
||||
def get_states(self, user_ids, as_event=False):
|
||||
return {}
|
||||
|
||||
@contextlib.contextmanager
|
||||
def user_syncing(self, user, affect_presence):
|
||||
yield
|
||||
|
||||
def current_state_for_users(self, user_ids):
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def user_syncing(self, user_id, affect_presence):
|
||||
if affect_presence:
|
||||
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
|
||||
self.user_to_num_current_syncs[user_id] = curr_sync + 1
|
||||
# TODO: Send this less frequently.
|
||||
# TODO: Make sure this doesn't race. Currently we can lose updates
|
||||
# if two users come online in quick sucession and the second http
|
||||
# to the master completes before the first.
|
||||
# TODO: Don't block the sync request on this HTTP hit.
|
||||
yield self._send_syncing_users()
|
||||
|
||||
def _end():
|
||||
if affect_presence:
|
||||
self.user_to_num_current_syncs[user_id] -= 1
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _user_syncing():
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
_end()
|
||||
|
||||
defer.returnValue(_user_syncing())
|
||||
|
||||
def _send_syncing_users(self):
|
||||
return self.http_client.post_json_get_json(self.syncing_users_url, {
|
||||
"process_id": self.process_id,
|
||||
"syncing_users": [
|
||||
user_id for user_id, count in self.user_to_num_current_syncs.items()
|
||||
if count > 0
|
||||
],
|
||||
})
|
||||
|
||||
|
||||
class SynchrotronTyping(object):
|
||||
_latest_room_serial = 0
|
||||
|
@ -274,6 +312,9 @@ class SynchrotronServer(HomeServer):
|
|||
logger.exception("Error replicating from %r", replication_url)
|
||||
sleep(5)
|
||||
|
||||
def build_presence_handler(self):
|
||||
return SynchrotronPresence(self)
|
||||
|
||||
|
||||
def setup(config_options):
|
||||
try:
|
||||
|
@ -297,7 +338,6 @@ def setup(config_options):
|
|||
config=config,
|
||||
version_string=get_version_string("Synapse", synapse),
|
||||
database_engine=database_engine,
|
||||
presence_handler=SynchrotronPresence(),
|
||||
typing_handler=SynchrotronTyping(),
|
||||
application_service_handler=SynchrotronApplicationService(),
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue