Merge pull request #2098 from matrix-org/erikj/repl_tcp_fix
Advance replication streams even if nothing is listeningpull/2099/head
commit
a76886726b
|
@ -415,16 +415,18 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
|||
token, row = update[0], update[1]
|
||||
self.send_command(RdataCommand(stream_name, token, row))
|
||||
|
||||
# Now we can send any updates that came in while we were subscribing
|
||||
pending_rdata = self.pending_rdata.pop(stream_name, [])
|
||||
for token, update in pending_rdata:
|
||||
self.send_command(RdataCommand(stream_name, token, update))
|
||||
|
||||
# We send a POSITION command to ensure that they have an up to
|
||||
# date token (especially useful if we didn't send any updates
|
||||
# above)
|
||||
self.send_command(PositionCommand(stream_name, current_token))
|
||||
|
||||
# Now we can send any updates that came in while we were subscribing
|
||||
pending_rdata = self.pending_rdata.pop(stream_name, [])
|
||||
for token, update in pending_rdata:
|
||||
# Only send updates newer than the current token
|
||||
if token > current_token:
|
||||
self.send_command(RdataCommand(stream_name, token, update))
|
||||
|
||||
# They're now fully subscribed
|
||||
self.replication_streams.add(stream_name)
|
||||
except Exception as e:
|
||||
|
|
|
@ -124,7 +124,7 @@ class ReplicationStreamer(object):
|
|||
# Don't bother if nothing is listening. We still need to advance
|
||||
# the stream tokens otherwise they'll fall beihind forever
|
||||
for stream in self.streams:
|
||||
stream.advance_current_token()
|
||||
stream.discard_updates_and_advance()
|
||||
return
|
||||
|
||||
# If we're in the process of checking for new updates, mark that fact
|
||||
|
|
|
@ -89,6 +89,13 @@ class Stream(object):
|
|||
"""
|
||||
self.upto_token = self.current_token()
|
||||
|
||||
def discard_updates_and_advance(self):
|
||||
"""Called when the stream should advance but the updates would be discarded,
|
||||
e.g. when there are no currently connected workers.
|
||||
"""
|
||||
self.upto_token = self.current_token()
|
||||
self.last_token = self.upto_token
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_updates(self):
|
||||
"""Gets all updates since the last time this function was called (or
|
||||
|
|
Loading…
Reference in New Issue