Fix bug in wait for stream position (#14872)

This caused some requests to fail.

This caused some requests to fail.

This really only started causing issues due to #14856
pull/14875/head
Erik Johnston 2023-01-19 22:19:56 +00:00 committed by GitHub
parent a7b54ca8d8
commit cdf2707678
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 10 deletions

1
changelog.d/14872.misc Normal file
View File

@ -0,0 +1 @@
Fix `wait_for_stream_position` to correctly wait for the right instance to advance its token.

View File

@ -133,9 +133,9 @@ class ReplicationDataHandler:
if hs.should_send_federation(): if hs.should_send_federation():
self.send_handler = FederationSenderHandler(hs) self.send_handler = FederationSenderHandler(hs)
# Map from stream to list of deferreds waiting for the stream to # Map from stream and instance to list of deferreds waiting for the stream to
# arrive at a particular position. The lists are sorted by stream position. # arrive at a particular position. The lists are sorted by stream position.
self._streams_to_waiters: Dict[str, List[Tuple[int, Deferred]]] = {} self._streams_to_waiters: Dict[Tuple[str, str], List[Tuple[int, Deferred]]] = {}
async def on_rdata( async def on_rdata(
self, stream_name: str, instance_name: str, token: int, rows: list self, stream_name: str, instance_name: str, token: int, rows: list
@ -270,7 +270,7 @@ class ReplicationDataHandler:
# Notify any waiting deferreds. The list is ordered by position so we # Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is # just iterate through the list until we reach a position that is
# greater than the received row position. # greater than the received row position.
waiting_list = self._streams_to_waiters.get(stream_name, []) waiting_list = self._streams_to_waiters.get((stream_name, instance_name), [])
# Index of first item with a position after the current token, i.e we # Index of first item with a position after the current token, i.e we
# have called all deferreds before this index. If not overwritten by # have called all deferreds before this index. If not overwritten by
@ -279,14 +279,13 @@ class ReplicationDataHandler:
# `len(list)` works for both cases. # `len(list)` works for both cases.
index_of_first_deferred_not_called = len(waiting_list) index_of_first_deferred_not_called = len(waiting_list)
# We don't fire the deferreds until after we finish iterating over the
# list, to avoid the list changing when we fire the deferreds.
deferreds_to_callback = []
for idx, (position, deferred) in enumerate(waiting_list): for idx, (position, deferred) in enumerate(waiting_list):
if position <= token: if position <= token:
try: deferreds_to_callback.append(deferred)
with PreserveLoggingContext():
deferred.callback(None)
except Exception:
# The deferred has been cancelled or timed out.
pass
else: else:
# The list is sorted by position so we don't need to continue # The list is sorted by position so we don't need to continue
# checking any further entries in the list. # checking any further entries in the list.
@ -297,6 +296,14 @@ class ReplicationDataHandler:
# loop. (This maintains the order so no need to resort) # loop. (This maintains the order so no need to resort)
waiting_list[:] = waiting_list[index_of_first_deferred_not_called:] waiting_list[:] = waiting_list[index_of_first_deferred_not_called:]
for deferred in deferreds_to_callback:
try:
with PreserveLoggingContext():
deferred.callback(None)
except Exception:
# The deferred has been cancelled or timed out.
pass
async def on_position( async def on_position(
self, stream_name: str, instance_name: str, token: int self, stream_name: str, instance_name: str, token: int
) -> None: ) -> None:
@ -349,7 +356,9 @@ class ReplicationDataHandler:
deferred, _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS, self._reactor deferred, _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS, self._reactor
) )
waiting_list = self._streams_to_waiters.setdefault(stream_name, []) waiting_list = self._streams_to_waiters.setdefault(
(stream_name, instance_name), []
)
waiting_list.append((position, deferred)) waiting_list.append((position, deferred))
waiting_list.sort(key=lambda t: t[0]) waiting_list.sort(key=lambda t: t[0])