Use get_room_events_stream to get changes to the rooms if the number of changes is small
parent
e3e72b8c5c
commit
e016f4043b
|
@ -216,23 +216,57 @@ class SyncHandler(BaseHandler):
|
||||||
typing_by_room = {event["room_id"]: event for event in typing}
|
typing_by_room = {event["room_id"]: event for event in typing}
|
||||||
logger.debug("Typing %r", typing_by_room)
|
logger.debug("Typing %r", typing_by_room)
|
||||||
|
|
||||||
room_list = yield self.store.get_rooms_for_user_where_membership_is(
|
rm_handler = self.hs.get_handlers().room_member_handler
|
||||||
user_id=sync_config.user.to_string(),
|
room_ids = yield rm_handler.get_rooms_for_user(sync_config.user)
|
||||||
membership_list=[Membership.INVITE, Membership.JOIN]
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO (mjark): Does public mean "published"?
|
# TODO (mjark): Does public mean "published"?
|
||||||
published_rooms = yield self.store.get_rooms(is_public=True)
|
published_rooms = yield self.store.get_rooms(is_public=True)
|
||||||
published_room_ids = set(r["room_id"] for r in published_rooms)
|
published_room_ids = set(r["room_id"] for r in published_rooms)
|
||||||
|
|
||||||
|
room_events, _ = yield self.store.get_room_events_stream(
|
||||||
|
sync_config.user.to_string(),
|
||||||
|
from_key=since_token.room_key,
|
||||||
|
to_key=now_token.room_key,
|
||||||
|
room_id=None,
|
||||||
|
limit=sync_config.limit + 1,
|
||||||
|
)
|
||||||
|
|
||||||
rooms = []
|
rooms = []
|
||||||
for event in room_list:
|
if len(room_events) <= sync_config.limit:
|
||||||
room_sync = yield self.incremental_sync_with_gap_for_room(
|
# There is no gap in any of the rooms. Therefore we can just
|
||||||
event.room_id, sync_config, since_token, now_token,
|
# partition the new events by room and return them.
|
||||||
published_room_ids, typing_by_room
|
events_by_room_id = {}
|
||||||
)
|
for event in room_events:
|
||||||
if room_sync:
|
events_by_room_id.setdefault(event.room_id, []).append(event)
|
||||||
rooms.append(room_sync)
|
|
||||||
|
for room_id in room_ids:
|
||||||
|
recents = events_by_room_id.get(room_id, [])
|
||||||
|
state = [event for event in recents if event.is_state()]
|
||||||
|
if recents:
|
||||||
|
prev_batch = now_token.copy_and_replace(
|
||||||
|
"room_key", recents[0].internal_metadata.before
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
prev_batch = now_token
|
||||||
|
room_sync = RoomSyncResult(
|
||||||
|
room_id=room_id,
|
||||||
|
published=room_id in published_room_ids,
|
||||||
|
events=recents,
|
||||||
|
prev_batch=prev_batch,
|
||||||
|
state=state,
|
||||||
|
limited=False,
|
||||||
|
typing=typing_by_room.get(room_id, None)
|
||||||
|
)
|
||||||
|
if room_sync is not None:
|
||||||
|
rooms.append(room_sync)
|
||||||
|
else:
|
||||||
|
for room_id in room_ids:
|
||||||
|
room_sync = yield self.incremental_sync_with_gap_for_room(
|
||||||
|
room_id, sync_config, since_token, now_token,
|
||||||
|
published_room_ids, typing_by_room
|
||||||
|
)
|
||||||
|
if room_sync:
|
||||||
|
rooms.append(room_sync)
|
||||||
|
|
||||||
defer.returnValue(SyncResult(
|
defer.returnValue(SyncResult(
|
||||||
public_user_data=presence,
|
public_user_data=presence,
|
||||||
|
|
|
@ -181,6 +181,13 @@ class StreamStore(SQLBaseStore):
|
||||||
get_prev_content=True
|
get_prev_content=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
for event, row in zip(ret, rows):
|
||||||
|
stream = row["stream_ordering"]
|
||||||
|
topo = event.depth
|
||||||
|
internal = event.internal_metadata
|
||||||
|
internal.before = str(_StreamToken(topo, stream - 1))
|
||||||
|
internal.after = str(_StreamToken(topo, stream))
|
||||||
|
|
||||||
if rows:
|
if rows:
|
||||||
key = "s%d" % max([r["stream_ordering"] for r in rows])
|
key = "s%d" % max([r["stream_ordering"] for r in rows])
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Reference in New Issue