Add typing support

markjh/synchrotron
Mark Haines 2016-06-02 16:14:19 +01:00
parent 0382946067
commit aa3b6baa55
1 changed files with 24 additions and 3 deletions

View File

@ -210,8 +210,24 @@ class SynchrotronPresence(object):
class SynchrotronTyping(object): class SynchrotronTyping(object):
_latest_room_serial = 0 def __init__(self, hs):
_room_serials = () self._latest_room_serial = 0
self._room_serials = {}
self._room_typing = {}
def stream_positions(self):
return {"typing": self._latest_room_serial}
def process_replication(self, result):
stream = result.get("typing")
if stream:
self._latest_room_serial = int(stream["position"])
for row in stream["rows"]:
position, room_id, typing_json = row
typing = json.loads(typing_json)
self._room_serials[room_id] = position
self._room_typing[room_id] = typing
class SynchrotronApplicationService(object): class SynchrotronApplicationService(object):
@ -294,6 +310,7 @@ class SynchrotronServer(HomeServer):
clock = self.get_clock() clock = self.get_clock()
notifier = self.get_notifier() notifier = self.get_notifier()
presence_handler = self.get_presence_handler() presence_handler = self.get_presence_handler()
typing_handler = self.get_typing_handler()
def expire_broken_caches(): def expire_broken_caches():
store.who_forgot_in_room.invalidate_all() store.who_forgot_in_room.invalidate_all()
@ -318,6 +335,7 @@ class SynchrotronServer(HomeServer):
while True: while True:
try: try:
args = store.stream_positions() args = store.stream_positions()
args.update(typing_handler.stream_positions())
args["timeout"] = 30000 args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args) result = yield http_client.get_json(replication_url, args=args)
logger.error("FENRIS %r", result) logger.error("FENRIS %r", result)
@ -328,6 +346,7 @@ class SynchrotronServer(HomeServer):
now_ms + store.BROKEN_CACHE_EXPIRY_MS now_ms + store.BROKEN_CACHE_EXPIRY_MS
) )
yield store.process_replication(result) yield store.process_replication(result)
typing_handler.process_replication(result)
presence_handler.process_replication(result) presence_handler.process_replication(result)
notify(result) notify(result)
except: except:
@ -337,6 +356,9 @@ class SynchrotronServer(HomeServer):
def build_presence_handler(self): def build_presence_handler(self):
return SynchrotronPresence(self) return SynchrotronPresence(self)
def build_typing_handler(self):
return SynchrotronTyping(self)
def setup(config_options): def setup(config_options):
try: try:
@ -360,7 +382,6 @@ def setup(config_options):
config=config, config=config,
version_string=get_version_string("Synapse", synapse), version_string=get_version_string("Synapse", synapse),
database_engine=database_engine, database_engine=database_engine,
typing_handler=SynchrotronTyping(),
application_service_handler=SynchrotronApplicationService(), application_service_handler=SynchrotronApplicationService(),
) )