Create new token format for encoding instance positions
parent
a23a3136f9
commit
b2172da9bb
|
@ -22,6 +22,7 @@ from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Any,
|
Any,
|
||||||
Dict,
|
Dict,
|
||||||
|
Iterable,
|
||||||
Mapping,
|
Mapping,
|
||||||
MutableMapping,
|
MutableMapping,
|
||||||
Optional,
|
Optional,
|
||||||
|
@ -43,7 +44,7 @@ if TYPE_CHECKING:
|
||||||
if sys.version_info[:3] >= (3, 6, 0):
|
if sys.version_info[:3] >= (3, 6, 0):
|
||||||
from typing import Collection
|
from typing import Collection
|
||||||
else:
|
else:
|
||||||
from typing import Container, Iterable, Sized
|
from typing import Container, Sized
|
||||||
|
|
||||||
T_co = TypeVar("T_co", covariant=True)
|
T_co = TypeVar("T_co", covariant=True)
|
||||||
|
|
||||||
|
@ -375,7 +376,7 @@ def map_username_to_mxid_localpart(username, case_sensitive=False):
|
||||||
return username.decode("ascii")
|
return username.decode("ascii")
|
||||||
|
|
||||||
|
|
||||||
@attr.s(frozen=True, slots=True)
|
@attr.s(frozen=True, slots=True, cmp=False)
|
||||||
class RoomStreamToken:
|
class RoomStreamToken:
|
||||||
"""Tokens are positions between events. The token "s1" comes after event 1.
|
"""Tokens are positions between events. The token "s1" comes after event 1.
|
||||||
|
|
||||||
|
@ -395,8 +396,30 @@ class RoomStreamToken:
|
||||||
|
|
||||||
Live tokens start with an "s" followed by the "stream_ordering" id of the
|
Live tokens start with an "s" followed by the "stream_ordering" id of the
|
||||||
event it comes after. Historic tokens start with a "t" followed by the
|
event it comes after. Historic tokens start with a "t" followed by the
|
||||||
"topological_ordering" id of the event it comes after, followed by "-",
|
"topological_ordering" id of the event it comes after, followed by "~",
|
||||||
followed by the "stream_ordering" id of the event it comes after.
|
followed by the "stream_ordering" id of the event it comes after.
|
||||||
|
|
||||||
|
There is also a third mode for live tokens where the token starts with "m",
|
||||||
|
which is sometimes used when using sharded event persisters. In this case
|
||||||
|
the events stream is considered to be a set of streams (one for each writer)
|
||||||
|
and the token encodes the vector clock of positions of each writer in their
|
||||||
|
respective streams.
|
||||||
|
|
||||||
|
The format of the token in such case is an initial integer min position,
|
||||||
|
followed by the mapping of instance ID to position seperate by '.' and '~':
|
||||||
|
|
||||||
|
m{min_pos}.{writer1}~{pos1}.{writer2}~{pos2}. ...
|
||||||
|
|
||||||
|
The `min_pos` corresponds to the minimum position all writers have persisted
|
||||||
|
up to, and then only writers that are ahead of that position need to be
|
||||||
|
encoded. An example token is:
|
||||||
|
|
||||||
|
m56.2~58.3~59
|
||||||
|
|
||||||
|
Which corresponds to a set of three (or more writers) where instances 2 and
|
||||||
|
3 (these are instance IDs that can be looked up in the DB to fetch the more
|
||||||
|
commonly used instance names) are at positions 58 and 59 respectively, and
|
||||||
|
all other instances are at position 56.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
topological = attr.ib(
|
topological = attr.ib(
|
||||||
|
@ -405,6 +428,8 @@ class RoomStreamToken:
|
||||||
)
|
)
|
||||||
stream = attr.ib(type=int, validator=attr.validators.instance_of(int))
|
stream = attr.ib(type=int, validator=attr.validators.instance_of(int))
|
||||||
|
|
||||||
|
instance_map = attr.ib(type=Dict[str, int], factory=dict)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def parse(cls, store: "DataStore", string: str) -> "RoomStreamToken":
|
async def parse(cls, store: "DataStore", string: str) -> "RoomStreamToken":
|
||||||
try:
|
try:
|
||||||
|
@ -413,6 +438,20 @@ class RoomStreamToken:
|
||||||
if string[0] == "t":
|
if string[0] == "t":
|
||||||
parts = string[1:].split("-", 1)
|
parts = string[1:].split("-", 1)
|
||||||
return cls(topological=int(parts[0]), stream=int(parts[1]))
|
return cls(topological=int(parts[0]), stream=int(parts[1]))
|
||||||
|
if string[0] == "m":
|
||||||
|
parts = string[1:].split(".")
|
||||||
|
stream = int(parts[0])
|
||||||
|
|
||||||
|
instance_map = {}
|
||||||
|
for part in parts[1:]:
|
||||||
|
key, value = part.split("~")
|
||||||
|
instance_id = int(key)
|
||||||
|
pos = int(value)
|
||||||
|
|
||||||
|
instance_name = await store.get_name_from_instance_id(instance_id)
|
||||||
|
instance_map[instance_name] = pos
|
||||||
|
|
||||||
|
return cls(topological=None, stream=stream, instance_map=instance_map,)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
raise SynapseError(400, "Invalid token %r" % (string,))
|
raise SynapseError(400, "Invalid token %r" % (string,))
|
||||||
|
@ -436,7 +475,15 @@ class RoomStreamToken:
|
||||||
|
|
||||||
max_stream = max(self.stream, other.stream)
|
max_stream = max(self.stream, other.stream)
|
||||||
|
|
||||||
return RoomStreamToken(None, max_stream)
|
instance_map = {
|
||||||
|
instance: max(
|
||||||
|
self.instance_map.get(instance, self.stream),
|
||||||
|
other.instance_map.get(instance, other.stream),
|
||||||
|
)
|
||||||
|
for instance in set(self.instance_map).union(other.instance_map)
|
||||||
|
}
|
||||||
|
|
||||||
|
return RoomStreamToken(None, max_stream, instance_map)
|
||||||
|
|
||||||
def as_tuple(self) -> Tuple[Optional[int], int]:
|
def as_tuple(self) -> Tuple[Optional[int], int]:
|
||||||
return (self.topological, self.stream)
|
return (self.topological, self.stream)
|
||||||
|
@ -444,6 +491,14 @@ class RoomStreamToken:
|
||||||
async def to_string(self, store: "DataStore") -> str:
|
async def to_string(self, store: "DataStore") -> str:
|
||||||
if self.topological is not None:
|
if self.topological is not None:
|
||||||
return "t%d-%d" % (self.topological, self.stream)
|
return "t%d-%d" % (self.topological, self.stream)
|
||||||
|
elif self.instance_map:
|
||||||
|
entries = []
|
||||||
|
for name, pos in self.instance_map.items():
|
||||||
|
instance_id = await store.get_id_for_instance(name)
|
||||||
|
entries.append("{}~{}".format(instance_id, pos))
|
||||||
|
|
||||||
|
encoded_map = ".".join(entries)
|
||||||
|
return "m{}.{}".format(self.stream, encoded_map)
|
||||||
else:
|
else:
|
||||||
return "s%d" % (self.stream,)
|
return "s%d" % (self.stream,)
|
||||||
|
|
||||||
|
@ -535,7 +590,7 @@ class PersistedEventPosition:
|
||||||
stream = attr.ib(type=int)
|
stream = attr.ib(type=int)
|
||||||
|
|
||||||
def persisted_after(self, token: RoomStreamToken) -> bool:
|
def persisted_after(self, token: RoomStreamToken) -> bool:
|
||||||
return token.stream < self.stream
|
return token.get_stream_pos_for_instance(self.instance_name) < self.stream
|
||||||
|
|
||||||
def to_room_stream_token(self) -> RoomStreamToken:
|
def to_room_stream_token(self) -> RoomStreamToken:
|
||||||
"""Converts the position to a room stream token such that events
|
"""Converts the position to a room stream token such that events
|
||||||
|
|
Loading…
Reference in New Issue