Reduce amount of caches POSITIONS we send (#16561)

Follow-up to #16557; this change does correctly what that PR intended.
Erik Johnston 2023-10-27 16:07:11 +01:00 committed by GitHub
parent 89dbbd68e1
commit 5413cefe32
2 changed files with 11 additions and 0 deletions

changelog.d/16561.bugfix (new file, 1 line added)

@@ -0,0 +1 @@
+Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`.

synapse/replication/tcp/streams/_base.py
@@ -161,6 +161,14 @@ class Stream:
             and `limited` is whether there are more updates to fetch.
         """
         current_token = self.current_token(self.local_instance_name)
+
+        # If the minimum current token for the local instance is less than or
+        # equal to the last thing we published, we know that there are no
+        # updates.
+        if self.last_token >= self.minimal_local_current_token():
+            self.last_token = current_token
+            return [], current_token, False
+
         updates, current_token, limited = await self.get_updates_since(
             self.local_instance_name, self.last_token, current_token
         )
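
For readers following the logic, here is a minimal, self-contained sketch of the short-circuit added in this hunk. `FakeStream` and its bookkeeping are invented for illustration and are not Synapse's real classes; the point is that when nothing has been written locally since the last poll, `get_updates` can advance its bookmark and return no rows without doing any fetching.

```python
# Hypothetical, simplified model of the short-circuit in Stream.get_updates;
# FakeStream is not a real Synapse class.
from typing import List, Tuple

Token = int
Row = str


class FakeStream:
    def __init__(self) -> None:
        self.last_token: Token = 0      # last position we reported
        self._position: Token = 0       # advanced only by local writes
        self._rows: List[Tuple[Token, Row]] = []

    def write_locally(self, row: Row) -> None:
        self._position += 1
        self._rows.append((self._position, row))

    def current_token(self) -> Token:
        return self._position

    def minimal_local_current_token(self) -> Token:
        # In the real code this can lag behind current_token() while local
        # writes are still being persisted; here they coincide for simplicity.
        return self._position

    def get_updates(self) -> Tuple[List[Tuple[Token, Row]], Token, bool]:
        current_token = self.current_token()

        # Same check as in the diff: nothing new locally, so skip the fetch.
        if self.last_token >= self.minimal_local_current_token():
            self.last_token = current_token
            return [], current_token, False

        updates = [(t, r) for t, r in self._rows if t > self.last_token]
        self.last_token = current_token
        return updates, current_token, False


stream = FakeStream()
assert stream.get_updates() == ([], 0, False)   # short-circuit: no local writes
stream.write_locally("cache invalidation")
assert stream.get_updates() == ([(1, "cache invalidation")], 1, False)
assert stream.get_updates() == ([], 1, False)   # short-circuit again
```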
@@ -489,6 +497,8 @@ class CachesStream(Stream):
         return self.store.get_cache_stream_token_for_writer(instance_name)

     def minimal_local_current_token(self) -> Token:
+        if self.store._cache_id_gen:
+            return self.store._cache_id_gen.get_minimal_local_current_token()
         return self.current_token(self.local_instance_name)
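
The `CachesStream` override means the short-circuit above compares against the cache ID generator's minimal local token rather than the plain current token when a multi-writer generator is in use. Below is a toy sketch of why the two can differ (`ToyIdGen` is invented for illustration, not Synapse's `MultiWriterIdGenerator`): while a locally allocated ID is still in flight, the minimal token stays behind the current token, so `get_updates` does not short-circuit past the row that is still being written.

```python
# Toy illustration of a "minimal local current token", not Synapse's
# MultiWriterIdGenerator.
from typing import Set

Token = int


class ToyIdGen:
    def __init__(self) -> None:
        self._next: Token = 0
        self._unfinished: Set[Token] = set()  # allocated but not yet persisted

    def allocate(self) -> Token:
        self._next += 1
        self._unfinished.add(self._next)
        return self._next

    def mark_persisted(self, token: Token) -> None:
        self._unfinished.discard(token)

    def get_current_token(self) -> Token:
        return self._next

    def get_minimal_local_current_token(self) -> Token:
        # Everything up to (but not including) the smallest unfinished ID has
        # been safely persisted locally.
        if self._unfinished:
            return min(self._unfinished) - 1
        return self._next


gen = ToyIdGen()
t = gen.allocate()
assert gen.get_current_token() == 1
assert gen.get_minimal_local_current_token() == 0  # write still in flight
gen.mark_persisted(t)
assert gen.get_minimal_local_current_token() == 1
```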