From b2172da9bba51628bced7979a516acb58b79f241 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Oct 2020 16:14:00 +0100 Subject: [PATCH] Create new token format for encoding instance positions --- synapse/types.py | 65 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 60 insertions(+), 5 deletions(-) diff --git a/synapse/types.py b/synapse/types.py index bd271f9f16..550a968a03 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -22,6 +22,7 @@ from typing import ( TYPE_CHECKING, Any, Dict, + Iterable, Mapping, MutableMapping, Optional, @@ -43,7 +44,7 @@ if TYPE_CHECKING: if sys.version_info[:3] >= (3, 6, 0): from typing import Collection else: - from typing import Container, Iterable, Sized + from typing import Container, Sized T_co = TypeVar("T_co", covariant=True) @@ -375,7 +376,7 @@ def map_username_to_mxid_localpart(username, case_sensitive=False): return username.decode("ascii") -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, cmp=False) class RoomStreamToken: """Tokens are positions between events. The token "s1" comes after event 1. @@ -395,8 +396,30 @@ class RoomStreamToken: Live tokens start with an "s" followed by the "stream_ordering" id of the event it comes after. Historic tokens start with a "t" followed by the - "topological_ordering" id of the event it comes after, followed by "-", + "topological_ordering" id of the event it comes after, followed by "~", followed by the "stream_ordering" id of the event it comes after. + + There is also a third mode for live tokens where the token starts with "m", + which is sometimes used when using sharded event persisters. In this case + the events stream is considered to be a set of streams (one for each writer) + and the token encodes the vector clock of positions of each writer in their + respective streams. + + The format of the token in such case is an initial integer min position, + followed by the mapping of instance ID to position seperate by '.' and '~': + + m{min_pos}.{writer1}~{pos1}.{writer2}~{pos2}. ... + + The `min_pos` corresponds to the minimum position all writers have persisted + up to, and then only writers that are ahead of that position need to be + encoded. An example token is: + + m56.2~58.3~59 + + Which corresponds to a set of three (or more writers) where instances 2 and + 3 (these are instance IDs that can be looked up in the DB to fetch the more + commonly used instance names) are at positions 58 and 59 respectively, and + all other instances are at position 56. """ topological = attr.ib( @@ -405,6 +428,8 @@ class RoomStreamToken: ) stream = attr.ib(type=int, validator=attr.validators.instance_of(int)) + instance_map = attr.ib(type=Dict[str, int], factory=dict) + @classmethod async def parse(cls, store: "DataStore", string: str) -> "RoomStreamToken": try: @@ -413,6 +438,20 @@ class RoomStreamToken: if string[0] == "t": parts = string[1:].split("-", 1) return cls(topological=int(parts[0]), stream=int(parts[1])) + if string[0] == "m": + parts = string[1:].split(".") + stream = int(parts[0]) + + instance_map = {} + for part in parts[1:]: + key, value = part.split("~") + instance_id = int(key) + pos = int(value) + + instance_name = await store.get_name_from_instance_id(instance_id) + instance_map[instance_name] = pos + + return cls(topological=None, stream=stream, instance_map=instance_map,) except Exception: pass raise SynapseError(400, "Invalid token %r" % (string,)) @@ -436,7 +475,15 @@ class RoomStreamToken: max_stream = max(self.stream, other.stream) - return RoomStreamToken(None, max_stream) + instance_map = { + instance: max( + self.instance_map.get(instance, self.stream), + other.instance_map.get(instance, other.stream), + ) + for instance in set(self.instance_map).union(other.instance_map) + } + + return RoomStreamToken(None, max_stream, instance_map) def as_tuple(self) -> Tuple[Optional[int], int]: return (self.topological, self.stream) @@ -444,6 +491,14 @@ class RoomStreamToken: async def to_string(self, store: "DataStore") -> str: if self.topological is not None: return "t%d-%d" % (self.topological, self.stream) + elif self.instance_map: + entries = [] + for name, pos in self.instance_map.items(): + instance_id = await store.get_id_for_instance(name) + entries.append("{}~{}".format(instance_id, pos)) + + encoded_map = ".".join(entries) + return "m{}.{}".format(self.stream, encoded_map) else: return "s%d" % (self.stream,) @@ -535,7 +590,7 @@ class PersistedEventPosition: stream = attr.ib(type=int) def persisted_after(self, token: RoomStreamToken) -> bool: - return token.stream < self.stream + return token.get_stream_pos_for_instance(self.instance_name) < self.stream def to_room_stream_token(self) -> RoomStreamToken: """Converts the position to a room stream token such that events