s/self.client/self.command_handler/

pull/7187/head
Erik Johnston 2020-04-06 15:38:18 +01:00
parent 2190027cf9
commit 7b2af21b93
1 changed files with 5 additions and 3 deletions

View File

@ -93,7 +93,7 @@ class ReplicationStreamer(object):
self.is_looping = False
self.pending_updates = False
self.client = hs.get_tcp_replication()
self.command_handler = hs.get_tcp_replication()
def get_streams(self) -> Dict[str, Stream]:
"""Get a mapp from stream name to stream instance.
@ -107,7 +107,7 @@ class ReplicationStreamer(object):
This should get called each time new data is available, even if it
is currently being executed, so that nothing gets missed
"""
if not self.client.connected():
if not self.command_handler.connected():
# Don't bother if nothing is listening. We still need to advance
# the stream tokens otherwise they'll fall beihind forever
for stream in self.streams:
@ -182,7 +182,9 @@ class ReplicationStreamer(object):
for token, row in batched_updates:
try:
self.client.stream_update(stream.NAME, token, row)
self.command_handler.stream_update(
stream.NAME, token, row
)
except Exception:
logger.exception("Failed to replicate")