diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index c1338e8e36..9d31732960 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -210,8 +210,24 @@ class SynchrotronPresence(object): class SynchrotronTyping(object): - _latest_room_serial = 0 - _room_serials = () + def __init__(self, hs): + self._latest_room_serial = 0 + self._room_serials = {} + self._room_typing = {} + + def stream_positions(self): + return {"typing": self._latest_room_serial} + + def process_replication(self, result): + stream = result.get("typing") + if stream: + self._latest_room_serial = int(stream["position"]) + + for row in stream["rows"]: + position, room_id, typing_json = row + typing = json.loads(typing_json) + self._room_serials[room_id] = position + self._room_typing[room_id] = typing class SynchrotronApplicationService(object): @@ -294,6 +310,7 @@ class SynchrotronServer(HomeServer): clock = self.get_clock() notifier = self.get_notifier() presence_handler = self.get_presence_handler() + typing_handler = self.get_typing_handler() def expire_broken_caches(): store.who_forgot_in_room.invalidate_all() @@ -318,6 +335,7 @@ class SynchrotronServer(HomeServer): while True: try: args = store.stream_positions() + args.update(typing_handler.stream_positions()) args["timeout"] = 30000 result = yield http_client.get_json(replication_url, args=args) logger.error("FENRIS %r", result) @@ -328,6 +346,7 @@ class SynchrotronServer(HomeServer): now_ms + store.BROKEN_CACHE_EXPIRY_MS ) yield store.process_replication(result) + typing_handler.process_replication(result) presence_handler.process_replication(result) notify(result) except: @@ -337,6 +356,9 @@ class SynchrotronServer(HomeServer): def build_presence_handler(self): return SynchrotronPresence(self) + def build_typing_handler(self): + return SynchrotronTyping(self) + def setup(config_options): try: @@ -360,7 +382,6 @@ def setup(config_options): config=config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, - typing_handler=SynchrotronTyping(), application_service_handler=SynchrotronApplicationService(), )