Basic extra include pagination impl

erikj/paginate_sync
Erik Johnston 2016-05-25 15:54:32 +01:00
parent 26c7f08465
commit 43cbde4653
3 changed files with 80 additions and 37 deletions

View File

@ -49,6 +49,12 @@ SYNC_PAGINATION_ORDER_TS = "o"
SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,) SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,)
SyncExtras = collections.namedtuple("SyncExtras", [
"paginate",
"rooms",
])
class TimelineBatch(collections.namedtuple("TimelineBatch", [ class TimelineBatch(collections.namedtuple("TimelineBatch", [
"prev_batch", "prev_batch",
"events", "events",
@ -152,7 +158,7 @@ class SyncHandler(object):
self.response_cache = ResponseCache() self.response_cache = ResponseCache()
def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0, def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0,
full_state=False): full_state=False, extras=None):
"""Get the sync for a client if we have new data for it now. Otherwise """Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result. return an empty sync result.
@ -164,14 +170,14 @@ class SyncHandler(object):
result = self.response_cache.set( result = self.response_cache.set(
sync_config.request_key, sync_config.request_key,
self._wait_for_sync_for_user( self._wait_for_sync_for_user(
sync_config, batch_token, timeout, full_state sync_config, batch_token, timeout, full_state, extras,
) )
) )
return result return result
@defer.inlineCallbacks @defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, batch_token, timeout, def _wait_for_sync_for_user(self, sync_config, batch_token, timeout,
full_state): full_state, extras=None):
context = LoggingContext.current_context() context = LoggingContext.current_context()
if context: if context:
if batch_token is None: if batch_token is None:
@ -184,13 +190,15 @@ class SyncHandler(object):
if timeout == 0 or batch_token is None or full_state: if timeout == 0 or batch_token is None or full_state:
# we are going to return immediately, so don't bother calling # we are going to return immediately, so don't bother calling
# notifier.wait_for_events. # notifier.wait_for_events.
result = yield self.current_sync_for_user( result = yield self.generate_sync_result(
sync_config, batch_token, full_state=full_state, sync_config, batch_token, full_state=full_state, extras=extras,
) )
defer.returnValue(result) defer.returnValue(result)
else: else:
def current_sync_callback(before_token, after_token): def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, batch_token) return self.generate_sync_result(
sync_config, batch_token, full_state=False, extras=extras,
)
result = yield self.notifier.wait_for_events( result = yield self.notifier.wait_for_events(
sync_config.user.to_string(), timeout, current_sync_callback, sync_config.user.to_string(), timeout, current_sync_callback,
@ -198,14 +206,6 @@ class SyncHandler(object):
) )
defer.returnValue(result) defer.returnValue(result)
def current_sync_for_user(self, sync_config, batch_token=None,
full_state=False):
"""Get the sync for client needed to match what the server has now.
Returns:
A Deferred SyncResult.
"""
return self.generate_sync_result(sync_config, batch_token, full_state)
@defer.inlineCallbacks @defer.inlineCallbacks
def push_rules_for_user(self, user): def push_rules_for_user(self, user):
user_id = user.to_string() user_id = user.to_string()
@ -502,7 +502,8 @@ class SyncHandler(object):
defer.returnValue(None) defer.returnValue(None)
@defer.inlineCallbacks @defer.inlineCallbacks
def generate_sync_result(self, sync_config, batch_token=None, full_state=False): def generate_sync_result(self, sync_config, batch_token=None, full_state=False,
extras=None):
"""Generates a sync result. """Generates a sync result.
Args: Args:
@ -531,7 +532,7 @@ class SyncHandler(object):
) )
res = yield self._generate_sync_entry_for_rooms( res = yield self._generate_sync_entry_for_rooms(
sync_result_builder, account_data_by_room sync_result_builder, account_data_by_room, extras,
) )
newly_joined_rooms, newly_joined_users = res newly_joined_rooms, newly_joined_users = res
@ -658,7 +659,8 @@ class SyncHandler(object):
sync_result_builder.presence = presence sync_result_builder.presence = presence
@defer.inlineCallbacks @defer.inlineCallbacks
def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room): def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room,
extras):
"""Generates the rooms portion of the sync response. Populates the """Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result. `sync_result_builder` with the result.
@ -713,6 +715,8 @@ class SyncHandler(object):
else: else:
pagination_config = None pagination_config = None
include_map = extras.get("peek", {}) if extras else {}
if sync_result_builder.pagination_state: if sync_result_builder.pagination_state:
missing_state = yield self._get_rooms_that_need_full_state( missing_state = yield self._get_rooms_that_need_full_state(
room_ids=[r.room_id for r in room_entries], room_ids=[r.room_id for r in room_entries],
@ -725,34 +729,54 @@ class SyncHandler(object):
for r in room_entries: for r in room_entries:
if r.room_id in missing_state: if r.room_id in missing_state:
r.full_state = True r.full_state = True
if r.room_id in include_map:
r.always_include = True
r.events = None
r.since_token = None
r.upto_token = now_token
new_pagination_state = None
if pagination_config: if pagination_config:
room_ids = [r.room_id for r in room_entries] room_ids = [r.room_id for r in room_entries]
pagination_limit = pagination_config.limit pagination_limit = pagination_config.limit
extra_limit = extras.get("paginate", {}).get("limit", 0) if extras else 0
room_map = yield self._get_room_timestamps_at_token( room_map = yield self._get_room_timestamps_at_token(
room_ids, sync_result_builder.now_token, sync_config, pagination_limit room_ids, sync_result_builder.now_token, sync_config,
pagination_limit + extra_limit,
) )
if room_map: if room_map:
sorted_list = sorted( sorted_list = sorted(
room_map.items(), room_map.items(),
key=lambda item: -item[1] key=lambda item: -item[1]
)[:pagination_limit] )[:pagination_limit + extra_limit]
if sorted_list[pagination_limit:]:
new_room_ids = set(r[0] for r in sorted_list[pagination_limit:])
for r in room_entries:
if r.room_id in new_room_ids:
r.full_state = True
r.always_include = True
r.since_token = None
r.upto_token = now_token
r.events = None
_, bottom_ts = sorted_list[-1] _, bottom_ts = sorted_list[-1]
value = bottom_ts value = bottom_ts
new_pagination_state = SyncPaginationState( sync_result_builder.pagination_state = SyncPaginationState(
order=pagination_config.order, value=value, limit=pagination_limit, order=pagination_config.order, value=value,
limit=pagination_limit + extra_limit,
) )
else:
new_pagination_state = None
room_entries = [r for r in room_entries if r.room_id in room_map] if len(room_map) == len(room_entries):
sync_result_builder.pagination_state = None
sync_result_builder.pagination_state = new_pagination_state room_entries = [
r for r in room_entries
if r.room_id in room_map or r.always_include
]
sync_result_builder.full_state |= sync_result_builder.since_token is None sync_result_builder.full_state |= sync_result_builder.since_token is None
@ -764,7 +788,6 @@ class SyncHandler(object):
ephemeral=ephemeral_by_room.get(room_entry.room_id, []), ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
tags=tags_by_room.get(room_entry.room_id), tags=tags_by_room.get(room_entry.room_id),
account_data=account_data_by_room.get(room_entry.room_id, {}), account_data=account_data_by_room.get(room_entry.room_id, {}),
always_include=sync_result_builder.full_state,
) )
yield concurrently_execute(handle_room_entries, room_entries, 10) yield concurrently_execute(handle_room_entries, room_entries, 10)
@ -995,8 +1018,7 @@ class SyncHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _generate_room_entry(self, sync_result_builder, ignored_users, def _generate_room_entry(self, sync_result_builder, ignored_users,
room_builder, ephemeral, tags, account_data, room_builder, ephemeral, tags, account_data):
always_include=False):
"""Populates the `joined` and `archived` section of `sync_result_builder` """Populates the `joined` and `archived` section of `sync_result_builder`
based on the `room_builder`. based on the `room_builder`.
@ -1012,7 +1034,11 @@ class SyncHandler(object):
even if empty. even if empty.
""" """
newly_joined = room_builder.newly_joined newly_joined = room_builder.newly_joined
always_include = always_include or newly_joined or sync_result_builder.full_state always_include = (
newly_joined
or sync_result_builder.full_state
or room_builder.always_include
)
full_state = ( full_state = (
room_builder.full_state room_builder.full_state
or newly_joined or newly_joined
@ -1025,7 +1051,6 @@ class SyncHandler(object):
if events == [] and tags is None: if events == [] and tags is None:
return return
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config sync_config = sync_result_builder.sync_config
@ -1166,7 +1191,7 @@ class SyncHandler(object):
start_ts = yield self._get_room_timestamps_at_token( start_ts = yield self._get_room_timestamps_at_token(
room_ids, since_token, room_ids, since_token,
sync_config=sync_config, sync_config=sync_config,
limit=pagination_state.limit, limit=len(room_ids),
) )
missing_list = frozenset( missing_list = frozenset(
@ -1263,7 +1288,7 @@ class RoomSyncResultBuilder(object):
__slots__ = ( __slots__ = (
"room_id", "rtype", "events", "newly_joined", "full_state", "since_token", "room_id", "rtype", "events", "newly_joined", "full_state", "since_token",
"upto_token", "upto_token", "always_include",
) )
def __init__(self, room_id, rtype, events, newly_joined, full_state, def __init__(self, room_id, rtype, events, newly_joined, full_state,
@ -1286,3 +1311,4 @@ class RoomSyncResultBuilder(object):
self.full_state = full_state self.full_state = full_state
self.since_token = since_token self.since_token = since_token
self.upto_token = upto_token self.upto_token = upto_token
self.always_include = False

View File

@ -98,6 +98,8 @@ class SyncRestServlet(RestServlet):
since = body.get("since", None) since = body.get("since", None)
extras = body.get("extras", None)
if "from" in body: if "from" in body:
# /events used to use 'from', but /sync uses 'since'. # /events used to use 'from', but /sync uses 'since'.
# Lets be helpful and whine if we see a 'from'. # Lets be helpful and whine if we see a 'from'.
@ -162,6 +164,7 @@ class SyncRestServlet(RestServlet):
set_presence=set_presence, set_presence=set_presence,
full_state=full_state, full_state=full_state,
timeout=timeout, timeout=timeout,
extras=extras,
) )
defer.returnValue(sync_result) defer.returnValue(sync_result)
@ -239,7 +242,7 @@ class SyncRestServlet(RestServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_sync(self, requester, sync_config, batch_token, set_presence, def _handle_sync(self, requester, sync_config, batch_token, set_presence,
full_state, timeout): full_state, timeout, extras=None):
affect_presence = set_presence != PresenceState.OFFLINE affect_presence = set_presence != PresenceState.OFFLINE
user = sync_config.user user = sync_config.user
@ -253,7 +256,7 @@ class SyncRestServlet(RestServlet):
with context: with context:
sync_result = yield self.sync_handler.wait_for_sync_for_user( sync_result = yield self.sync_handler.wait_for_sync_for_user(
sync_config, batch_token=batch_token, timeout=timeout, sync_config, batch_token=batch_token, timeout=timeout,
full_state=full_state full_state=full_state, extras=extras,
) )
time_now = self.clock.time_msec() time_now = self.clock.time_msec()

View File

@ -129,7 +129,7 @@ class SyncNextBatchToken(
if pa: if pa:
pa = SyncPaginationState.from_dict(pa) pa = SyncPaginationState.from_dict(pa)
return cls( return cls(
stream_token=StreamToken.from_string(d["t"]), stream_token=StreamToken.from_arr(d["t"]),
pagination_state=pa, pagination_state=pa,
) )
except: except:
@ -137,7 +137,7 @@ class SyncNextBatchToken(
def to_string(self): def to_string(self):
return encode_base64(json.dumps({ return encode_base64(json.dumps({
"t": self.stream_token.to_string(), "t": self.stream_token.to_arr(),
"pa": self.pagination_state.to_dict() if self.pagination_state else None, "pa": self.pagination_state.to_dict() if self.pagination_state else None,
})) }))
@ -196,6 +196,20 @@ class StreamToken(
def to_string(self): def to_string(self):
return self._SEPARATOR.join([str(k) for k in self]) return self._SEPARATOR.join([str(k) for k in self])
@classmethod
def from_arr(cls, arr):
try:
keys = arr
while len(keys) < len(cls._fields):
# i.e. old token from before receipt_key
keys.append("0")
return cls(*keys)
except:
raise SynapseError(400, "Invalid Token")
def to_arr(self):
return self
@property @property
def room_stream_id(self): def room_stream_id(self):
# TODO(markjh): Awful hack to work around hacks in the presence tests # TODO(markjh): Awful hack to work around hacks in the presence tests