From 26c7f08465a23c28fafbef4bf45d249b8404a300 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 17:10:23 +0100 Subject: [PATCH] Implement basic pagination --- synapse/handlers/sync.py | 108 +++++++++++++++++++++++++-- synapse/rest/client/v2_alpha/sync.py | 8 +- synapse/storage/stream.py | 2 +- synapse/types.py | 16 ++-- 4 files changed, 118 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4ca9ff4dbc..a880845605 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -20,7 +20,7 @@ from synapse.util.metrics import Measure from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client -from synapse.types import SyncNextBatchToken +from synapse.types import SyncNextBatchToken, SyncPaginationState from twisted.internet import defer @@ -36,9 +36,19 @@ SyncConfig = collections.namedtuple("SyncConfig", [ "filter_collection", "is_guest", "request_key", + "pagination_config", ]) +SyncPaginationConfig = collections.namedtuple("SyncPaginationConfig", [ + "order", + "limit", +]) + +SYNC_PAGINATION_ORDER_TS = "o" +SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,) + + class TimelineBatch(collections.namedtuple("TimelineBatch", [ "prev_batch", "events", @@ -537,7 +547,7 @@ class SyncHandler(object): archived=sync_result_builder.archived, next_batch=SyncNextBatchToken( stream_token=sync_result_builder.now_token, - pagination_config=batch_token.pagination_config if batch_token else None, + pagination_state=sync_result_builder.pagination_state, ) )) @@ -661,6 +671,7 @@ class SyncHandler(object): `(newly_joined_rooms, newly_joined_users)` """ user_id = sync_result_builder.sync_config.user.to_string() + sync_config = sync_result_builder.sync_config now_token, ephemeral_by_room = yield self.ephemeral_by_room( sync_result_builder.sync_config, @@ -692,6 +703,59 @@ class SyncHandler(object): tags_by_room = yield self.store.get_tags_for_user(user_id) + if sync_config.pagination_config: + pagination_config = sync_config.pagination_config + elif sync_result_builder.pagination_state: + pagination_config = SyncPaginationConfig( + order=sync_result_builder.pagination_state.order, + limit=sync_result_builder.pagination_state.limit, + ) + else: + pagination_config = None + + if sync_result_builder.pagination_state: + missing_state = yield self._get_rooms_that_need_full_state( + room_ids=[r.room_id for r in room_entries], + sync_config=sync_config, + since_token=sync_result_builder.since_token, + pagination_state=sync_result_builder.pagination_state, + ) + + if missing_state: + for r in room_entries: + if r.room_id in missing_state: + r.full_state = True + + new_pagination_state = None + if pagination_config: + room_ids = [r.room_id for r in room_entries] + pagination_limit = pagination_config.limit + + room_map = yield self._get_room_timestamps_at_token( + room_ids, sync_result_builder.now_token, sync_config, pagination_limit + ) + + if room_map: + sorted_list = sorted( + room_map.items(), + key=lambda item: -item[1] + )[:pagination_limit] + + _, bottom_ts = sorted_list[-1] + value = bottom_ts + + new_pagination_state = SyncPaginationState( + order=pagination_config.order, value=value, limit=pagination_limit, + ) + else: + new_pagination_state = None + + room_entries = [r for r in room_entries if r.room_id in room_map] + + sync_result_builder.pagination_state = new_pagination_state + + sync_result_builder.full_state |= sync_result_builder.since_token is None + def handle_room_entries(room_entry): return self._generate_room_entry( sync_result_builder, @@ -948,6 +1012,7 @@ class SyncHandler(object): even if empty. """ newly_joined = room_builder.newly_joined + always_include = always_include or newly_joined or sync_result_builder.full_state full_state = ( room_builder.full_state or newly_joined @@ -956,7 +1021,7 @@ class SyncHandler(object): events = room_builder.events # We want to shortcut out as early as possible. - if not (always_include or account_data or ephemeral or full_state): + if not (always_include or account_data or ephemeral): if events == [] and tags is None: return @@ -995,7 +1060,7 @@ class SyncHandler(object): ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) - if not (always_include or batch or account_data or ephemeral or full_state): + if not (always_include or batch or account_data or ephemeral): return state = yield self.compute_state_delta( @@ -1037,13 +1102,12 @@ class SyncHandler(object): raise Exception("Unrecognized rtype: %r", room_builder.rtype) @defer.inlineCallbacks - def _get_room_timestamps_at_token(self, room_ids, token, sync_config, - limit): + def _get_room_timestamps_at_token(self, room_ids, token, sync_config, limit): room_to_entries = {} @defer.inlineCallbacks def _get_last_ts(room_id): - entry = yield self.store.get_last_ts_for_room( + entry = yield self.store.get_last_event_id_ts_for_room( room_id, token.room_key ) @@ -1096,6 +1160,23 @@ class SyncHandler(object): defer.returnValue(to_return) + @defer.inlineCallbacks + def _get_rooms_that_need_full_state(self, room_ids, sync_config, since_token, + pagination_state): + start_ts = yield self._get_room_timestamps_at_token( + room_ids, since_token, + sync_config=sync_config, + limit=pagination_state.limit, + ) + + missing_list = frozenset( + room_id for room_id, ts in + sorted(start_ts.items(), key=lambda item: -item[1]) + if ts < pagination_state.value + ) + + defer.returnValue(missing_list) + def _action_has_highlight(actions): for action in actions: @@ -1147,6 +1228,12 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): class SyncResultBuilder(object): "Used to help build up a new SyncResult for a user" + + __slots__ = ( + "sync_config", "full_state", "batch_token", "since_token", "pagination_state", + "now_token", "presence", "account_data", "joined", "invited", "archived", + ) + def __init__(self, sync_config, full_state, batch_token, now_token): """ Args: @@ -1159,6 +1246,7 @@ class SyncResultBuilder(object): self.full_state = full_state self.batch_token = batch_token self.since_token = batch_token.stream_token if batch_token else None + self.pagination_state = batch_token.pagination_state if batch_token else None self.now_token = now_token self.presence = [] @@ -1172,6 +1260,12 @@ class RoomSyncResultBuilder(object): """Stores information needed to create either a `JoinedSyncResult` or `ArchivedSyncResult`. """ + + __slots__ = ( + "room_id", "rtype", "events", "newly_joined", "full_state", "since_token", + "upto_token", + ) + def __init__(self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token): """ diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index e1ece5d406..3df9743132 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -19,7 +19,7 @@ from synapse.http.servlet import ( RestServlet, parse_string, parse_integer, parse_boolean, parse_json_object_from_request, ) -from synapse.handlers.sync import SyncConfig +from synapse.handlers.sync import SyncConfig, SyncPaginationConfig from synapse.types import SyncNextBatchToken from synapse.events.utils import ( serialize_event, format_event_for_client_v2_without_room_id, @@ -116,6 +116,7 @@ class SyncRestServlet(RestServlet): filter_id = body.get("filter_id", None) filter_dict = body.get("filter", None) + pagination_config = body.get("pagination_config", None) if filter_dict is not None and filter_id is not None: raise SynapseError( @@ -143,6 +144,10 @@ class SyncRestServlet(RestServlet): filter_collection=filter_collection, is_guest=requester.is_guest, request_key=request_key, + pagination_config=SyncPaginationConfig( + order=pagination_config["order"], + limit=pagination_config["limit"], + ) if pagination_config else None, ) if since is not None: @@ -213,6 +218,7 @@ class SyncRestServlet(RestServlet): filter_collection=filter, is_guest=requester.is_guest, request_key=request_key, + pagination_config=None, ) if since is not None: diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index a85cdac038..ab991e877d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -547,7 +547,7 @@ class StreamStore(SQLBaseStore): else: return None - return self.runInteraction("get_last_ts_for_room", f) + return self.runInteraction("get_last_event_id_ts_for_room", f) @defer.inlineCallbacks def get_events_around(self, room_id, event_id, before_limit, after_limit): diff --git a/synapse/types.py b/synapse/types.py index 0c9efdfd00..fd7c0ffe7a 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -118,7 +118,7 @@ class EventID(DomainSpecificString): class SyncNextBatchToken( namedtuple("SyncNextBatchToken", ( "stream_token", - "pagination_config", + "pagination_state", )) ): @classmethod @@ -127,10 +127,10 @@ class SyncNextBatchToken( d = json.loads(decode_base64(string)) pa = d.get("pa", None) if pa: - pa = SyncPaginationConfig.from_dict(pa) + pa = SyncPaginationState.from_dict(pa) return cls( stream_token=StreamToken.from_string(d["t"]), - pagination_config=pa, + pagination_state=pa, ) except: raise SynapseError(400, "Invalid Token") @@ -138,23 +138,24 @@ class SyncNextBatchToken( def to_string(self): return encode_base64(json.dumps({ "t": self.stream_token.to_string(), - "pa": self.pagination_config.to_dict() if self.pagination_config else None, + "pa": self.pagination_state.to_dict() if self.pagination_state else None, })) def replace(self, **kwargs): return self._replace(**kwargs) -class SyncPaginationConfig( - namedtuple("SyncPaginationConfig", ( +class SyncPaginationState( + namedtuple("SyncPaginationState", ( "order", "value", + "limit", )) ): @classmethod def from_dict(cls, d): try: - return cls(d["o"], d["v"]) + return cls(d["o"], d["v"], d["l"]) except: raise SynapseError(400, "Invalid Token") @@ -162,6 +163,7 @@ class SyncPaginationConfig( return { "o": self.order, "v": self.value, + "l": self.limit, } def replace(self, **kwargs):