diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 8006568b0a..257c5584d0 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1071,7 +1071,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): # stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and # then filtering the results. if from_token.topological is not None: - from_bound = from_token.as_tuple() + from_bound = ( + from_token.as_historical_tuple() + ) # type: Tuple[Optional[int], int] elif direction == "b": from_bound = ( None, @@ -1083,10 +1085,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): from_token.stream, ) - to_bound = None + to_bound = None # type: Optional[Tuple[Optional[int], int]] if to_token: if to_token.topological is not None: - to_bound = to_token.as_tuple() + to_bound = to_token.as_historical_tuple() elif direction == "b": to_bound = ( None, diff --git a/synapse/types.py b/synapse/types.py index dd8160f355..dd36477e9d 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -396,7 +396,7 @@ class RoomStreamToken: Live tokens start with an "s" followed by the "stream_ordering" id of the event it comes after. Historic tokens start with a "t" followed by the - "topological_ordering" id of the event it comes after, followed by "~", + "topological_ordering" id of the event it comes after, followed by "-", followed by the "stream_ordering" id of the event it comes after. There is also a third mode for live tokens where the token starts with "m", @@ -420,6 +420,9 @@ class RoomStreamToken: 3 (these are instance IDs that can be looked up in the DB to fetch the more commonly used instance names) are at positions 58 and 59 respectively, and all other instances are at position 56. + + Note: The `RoomStreamToken` cannot have both a topological part and an + instance map. """ topological = attr.ib( @@ -428,7 +431,19 @@ class RoomStreamToken: ) stream = attr.ib(type=int, validator=attr.validators.instance_of(int)) - instance_map = attr.ib(type=Dict[str, int], factory=dict) + instance_map = attr.ib( + type=Dict[str, int], factory=dict, validator=attr.validators.instance_of(dict) + ) + + @instance_map.validator + def _validate_topological_or_live(self, attribute, value): + """Validates that both `topological` and `instance_map` aren't set. + """ + + if value and self.topological: + raise ValueError( + "Cannot set both 'topological' and 'instance_map' on 'RoomStreamToken'." + ) @classmethod async def parse(cls, store: "DataStore", string: str) -> "RoomStreamToken": @@ -485,12 +500,26 @@ class RoomStreamToken: return RoomStreamToken(None, max_stream, instance_map) - def as_tuple(self) -> Tuple[Optional[int], int]: + def as_historical_tuple(self) -> Tuple[int, int]: + """Returns a tuple of `(topological, stream)` for historical tokens. + + Raises if not an historical token (i.e. doesn't have a topological part). + """ + if self.topological is None: + raise Exception( + "Cannot call `RoomStreamToken.as_historical_tuple` on live token" + ) + return (self.topological, self.stream) def get_stream_pos_for_instance(self, instance_name: str) -> int: - """Get the stream position for the instance + """Get the stream position that the writer was at at this token. + + This only makes sense for "live" tokens that may have a vector clock + component. """ + # If we don't have an entry for the instance we can assume that it was + # at `self.stream`. return self.instance_map.get(instance_name, self.stream) def get_max_stream_pos(self) -> int: