diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4c5b935012..33c05950c5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -24,6 +24,8 @@ from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client +from synapse.types import SyncNextBatchToken + from twisted.internet import defer import collections @@ -141,7 +143,7 @@ class SyncHandler(BaseHandler): self.clock = hs.get_clock() self.response_cache = ResponseCache() - def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, + def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0, full_state=False): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then @@ -154,53 +156,68 @@ class SyncHandler(BaseHandler): result = self.response_cache.set( sync_config.request_key, self._wait_for_sync_for_user( - sync_config, since_token, timeout, full_state + sync_config, batch_token, timeout, full_state ) ) return result @defer.inlineCallbacks - def _wait_for_sync_for_user(self, sync_config, since_token, timeout, + def _wait_for_sync_for_user(self, sync_config, batch_token, timeout, full_state): context = LoggingContext.current_context() if context: - if since_token is None: + if batch_token is None: context.tag = "initial_sync" elif full_state: context.tag = "full_state_sync" else: context.tag = "incremental_sync" - if timeout == 0 or since_token is None or full_state: + if timeout == 0 or batch_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. result = yield self.current_sync_for_user( - sync_config, since_token, full_state=full_state, + sync_config, batch_token, full_state=full_state, ) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): - return self.current_sync_for_user(sync_config, since_token) + return self.current_sync_for_user(sync_config, batch_token) result = yield self.notifier.wait_for_events( sync_config.user.to_string(), timeout, current_sync_callback, - from_token=since_token, + from_token=batch_token.stream_token, ) defer.returnValue(result) - def current_sync_for_user(self, sync_config, since_token=None, + def current_sync_for_user(self, sync_config, batch_token=None, full_state=False): """Get the sync for client needed to match what the server has now. Returns: A Deferred SyncResult. """ - if since_token is None or full_state: - return self.full_state_sync(sync_config, since_token) + if batch_token is None or full_state: + return self.full_state_sync(sync_config, batch_token) else: - return self.incremental_sync_with_gap(sync_config, since_token) + return self.incremental_sync_with_gap(sync_config, batch_token) @defer.inlineCallbacks - def full_state_sync(self, sync_config, timeline_since_token): + def _get_room_timestamps_at_token(self, room_ids, token): + room_to_last_ts = {} + + @defer.inlineCallbacks + def _get_last_ts(room_id): + ts = yield self.store.get_last_ts_for_room( + room_id, token.room_key + ) + room_to_last_ts[room_id] = ts if ts else 0 + + logger.info("room_to_last_ts: %r", room_to_last_ts) + yield concurrently_execute(_get_last_ts, room_ids, 10) + defer.returnValue(room_to_last_ts) + + @defer.inlineCallbacks + def full_state_sync(self, sync_config, batch_token): """Get a sync for a client which is starting without any state. If a 'message_since_token' is given, only timeline events which have @@ -209,6 +226,11 @@ class SyncHandler(BaseHandler): Returns: A Deferred SyncResult. """ + if batch_token: + timeline_since_token = batch_token.stream_token + else: + timeline_since_token = None + now_token = yield self.event_sources.get_current_token() now_token, ephemeral_by_room = yield self.ephemeral_by_room( @@ -258,24 +280,22 @@ class SyncHandler(BaseHandler): user_id = sync_config.user.to_string() - room_to_last_ts = {} + pagination_limit = 20 + room_pagination_config = { + "l": pagination_limit, + "o": 0, + "t": now_token, + } - @defer.inlineCallbacks - def _get_last_ts(event): - room_id = event.room_id - if event.membership == Membership.JOIN: - ts = yield self.store.get_last_ts_for_room( - room_id, now_token.room_key - ) - room_to_last_ts[room_id] = ts if ts else 0 - - logger.info("room_to_last_ts: %r", room_to_last_ts) - yield concurrently_execute(_get_last_ts, room_list, 10) + room_to_last_ts = yield self._get_room_timestamps_at_token( + room_ids=[e.room_id for e in room_list if e.membership == Membership.JOIN], + token=now_token, + ) joined_rooms_list = frozenset([ room_id for room_id, _f in sorted(room_to_last_ts.items(), key=lambda item: -item[1]) - ][:20]) + ][:pagination_limit]) @defer.inlineCallbacks def _generate_room_entry(event): @@ -326,9 +346,7 @@ class SyncHandler(BaseHandler): self.account_data_for_user(account_data) ) - presence = sync_config.filter_collection.filter_presence( - presence - ) + presence = sync_config.filter_collection.filter_presence(presence) defer.returnValue(SyncResult( presence=presence, @@ -336,7 +354,10 @@ class SyncHandler(BaseHandler): joined=joined, invited=invited, archived=archived, - next_batch=now_token, + next_batch=SyncNextBatchToken( + stream_token=now_token, + pagination_config=room_pagination_config, + ), )) @defer.inlineCallbacks @@ -480,12 +501,14 @@ class SyncHandler(BaseHandler): ) @defer.inlineCallbacks - def incremental_sync_with_gap(self, sync_config, since_token): + def incremental_sync_with_gap(self, sync_config, batch_token): """ Get the incremental delta needed to bring the client up to date with the server. Returns: A Deferred SyncResult. """ + since_token = batch_token.stream_token + now_token = yield self.event_sources.get_current_token() rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) @@ -693,7 +716,7 @@ class SyncHandler(BaseHandler): joined=joined, invited=invited, archived=archived, - next_batch=now_token, + next_batch=batch_token.replace(stream_token=now_token), )) @defer.inlineCallbacks diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 60d3dc4030..37fd1539f6 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -19,7 +19,7 @@ from synapse.http.servlet import ( RestServlet, parse_string, parse_integer, parse_boolean ) from synapse.handlers.sync import SyncConfig -from synapse.types import StreamToken +from synapse.types import SyncNextBatchToken from synapse.events.utils import ( serialize_event, format_event_for_client_v2_without_room_id, ) @@ -140,9 +140,9 @@ class SyncRestServlet(RestServlet): ) if since is not None: - since_token = StreamToken.from_string(since) + batch_token = SyncNextBatchToken.from_string(since) else: - since_token = None + batch_token = None affect_presence = set_presence != PresenceState.OFFLINE @@ -154,7 +154,7 @@ class SyncRestServlet(RestServlet): ) with context: sync_result = yield self.sync_handler.wait_for_sync_for_user( - sync_config, since_token=since_token, timeout=timeout, + sync_config, batch_token=batch_token, timeout=timeout, full_state=full_state ) diff --git a/synapse/types.py b/synapse/types.py index 42fd9c7204..b53d91747b 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -17,6 +17,9 @@ from synapse.api.errors import SynapseError from collections import namedtuple +from unpaddedbase64 import encode_base64, decode_base64 +import ujson as json + Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) @@ -112,6 +115,30 @@ class EventID(DomainSpecificString): SIGIL = "$" +class SyncNextBatchToken( + namedtuple("SyncNextBatchToken", ( + "stream_token", + "pagination_config", + )) +): + @classmethod + def from_string(cls, string): + try: + d = json.loads(decode_base64(string)) + return cls(StreamToken.from_string(d["t"]), d.get("pa", {})) + except: + raise SynapseError(400, "Invalid Token") + + def to_string(self): + return encode_base64(json.dumps({ + "t": self.stream_token.to_string(), + "pa": self.pagination_config, + })) + + def replace(self, **kwargs): + return self._replace(**kwargs) + + class StreamToken( namedtuple("Token", ( "room_key",