Correctly figure out which rooms we've sent down
parent 32d476d4f1
commit 64df836067
@@ -284,18 +284,29 @@ class SyncHandler(BaseHandler):
         room_pagination_config = {
             "l": pagination_limit,
             "o": 0,
-            "t": now_token,
+            "t": now_token.to_string(),
         }
 
         room_to_last_ts = yield self._get_room_timestamps_at_token(
-            room_ids=[e.room_id for e in room_list if e.membership == Membership.JOIN],
+            room_ids=[
+                e.room_id for e in room_list if e.membership == Membership.JOIN
+            ],
             token=now_token,
         )
 
-        joined_rooms_list = frozenset([
-            room_id for room_id, _f in
-            sorted(room_to_last_ts.items(), key=lambda item: -item[1])
-        ][:pagination_limit])
+        if room_to_last_ts:
+            sorted_list = sorted(
+                room_to_last_ts.items(),
+                key=lambda item: -item[1]
+            )[:pagination_limit]
+
+            _, bottom_ts = sorted_list[-1]
+            room_pagination_config["ts"] = bottom_ts
+
+            joined_rooms_list = frozenset(room_id for room_id, _f in sorted_list)
+        else:
+            room_pagination_config = {}
+            joined_rooms_list = frozenset()
 
         @defer.inlineCallbacks
         def _generate_room_entry(event):
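The hunk above is the heart of the change: rather than sending every joined room down in the initial sync, it keeps only the `pagination_limit` most recently active rooms and records the timestamp of the last room that made the cut, so later syncs can tell which rooms were never sent. A minimal standalone sketch of that selection, using made-up room ids and timestamps in place of the handler's deferreds and tokens:

    room_to_last_ts = {
        "!a:example.org": 170,
        "!b:example.org": 120,
        "!c:example.org": 450,
        "!d:example.org": 300,
    }
    pagination_limit = 2
    room_pagination_config = {"l": pagination_limit, "o": 0, "t": "<now_token>"}

    if room_to_last_ts:
        # Most recently active rooms first, truncated to the page size.
        sorted_list = sorted(
            room_to_last_ts.items(), key=lambda item: -item[1]
        )[:pagination_limit]

        # The last entry is the oldest room that still fits in this page; its
        # timestamp marks the lower edge of the window that has been sent down.
        _, bottom_ts = sorted_list[-1]
        room_pagination_config["ts"] = bottom_ts

        joined_rooms_list = frozenset(room_id for room_id, _f in sorted_list)
    else:
        room_pagination_config = {}
        joined_rooms_list = frozenset()

    print(joined_rooms_list)       # frozenset({'!c:example.org', '!d:example.org'})
    print(room_pagination_config)  # {'l': 2, 'o': 0, 't': '<now_token>', 'ts': 300}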
@@ -500,6 +511,18 @@ class SyncHandler(BaseHandler):
             account_data_by_room, full_state=True, leave_token=leave_token,
         )
 
+    @defer.inlineCallbacks
+    def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts):
+        start_ts = yield self._get_room_timestamps_at_token(room_ids, since_token)
+
+        missing_list = frozenset(
+            room_id for room_id, ts in
+            sorted(start_ts.items(), key=lambda item: -item[1])
+            if ts < pa_ts
+        )
+
+        defer.returnValue(missing_list)
+
     @defer.inlineCallbacks
     def incremental_sync_with_gap(self, sync_config, batch_token):
         """ Get the incremental delta needed to bring the client up to
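The new `_get_rooms_that_need_full_state` helper is essentially a filter over the per-room timestamps at the since token: any room whose last activity predates the pagination bottom timestamp (`pa_ts`) was never included in the paginated window, so it cannot be synced incrementally. A rough pure-function equivalent, with illustrative values and no deferreds:

    def rooms_that_need_full_state(start_ts, pa_ts):
        # start_ts maps room id -> last-event timestamp at the since token.
        return frozenset(
            room_id
            for room_id, ts in start_ts.items()
            if ts < pa_ts
        )

    print(rooms_that_need_full_state(
        {"!a:example.org": 170, "!c:example.org": 450, "!d:example.org": 300},
        pa_ts=300,
    ))  # frozenset({'!a:example.org'})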
@@ -508,6 +531,7 @@ class SyncHandler(BaseHandler):
             A Deferred SyncResult.
         """
         since_token = batch_token.stream_token
+        room_pagination_config = batch_token.pagination_config
 
         now_token = yield self.event_sources.get_current_token()
 
@@ -638,18 +662,30 @@ class SyncHandler(BaseHandler):
             limit=timeline_limit + 1,
         )
 
+        p_room_token = room_pagination_config.get("t", None)
+        if p_room_token:
+            needing_full_state = yield self._get_rooms_that_need_full_state(
+                joined_room_ids,
+                since_token,
+                room_pagination_config.get("ts", 0),
+            )
+        else:
+            needing_full_state = set()
+
         joined = []
         # We loop through all room ids, even if there are no new events, in case
         # there are non room events taht we need to notify about.
         for room_id in joined_room_ids:
             room_entry = room_to_events.get(room_id, None)
 
+            need_full_state = room_id in needing_full_state
+
             if room_entry:
                 events, start_key = room_entry
 
                 prev_batch_token = now_token.copy_and_replace("room_key", start_key)
 
-                newly_joined_room = room_id in newly_joined_rooms
+                newly_joined_room = (room_id in newly_joined_rooms) or need_full_state
                 full_state = newly_joined_room
 
                 batch = yield self.load_filtered_recents(
@@ -659,6 +695,9 @@ class SyncHandler(BaseHandler):
                     newly_joined_room=newly_joined_room,
                 )
             else:
+                if need_full_state:
+                    continue
+
                 batch = TimelineBatch(
                     events=[],
                     prev_batch=since_token,
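Downstream, `need_full_state` changes two branches of the incremental per-room loop: rooms with new events are treated as if newly joined (so they get full state), while rooms with no new events that were never sent down are skipped entirely instead of getting an empty batch. A condensed sketch of that branching (the `room_action` helper is hypothetical, for illustration only):

    def room_action(room_entry, newly_joined_room, need_full_state):
        if room_entry:
            # New events: send them, with full state if the client has not
            # seen this room yet (newly joined or outside the paginated window).
            if newly_joined_room or need_full_state:
                return "timeline + full state"
            return "timeline + incremental state"
        if need_full_state:
            # No new events and never sent down: leave the room out for now.
            return "skip"
        # No new events, but the client already has it: empty timeline batch.
        return "empty batch"

    print(room_action(("events", "start_key"), False, True))  # timeline + full state
    print(room_action(None, False, True))                     # skip
    print(room_action(None, False, False))                    # empty batch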