Change token format

erikj/paginate_sync
Erik Johnston 2016-05-16 16:59:18 +01:00
parent a2decbdd66
commit 32d476d4f1
3 changed files with 86 additions and 36 deletions
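
This commit changes the sync `since` / `next_batch` token from a bare StreamToken into a SyncNextBatchToken: an unpadded-base64-encoded JSON object carrying the stream token under "t" and a room-pagination config under "pa" (the "l"/"o"/"t" keys set in full_state_sync, presumably limit, offset and the token the room list was ordered at). Below is a minimal standalone sketch of that round trip, assuming the stdlib base64/json in place of unpaddedbase64/ujson and a made-up stream token string in place of a real StreamToken:

import base64
import json

def encode_next_batch(stream_token_str, pagination_config):
    # Pack the stream token plus pagination config into unpadded-base64 JSON,
    # mirroring what SyncNextBatchToken.to_string() does in synapse/types.py.
    raw = json.dumps({"t": stream_token_str, "pa": pagination_config})
    return base64.b64encode(raw.encode("utf-8")).rstrip(b"=").decode("ascii")

def decode_next_batch(token):
    # Reverse of encode_next_batch(), mirroring SyncNextBatchToken.from_string().
    padded = token + "=" * (-len(token) % 4)  # restore the stripped base64 padding
    d = json.loads(base64.b64decode(padded).decode("utf-8"))
    return d["t"], d.get("pa", {})

# Example round trip with hypothetical values.
token = encode_next_batch("s72594_4483_1934", {"l": 20, "o": 0, "t": "s72594_4483_1934"})
print(decode_next_batch(token))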

View File

@@ -24,6 +24,8 @@ from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.visibility import filter_events_for_client
+from synapse.types import SyncNextBatchToken
 from twisted.internet import defer
 import collections
@@ -141,7 +143,7 @@ class SyncHandler(BaseHandler):
         self.clock = hs.get_clock()
         self.response_cache = ResponseCache()

-    def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
+    def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
@@ -154,53 +156,68 @@ class SyncHandler(BaseHandler):
         result = self.response_cache.set(
             sync_config.request_key,
             self._wait_for_sync_for_user(
-                sync_config, since_token, timeout, full_state
+                sync_config, batch_token, timeout, full_state
             )
         )
         return result

     @defer.inlineCallbacks
-    def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
+    def _wait_for_sync_for_user(self, sync_config, batch_token, timeout,
                                 full_state):
         context = LoggingContext.current_context()
         if context:
-            if since_token is None:
+            if batch_token is None:
                 context.tag = "initial_sync"
             elif full_state:
                 context.tag = "full_state_sync"
             else:
                 context.tag = "incremental_sync"

-        if timeout == 0 or since_token is None or full_state:
+        if timeout == 0 or batch_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
             result = yield self.current_sync_for_user(
-                sync_config, since_token, full_state=full_state,
+                sync_config, batch_token, full_state=full_state,
             )
             defer.returnValue(result)
         else:
             def current_sync_callback(before_token, after_token):
-                return self.current_sync_for_user(sync_config, since_token)
+                return self.current_sync_for_user(sync_config, batch_token)

             result = yield self.notifier.wait_for_events(
                 sync_config.user.to_string(), timeout, current_sync_callback,
-                from_token=since_token,
+                from_token=batch_token.stream_token,
             )
             defer.returnValue(result)

-    def current_sync_for_user(self, sync_config, since_token=None,
+    def current_sync_for_user(self, sync_config, batch_token=None,
                               full_state=False):
         """Get the sync for client needed to match what the server has now.

         Returns:
             A Deferred SyncResult.
         """
-        if since_token is None or full_state:
-            return self.full_state_sync(sync_config, since_token)
+        if batch_token is None or full_state:
+            return self.full_state_sync(sync_config, batch_token)
         else:
-            return self.incremental_sync_with_gap(sync_config, since_token)
+            return self.incremental_sync_with_gap(sync_config, batch_token)

     @defer.inlineCallbacks
-    def full_state_sync(self, sync_config, timeline_since_token):
+    def _get_room_timestamps_at_token(self, room_ids, token):
+        room_to_last_ts = {}
+
+        @defer.inlineCallbacks
+        def _get_last_ts(room_id):
+            ts = yield self.store.get_last_ts_for_room(
+                room_id, token.room_key
+            )
+            room_to_last_ts[room_id] = ts if ts else 0
+
+        logger.info("room_to_last_ts: %r", room_to_last_ts)
+        yield concurrently_execute(_get_last_ts, room_ids, 10)
+        defer.returnValue(room_to_last_ts)
+
+    @defer.inlineCallbacks
+    def full_state_sync(self, sync_config, batch_token):
         """Get a sync for a client which is starting without any state.

         If a 'message_since_token' is given, only timeline events which have
@@ -209,6 +226,11 @@ class SyncHandler(BaseHandler):
         Returns:
             A Deferred SyncResult.
         """
+        if batch_token:
+            timeline_since_token = batch_token.stream_token
+        else:
+            timeline_since_token = None
+
         now_token = yield self.event_sources.get_current_token()

         now_token, ephemeral_by_room = yield self.ephemeral_by_room(
@@ -258,24 +280,22 @@ class SyncHandler(BaseHandler):
         user_id = sync_config.user.to_string()

-        room_to_last_ts = {}
-
-        @defer.inlineCallbacks
-        def _get_last_ts(event):
-            room_id = event.room_id
-            if event.membership == Membership.JOIN:
-                ts = yield self.store.get_last_ts_for_room(
-                    room_id, now_token.room_key
-                )
-                room_to_last_ts[room_id] = ts if ts else 0
-
-        logger.info("room_to_last_ts: %r", room_to_last_ts)
-        yield concurrently_execute(_get_last_ts, room_list, 10)
+        pagination_limit = 20
+        room_pagination_config = {
+            "l": pagination_limit,
+            "o": 0,
+            "t": now_token,
+        }
+
+        room_to_last_ts = yield self._get_room_timestamps_at_token(
+            room_ids=[e.room_id for e in room_list if e.membership == Membership.JOIN],
+            token=now_token,
+        )

         joined_rooms_list = frozenset([
             room_id for room_id, _f in
             sorted(room_to_last_ts.items(), key=lambda item: -item[1])
-        ][:20])
+        ][:pagination_limit])

         @defer.inlineCallbacks
         def _generate_room_entry(event):
@@ -326,9 +346,7 @@ class SyncHandler(BaseHandler):
             self.account_data_for_user(account_data)
         )

-        presence = sync_config.filter_collection.filter_presence(
-            presence
-        )
+        presence = sync_config.filter_collection.filter_presence(presence)

         defer.returnValue(SyncResult(
             presence=presence,
@@ -336,7 +354,10 @@ class SyncHandler(BaseHandler):
             joined=joined,
             invited=invited,
             archived=archived,
-            next_batch=now_token,
+            next_batch=SyncNextBatchToken(
+                stream_token=now_token,
+                pagination_config=room_pagination_config,
+            ),
         ))

     @defer.inlineCallbacks
@@ -480,12 +501,14 @@ class SyncHandler(BaseHandler):
         )

     @defer.inlineCallbacks
-    def incremental_sync_with_gap(self, sync_config, since_token):
+    def incremental_sync_with_gap(self, sync_config, batch_token):
         """ Get the incremental delta needed to bring the client up to
         date with the server.

         Returns:
             A Deferred SyncResult.
         """
+        since_token = batch_token.stream_token
+
         now_token = yield self.event_sources.get_current_token()

         rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
@@ -693,7 +716,7 @@ class SyncHandler(BaseHandler):
             joined=joined,
             invited=invited,
             archived=archived,
-            next_batch=now_token,
+            next_batch=batch_token.replace(stream_token=now_token),
         ))

     @defer.inlineCallbacks
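
A note on the full_state_sync hunks above: both before and after this commit, the handler ranks the user's joined rooms by their last event timestamp at the current token and keeps only the most recently active ones in joined_rooms_list. The commit moves the timestamp lookup into the _get_room_timestamps_at_token helper, names the cap pagination_limit (20), and records the "l"/"o"/"t" values in the pagination config that rides inside the returned next_batch token. An illustrative sketch of just the selection step, with made-up room IDs (the real code fetches the timestamps from the store via concurrently_execute):

def select_most_recent_rooms(room_to_last_ts, limit=20):
    # Given room_id -> last event timestamp (0 when unknown), keep the
    # `limit` most recently active rooms, as joined_rooms_list does above.
    most_recent_first = sorted(room_to_last_ts.items(), key=lambda item: -item[1])
    return frozenset(room_id for room_id, _ts in most_recent_first[:limit])

# e.g. with three hypothetical rooms, only the two most recently active survive
print(select_most_recent_rooms(
    {"!a:example.com": 300, "!b:example.com": 100, "!c:example.com": 200},
    limit=2,
))  # -> frozenset with !a:example.com and !c:example.com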

View File

@@ -19,7 +19,7 @@ from synapse.http.servlet import (
     RestServlet, parse_string, parse_integer, parse_boolean
 )
 from synapse.handlers.sync import SyncConfig
-from synapse.types import StreamToken
+from synapse.types import SyncNextBatchToken
 from synapse.events.utils import (
     serialize_event, format_event_for_client_v2_without_room_id,
 )
@@ -140,9 +140,9 @@ class SyncRestServlet(RestServlet):
         )

         if since is not None:
-            since_token = StreamToken.from_string(since)
+            batch_token = SyncNextBatchToken.from_string(since)
         else:
-            since_token = None
+            batch_token = None

         affect_presence = set_presence != PresenceState.OFFLINE
@@ -154,7 +154,7 @@ class SyncRestServlet(RestServlet):
         )

         with context:
             sync_result = yield self.sync_handler.wait_for_sync_for_user(
-                sync_config, since_token=since_token, timeout=timeout,
+                sync_config, batch_token=batch_token, timeout=timeout,
                 full_state=full_state
             )

View File

@@ -17,6 +17,9 @@ from synapse.api.errors import SynapseError
 from collections import namedtuple

+from unpaddedbase64 import encode_base64, decode_base64
+
+import ujson as json

 Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
@@ -112,6 +115,30 @@ class EventID(DomainSpecificString):
     SIGIL = "$"


+class SyncNextBatchToken(
+    namedtuple("SyncNextBatchToken", (
+        "stream_token",
+        "pagination_config",
+    ))
+):
+    @classmethod
+    def from_string(cls, string):
+        try:
+            d = json.loads(decode_base64(string))
+            return cls(StreamToken.from_string(d["t"]), d.get("pa", {}))
+        except:
+            raise SynapseError(400, "Invalid Token")
+
+    def to_string(self):
+        return encode_base64(json.dumps({
+            "t": self.stream_token.to_string(),
+            "pa": self.pagination_config,
+        }))
+
+    def replace(self, **kwargs):
+        return self._replace(**kwargs)
+
+
 class StreamToken(
     namedtuple("Token", (
         "room_key",