Fix up RoomStreamToken
parent
f709da9e05
commit
e91f0e9d13
|
@ -1071,7 +1071,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
|
||||||
# stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and
|
# stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and
|
||||||
# then filtering the results.
|
# then filtering the results.
|
||||||
if from_token.topological is not None:
|
if from_token.topological is not None:
|
||||||
from_bound = from_token.as_tuple()
|
from_bound = (
|
||||||
|
from_token.as_historical_tuple()
|
||||||
|
) # type: Tuple[Optional[int], int]
|
||||||
elif direction == "b":
|
elif direction == "b":
|
||||||
from_bound = (
|
from_bound = (
|
||||||
None,
|
None,
|
||||||
|
@ -1083,10 +1085,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
|
||||||
from_token.stream,
|
from_token.stream,
|
||||||
)
|
)
|
||||||
|
|
||||||
to_bound = None
|
to_bound = None # type: Optional[Tuple[Optional[int], int]]
|
||||||
if to_token:
|
if to_token:
|
||||||
if to_token.topological is not None:
|
if to_token.topological is not None:
|
||||||
to_bound = to_token.as_tuple()
|
to_bound = to_token.as_historical_tuple()
|
||||||
elif direction == "b":
|
elif direction == "b":
|
||||||
to_bound = (
|
to_bound = (
|
||||||
None,
|
None,
|
||||||
|
|
|
@ -396,7 +396,7 @@ class RoomStreamToken:
|
||||||
|
|
||||||
Live tokens start with an "s" followed by the "stream_ordering" id of the
|
Live tokens start with an "s" followed by the "stream_ordering" id of the
|
||||||
event it comes after. Historic tokens start with a "t" followed by the
|
event it comes after. Historic tokens start with a "t" followed by the
|
||||||
"topological_ordering" id of the event it comes after, followed by "~",
|
"topological_ordering" id of the event it comes after, followed by "-",
|
||||||
followed by the "stream_ordering" id of the event it comes after.
|
followed by the "stream_ordering" id of the event it comes after.
|
||||||
|
|
||||||
There is also a third mode for live tokens where the token starts with "m",
|
There is also a third mode for live tokens where the token starts with "m",
|
||||||
|
@ -420,6 +420,9 @@ class RoomStreamToken:
|
||||||
3 (these are instance IDs that can be looked up in the DB to fetch the more
|
3 (these are instance IDs that can be looked up in the DB to fetch the more
|
||||||
commonly used instance names) are at positions 58 and 59 respectively, and
|
commonly used instance names) are at positions 58 and 59 respectively, and
|
||||||
all other instances are at position 56.
|
all other instances are at position 56.
|
||||||
|
|
||||||
|
Note: The `RoomStreamToken` cannot have both a topological part and an
|
||||||
|
instance map.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
topological = attr.ib(
|
topological = attr.ib(
|
||||||
|
@ -428,7 +431,19 @@ class RoomStreamToken:
|
||||||
)
|
)
|
||||||
stream = attr.ib(type=int, validator=attr.validators.instance_of(int))
|
stream = attr.ib(type=int, validator=attr.validators.instance_of(int))
|
||||||
|
|
||||||
instance_map = attr.ib(type=Dict[str, int], factory=dict)
|
instance_map = attr.ib(
|
||||||
|
type=Dict[str, int], factory=dict, validator=attr.validators.instance_of(dict)
|
||||||
|
)
|
||||||
|
|
||||||
|
@instance_map.validator
|
||||||
|
def _validate_topological_or_live(self, attribute, value):
|
||||||
|
"""Validates that both `topological` and `instance_map` aren't set.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if value and self.topological:
|
||||||
|
raise ValueError(
|
||||||
|
"Cannot set both 'topological' and 'instance_map' on 'RoomStreamToken'."
|
||||||
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def parse(cls, store: "DataStore", string: str) -> "RoomStreamToken":
|
async def parse(cls, store: "DataStore", string: str) -> "RoomStreamToken":
|
||||||
|
@ -485,12 +500,26 @@ class RoomStreamToken:
|
||||||
|
|
||||||
return RoomStreamToken(None, max_stream, instance_map)
|
return RoomStreamToken(None, max_stream, instance_map)
|
||||||
|
|
||||||
def as_tuple(self) -> Tuple[Optional[int], int]:
|
def as_historical_tuple(self) -> Tuple[int, int]:
|
||||||
|
"""Returns a tuple of `(topological, stream)` for historical tokens.
|
||||||
|
|
||||||
|
Raises if not an historical token (i.e. doesn't have a topological part).
|
||||||
|
"""
|
||||||
|
if self.topological is None:
|
||||||
|
raise Exception(
|
||||||
|
"Cannot call `RoomStreamToken.as_historical_tuple` on live token"
|
||||||
|
)
|
||||||
|
|
||||||
return (self.topological, self.stream)
|
return (self.topological, self.stream)
|
||||||
|
|
||||||
def get_stream_pos_for_instance(self, instance_name: str) -> int:
|
def get_stream_pos_for_instance(self, instance_name: str) -> int:
|
||||||
"""Get the stream position for the instance
|
"""Get the stream position that the writer was at at this token.
|
||||||
|
|
||||||
|
This only makes sense for "live" tokens that may have a vector clock
|
||||||
|
component.
|
||||||
"""
|
"""
|
||||||
|
# If we don't have an entry for the instance we can assume that it was
|
||||||
|
# at `self.stream`.
|
||||||
return self.instance_map.get(instance_name, self.stream)
|
return self.instance_map.get(instance_name, self.stream)
|
||||||
|
|
||||||
def get_max_stream_pos(self) -> int:
|
def get_max_stream_pos(self) -> int:
|
||||||
|
|
Loading…
Reference in New Issue