Advance replication streams even if nothing is listening
Otherwise the streams don't advance and steadily fall behind, so when a worker does connect either a) they'll be streamed lots of old updates or b) the connection will fail as the streams are too far behind.pull/2098/head
							parent
							
								
									27cc627e42
								
							
						
					
					
						commit
						023ee197be
					
				|  | @ -414,16 +414,18 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): | |||
|                 token, row = update[0], update[1] | ||||
|                 self.send_command(RdataCommand(stream_name, token, row)) | ||||
| 
 | ||||
|             # Now we can send any updates that came in while we were subscribing | ||||
|             pending_rdata = self.pending_rdata.pop(stream_name, []) | ||||
|             for token, update in pending_rdata: | ||||
|                 self.send_command(RdataCommand(stream_name, token, update)) | ||||
| 
 | ||||
|             # We send a POSITION command to ensure that they have an up to | ||||
|             # date token (especially useful if we didn't send any updates | ||||
|             # above) | ||||
|             self.send_command(PositionCommand(stream_name, current_token)) | ||||
| 
 | ||||
|             # Now we can send any updates that came in while we were subscribing | ||||
|             pending_rdata = self.pending_rdata.pop(stream_name, []) | ||||
|             for token, update in pending_rdata: | ||||
|                 # Only send updates newer than the current token | ||||
|                 if token > current_token: | ||||
|                     self.send_command(RdataCommand(stream_name, token, update)) | ||||
| 
 | ||||
|             # They're now fully subscribed | ||||
|             self.replication_streams.add(stream_name) | ||||
|         except Exception as e: | ||||
|  |  | |||
|  | @ -124,7 +124,7 @@ class ReplicationStreamer(object): | |||
|             # Don't bother if nothing is listening. We still need to advance | ||||
|             # the stream tokens otherwise they'll fall beihind forever | ||||
|             for stream in self.streams: | ||||
|                 stream.advance_current_token() | ||||
|                 stream.discard_updates_and_advance() | ||||
|             return | ||||
| 
 | ||||
|         # If we're in the process of checking for new updates, mark that fact | ||||
|  |  | |||
|  | @ -89,6 +89,13 @@ class Stream(object): | |||
|         """ | ||||
|         self.upto_token = self.current_token() | ||||
| 
 | ||||
|     def discard_updates_and_advance(self): | ||||
|         """Called when the stream should advance but the updates would be discarded, | ||||
|         e.g. when there are no currently connected workers. | ||||
|         """ | ||||
|         self.upto_token = self.current_token() | ||||
|         self.last_token = self.upto_token | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def get_updates(self): | ||||
|         """Gets all updates since the last time this function was called (or | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Erik Johnston
						Erik Johnston