Also discard 'caches' and 'backfill' stream POSITIONS (#16655)

Follow on from #16640
pull/16660/head
Erik Johnston 2023-11-17 14:14:29 +00:00 committed by GitHub
parent bdb0cbc5ca
commit 6fec2d035f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 0 deletions

1
changelog.d/16655.misc Normal file
View File

@ -0,0 +1 @@
More efficiently handle no-op `POSITION` over replication.

View File

@ -305,6 +305,14 @@ class BackfillStream(Stream):
# which means we need to negate it. # which means we need to negate it.
return -self.store._backfill_id_gen.get_minimal_local_current_token() return -self.store._backfill_id_gen.get_minimal_local_current_token()
def can_discard_position(
self, instance_name: str, prev_token: int, new_token: int
) -> bool:
# Backfill stream can't go backwards, so we know we can ignore any
# positions where the tokens are from before the current token.
return new_token <= self.current_token(instance_name)
class PresenceStream(_StreamFromIdGen): class PresenceStream(_StreamFromIdGen):
@attr.s(slots=True, frozen=True, auto_attribs=True) @attr.s(slots=True, frozen=True, auto_attribs=True)
@ -519,6 +527,14 @@ class CachesStream(Stream):
return self.store._cache_id_gen.get_minimal_local_current_token() return self.store._cache_id_gen.get_minimal_local_current_token()
return self.current_token(self.local_instance_name) return self.current_token(self.local_instance_name)
def can_discard_position(
self, instance_name: str, prev_token: int, new_token: int
) -> bool:
# Caches streams can't go backwards, so we know we can ignore any
# positions where the tokens are from before the current token.
return new_token <= self.current_token(instance_name)
class DeviceListsStream(_StreamFromIdGen): class DeviceListsStream(_StreamFromIdGen):
"""Either a user has updated their devices or a remote server needs to be """Either a user has updated their devices or a remote server needs to be