Implement basic pagination

erikj/paginate_sync
Erik Johnston 2016-05-24 17:10:23 +01:00
parent 4902770e32
commit 26c7f08465
4 changed files with 118 additions and 16 deletions

View File

@ -20,7 +20,7 @@ from synapse.util.metrics import Measure
from synapse.util.caches.response_cache import ResponseCache from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user from synapse.push.clientformat import format_push_rules_for_user
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client
from synapse.types import SyncNextBatchToken from synapse.types import SyncNextBatchToken, SyncPaginationState
from twisted.internet import defer from twisted.internet import defer
@ -36,9 +36,19 @@ SyncConfig = collections.namedtuple("SyncConfig", [
"filter_collection", "filter_collection",
"is_guest", "is_guest",
"request_key", "request_key",
"pagination_config",
]) ])
# Pagination settings attached to a sync request:
#   order - identifier of the ordering used to paginate rooms
#   limit - how many rooms the paginated response is capped at
SyncPaginationConfig = collections.namedtuple(
    "SyncPaginationConfig", ("order", "limit"),
)

# Identifier of the single ordering defined here (presumably "by timestamp",
# going by the constant's name - confirm against the client API).
SYNC_PAGINATION_ORDER_TS = "o"
# All ordering identifiers defined in this module.
SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,)
class TimelineBatch(collections.namedtuple("TimelineBatch", [ class TimelineBatch(collections.namedtuple("TimelineBatch", [
"prev_batch", "prev_batch",
"events", "events",
@ -537,7 +547,7 @@ class SyncHandler(object):
archived=sync_result_builder.archived, archived=sync_result_builder.archived,
next_batch=SyncNextBatchToken( next_batch=SyncNextBatchToken(
stream_token=sync_result_builder.now_token, stream_token=sync_result_builder.now_token,
pagination_config=batch_token.pagination_config if batch_token else None, pagination_state=sync_result_builder.pagination_state,
) )
)) ))
@ -661,6 +671,7 @@ class SyncHandler(object):
`(newly_joined_rooms, newly_joined_users)` `(newly_joined_rooms, newly_joined_users)`
""" """
user_id = sync_result_builder.sync_config.user.to_string() user_id = sync_result_builder.sync_config.user.to_string()
sync_config = sync_result_builder.sync_config
now_token, ephemeral_by_room = yield self.ephemeral_by_room( now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_result_builder.sync_config, sync_result_builder.sync_config,
@ -692,6 +703,59 @@ class SyncHandler(object):
tags_by_room = yield self.store.get_tags_for_user(user_id) tags_by_room = yield self.store.get_tags_for_user(user_id)
if sync_config.pagination_config:
pagination_config = sync_config.pagination_config
elif sync_result_builder.pagination_state:
pagination_config = SyncPaginationConfig(
order=sync_result_builder.pagination_state.order,
limit=sync_result_builder.pagination_state.limit,
)
else:
pagination_config = None
if sync_result_builder.pagination_state:
missing_state = yield self._get_rooms_that_need_full_state(
room_ids=[r.room_id for r in room_entries],
sync_config=sync_config,
since_token=sync_result_builder.since_token,
pagination_state=sync_result_builder.pagination_state,
)
if missing_state:
for r in room_entries:
if r.room_id in missing_state:
r.full_state = True
new_pagination_state = None
if pagination_config:
room_ids = [r.room_id for r in room_entries]
pagination_limit = pagination_config.limit
room_map = yield self._get_room_timestamps_at_token(
room_ids, sync_result_builder.now_token, sync_config, pagination_limit
)
if room_map:
sorted_list = sorted(
room_map.items(),
key=lambda item: -item[1]
)[:pagination_limit]
_, bottom_ts = sorted_list[-1]
value = bottom_ts
new_pagination_state = SyncPaginationState(
order=pagination_config.order, value=value, limit=pagination_limit,
)
else:
new_pagination_state = None
room_entries = [r for r in room_entries if r.room_id in room_map]
sync_result_builder.pagination_state = new_pagination_state
sync_result_builder.full_state |= sync_result_builder.since_token is None
def handle_room_entries(room_entry): def handle_room_entries(room_entry):
return self._generate_room_entry( return self._generate_room_entry(
sync_result_builder, sync_result_builder,
@ -948,6 +1012,7 @@ class SyncHandler(object):
even if empty. even if empty.
""" """
newly_joined = room_builder.newly_joined newly_joined = room_builder.newly_joined
always_include = always_include or newly_joined or sync_result_builder.full_state
full_state = ( full_state = (
room_builder.full_state room_builder.full_state
or newly_joined or newly_joined
@ -956,7 +1021,7 @@ class SyncHandler(object):
events = room_builder.events events = room_builder.events
# We want to shortcut out as early as possible. # We want to shortcut out as early as possible.
if not (always_include or account_data or ephemeral or full_state): if not (always_include or account_data or ephemeral):
if events == [] and tags is None: if events == [] and tags is None:
return return
@ -995,7 +1060,7 @@ class SyncHandler(object):
ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
if not (always_include or batch or account_data or ephemeral or full_state): if not (always_include or batch or account_data or ephemeral):
return return
state = yield self.compute_state_delta( state = yield self.compute_state_delta(
@ -1037,13 +1102,12 @@ class SyncHandler(object):
raise Exception("Unrecognized rtype: %r", room_builder.rtype) raise Exception("Unrecognized rtype: %r", room_builder.rtype)
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_room_timestamps_at_token(self, room_ids, token, sync_config, def _get_room_timestamps_at_token(self, room_ids, token, sync_config, limit):
limit):
room_to_entries = {} room_to_entries = {}
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_last_ts(room_id): def _get_last_ts(room_id):
entry = yield self.store.get_last_ts_for_room( entry = yield self.store.get_last_event_id_ts_for_room(
room_id, token.room_key room_id, token.room_key
) )
@ -1096,6 +1160,23 @@ class SyncHandler(object):
defer.returnValue(to_return) defer.returnValue(to_return)
@defer.inlineCallbacks
def _get_rooms_that_need_full_state(self, room_ids, sync_config, since_token,
                                    pagination_state):
    """Find the rooms whose timestamp at `since_token` is below the
    pagination threshold.

    Fetches a room_id -> timestamp mapping from
    `_get_room_timestamps_at_token` (evaluated at `since_token`) and keeps
    the rooms whose timestamp is strictly less than `pagination_state.value`.

    Args:
        room_ids (list): Room IDs to check.
        sync_config: Forwarded unchanged to `_get_room_timestamps_at_token`.
        since_token: Token at which the room timestamps are looked up.
        pagination_state: Provides `limit` (forwarded to the timestamp
            lookup) and `value` (the threshold each timestamp is compared
            against).

    Returns:
        Deferred[frozenset]: resolves to the IDs of the matching rooms.
    """
    start_ts = yield self._get_room_timestamps_at_token(
        room_ids, since_token,
        sync_config=sync_config,
        limit=pagination_state.limit,
    )

    # The result is an unordered frozenset, so the entries are filtered
    # directly; sorting them first would be discarded work.
    missing_list = frozenset(
        room_id for room_id, ts in start_ts.items()
        if ts < pagination_state.value
    )

    defer.returnValue(missing_list)
def _action_has_highlight(actions): def _action_has_highlight(actions):
for action in actions: for action in actions:
@ -1147,6 +1228,12 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
class SyncResultBuilder(object): class SyncResultBuilder(object):
"Used to help build up a new SyncResult for a user" "Used to help build up a new SyncResult for a user"
__slots__ = (
"sync_config", "full_state", "batch_token", "since_token", "pagination_state",
"now_token", "presence", "account_data", "joined", "invited", "archived",
)
def __init__(self, sync_config, full_state, batch_token, now_token): def __init__(self, sync_config, full_state, batch_token, now_token):
""" """
Args: Args:
@ -1159,6 +1246,7 @@ class SyncResultBuilder(object):
self.full_state = full_state self.full_state = full_state
self.batch_token = batch_token self.batch_token = batch_token
self.since_token = batch_token.stream_token if batch_token else None self.since_token = batch_token.stream_token if batch_token else None
self.pagination_state = batch_token.pagination_state if batch_token else None
self.now_token = now_token self.now_token = now_token
self.presence = [] self.presence = []
@ -1172,6 +1260,12 @@ class RoomSyncResultBuilder(object):
"""Stores information needed to create either a `JoinedSyncResult` or """Stores information needed to create either a `JoinedSyncResult` or
`ArchivedSyncResult`. `ArchivedSyncResult`.
""" """
__slots__ = (
"room_id", "rtype", "events", "newly_joined", "full_state", "since_token",
"upto_token",
)
def __init__(self, room_id, rtype, events, newly_joined, full_state, def __init__(self, room_id, rtype, events, newly_joined, full_state,
since_token, upto_token): since_token, upto_token):
""" """

View File

@ -19,7 +19,7 @@ from synapse.http.servlet import (
RestServlet, parse_string, parse_integer, parse_boolean, RestServlet, parse_string, parse_integer, parse_boolean,
parse_json_object_from_request, parse_json_object_from_request,
) )
from synapse.handlers.sync import SyncConfig from synapse.handlers.sync import SyncConfig, SyncPaginationConfig
from synapse.types import SyncNextBatchToken from synapse.types import SyncNextBatchToken
from synapse.events.utils import ( from synapse.events.utils import (
serialize_event, format_event_for_client_v2_without_room_id, serialize_event, format_event_for_client_v2_without_room_id,
@ -116,6 +116,7 @@ class SyncRestServlet(RestServlet):
filter_id = body.get("filter_id", None) filter_id = body.get("filter_id", None)
filter_dict = body.get("filter", None) filter_dict = body.get("filter", None)
pagination_config = body.get("pagination_config", None)
if filter_dict is not None and filter_id is not None: if filter_dict is not None and filter_id is not None:
raise SynapseError( raise SynapseError(
@ -143,6 +144,10 @@ class SyncRestServlet(RestServlet):
filter_collection=filter_collection, filter_collection=filter_collection,
is_guest=requester.is_guest, is_guest=requester.is_guest,
request_key=request_key, request_key=request_key,
pagination_config=SyncPaginationConfig(
order=pagination_config["order"],
limit=pagination_config["limit"],
) if pagination_config else None,
) )
if since is not None: if since is not None:
@ -213,6 +218,7 @@ class SyncRestServlet(RestServlet):
filter_collection=filter, filter_collection=filter,
is_guest=requester.is_guest, is_guest=requester.is_guest,
request_key=request_key, request_key=request_key,
pagination_config=None,
) )
if since is not None: if since is not None:

View File

@ -547,7 +547,7 @@ class StreamStore(SQLBaseStore):
else: else:
return None return None
return self.runInteraction("get_last_ts_for_room", f) return self.runInteraction("get_last_event_id_ts_for_room", f)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_events_around(self, room_id, event_id, before_limit, after_limit): def get_events_around(self, room_id, event_id, before_limit, after_limit):

View File

@ -118,7 +118,7 @@ class EventID(DomainSpecificString):
class SyncNextBatchToken( class SyncNextBatchToken(
namedtuple("SyncNextBatchToken", ( namedtuple("SyncNextBatchToken", (
"stream_token", "stream_token",
"pagination_config", "pagination_state",
)) ))
): ):
@classmethod @classmethod
@ -127,10 +127,10 @@ class SyncNextBatchToken(
d = json.loads(decode_base64(string)) d = json.loads(decode_base64(string))
pa = d.get("pa", None) pa = d.get("pa", None)
if pa: if pa:
pa = SyncPaginationConfig.from_dict(pa) pa = SyncPaginationState.from_dict(pa)
return cls( return cls(
stream_token=StreamToken.from_string(d["t"]), stream_token=StreamToken.from_string(d["t"]),
pagination_config=pa, pagination_state=pa,
) )
except: except:
raise SynapseError(400, "Invalid Token") raise SynapseError(400, "Invalid Token")
@ -138,23 +138,24 @@ class SyncNextBatchToken(
def to_string(self): def to_string(self):
return encode_base64(json.dumps({ return encode_base64(json.dumps({
"t": self.stream_token.to_string(), "t": self.stream_token.to_string(),
"pa": self.pagination_config.to_dict() if self.pagination_config else None, "pa": self.pagination_state.to_dict() if self.pagination_state else None,
})) }))
def replace(self, **kwargs): def replace(self, **kwargs):
return self._replace(**kwargs) return self._replace(**kwargs)
class SyncPaginationConfig( class SyncPaginationState(
namedtuple("SyncPaginationConfig", ( namedtuple("SyncPaginationState", (
"order", "order",
"value", "value",
"limit",
)) ))
): ):
@classmethod @classmethod
def from_dict(cls, d): def from_dict(cls, d):
try: try:
return cls(d["o"], d["v"]) return cls(d["o"], d["v"], d["l"])
except: except:
raise SynapseError(400, "Invalid Token") raise SynapseError(400, "Invalid Token")
@ -162,6 +163,7 @@ class SyncPaginationConfig(
return { return {
"o": self.order, "o": self.order,
"v": self.value, "v": self.value,
"l": self.limit,
} }
def replace(self, **kwargs): def replace(self, **kwargs):