Speed up lazy loading (#3827)

* speed up room summaries by pulling their data from room_memberships rather than room state
* disable LL for incr syncs, and log incr sync stats  (#3840)
pull/3844/merge
Matthew Hodgson 2018-09-12 00:50:39 +01:00 committed by GitHub
parent 037a06e8f0
commit b041115415
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 159 additions and 30 deletions

1
changelog.d/3827.misc Normal file
View File

@ -0,0 +1 @@
speed up lazy loading by 2-3x

1
changelog.d/3840.misc Normal file
View File

@ -0,0 +1 @@
Disable lazy loading for incremental syncs for now

View File

@ -24,6 +24,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.types import RoomStreamToken from synapse.types import RoomStreamToken
from synapse.util.async_helpers import concurrently_execute from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
@ -525,6 +526,8 @@ class SyncHandler(object):
A deferred dict describing the room summary A deferred dict describing the room summary
""" """
# FIXME: we could/should get this from room_stats when matthew/stats lands
# FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305 # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
last_events, _ = yield self.store.get_recent_event_ids_for_room( last_events, _ = yield self.store.get_recent_event_ids_for_room(
room_id, end_token=now_token.room_key, limit=1, room_id, end_token=now_token.room_key, limit=1,
@ -537,44 +540,54 @@ class SyncHandler(object):
last_event = last_events[-1] last_event = last_events[-1]
state_ids = yield self.store.get_state_ids_for_event( state_ids = yield self.store.get_state_ids_for_event(
last_event.event_id, [ last_event.event_id, [
(EventTypes.Member, None),
(EventTypes.Name, ''), (EventTypes.Name, ''),
(EventTypes.CanonicalAlias, ''), (EventTypes.CanonicalAlias, ''),
] ]
) )
member_ids = { # this is heavily cached, thus: fast.
state_key: event_id details = yield self.store.get_room_summary(room_id)
for (t, state_key), event_id in iteritems(state_ids)
if t == EventTypes.Member
}
name_id = state_ids.get((EventTypes.Name, '')) name_id = state_ids.get((EventTypes.Name, ''))
canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, '')) canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
summary = {} summary = {}
empty_ms = MemberSummary([], 0)
# FIXME: it feels very heavy to load up every single membership event
# just to calculate the counts.
member_events = yield self.store.get_events(member_ids.values())
joined_user_ids = []
invited_user_ids = []
for ev in member_events.values():
if ev.content.get("membership") == Membership.JOIN:
joined_user_ids.append(ev.state_key)
elif ev.content.get("membership") == Membership.INVITE:
invited_user_ids.append(ev.state_key)
# TODO: only send these when they change. # TODO: only send these when they change.
summary["m.joined_member_count"] = len(joined_user_ids) summary["m.joined_member_count"] = (
summary["m.invited_member_count"] = len(invited_user_ids) details.get(Membership.JOIN, empty_ms).count
)
summary["m.invited_member_count"] = (
details.get(Membership.INVITE, empty_ms).count
)
if name_id or canonical_alias_id: if name_id or canonical_alias_id:
defer.returnValue(summary) defer.returnValue(summary)
# FIXME: order by stream ordering, not alphabetic joined_user_ids = [
r[0] for r in details.get(Membership.JOIN, empty_ms).members
]
invited_user_ids = [
r[0] for r in details.get(Membership.INVITE, empty_ms).members
]
gone_user_ids = (
[r[0] for r in details.get(Membership.LEAVE, empty_ms).members] +
[r[0] for r in details.get(Membership.BAN, empty_ms).members]
)
# FIXME: only build up a member_ids list for our heroes
member_ids = {}
for membership in (
Membership.JOIN,
Membership.INVITE,
Membership.LEAVE,
Membership.BAN
):
for user_id, event_id in details.get(membership, empty_ms).members:
member_ids[user_id] = event_id
# FIXME: order by stream ordering rather than as returned by SQL
me = sync_config.user.to_string() me = sync_config.user.to_string()
if (joined_user_ids or invited_user_ids): if (joined_user_ids or invited_user_ids):
summary['m.heroes'] = sorted( summary['m.heroes'] = sorted(
@ -586,7 +599,11 @@ class SyncHandler(object):
)[0:5] )[0:5]
else: else:
summary['m.heroes'] = sorted( summary['m.heroes'] = sorted(
[user_id for user_id in member_ids.keys() if user_id != me] [
user_id
for user_id in gone_user_ids
if user_id != me
]
)[0:5] )[0:5]
if not sync_config.filter_collection.lazy_load_members(): if not sync_config.filter_collection.lazy_load_members():
@ -719,6 +736,26 @@ class SyncHandler(object):
lazy_load_members=lazy_load_members, lazy_load_members=lazy_load_members,
) )
elif batch.limited: elif batch.limited:
state_at_timeline_start = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
)
# for now, we disable LL for gappy syncs - see
# https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
# N.B. this slows down incr syncs as we are now processing way
# more state in the server than if we were LLing.
#
# We still have to filter timeline_start to LL entries (above) in order
# for _calculate_state's LL logic to work, as we have to include LL
# members for timeline senders in case they weren't loaded in the initial
# sync. We do this by (counterintuitively) by filtering timeline_start
# members to just be ones which were timeline senders, which then ensures
# all of the rest get included in the state block (if we need to know
# about them).
types = None
filtered_types = None
state_at_previous_sync = yield self.get_state_at( state_at_previous_sync = yield self.get_state_at(
room_id, stream_position=since_token, types=types, room_id, stream_position=since_token, types=types,
filtered_types=filtered_types, filtered_types=filtered_types,
@ -729,24 +766,21 @@ class SyncHandler(object):
filtered_types=filtered_types, filtered_types=filtered_types,
) )
state_at_timeline_start = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
)
state_ids = _calculate_state( state_ids = _calculate_state(
timeline_contains=timeline_state, timeline_contains=timeline_state,
timeline_start=state_at_timeline_start, timeline_start=state_at_timeline_start,
previous=state_at_previous_sync, previous=state_at_previous_sync,
current=current_state_ids, current=current_state_ids,
# we have to include LL members in case LL initial sync missed them
lazy_load_members=lazy_load_members, lazy_load_members=lazy_load_members,
) )
else: else:
state_ids = {} state_ids = {}
if lazy_load_members: if lazy_load_members:
if types: if types:
# We're returning an incremental sync, with no "gap" since # We're returning an incremental sync, with no
# the previous sync, so normally there would be no state to return # "gap" since the previous sync, so normally there would be
# no state to return.
# But we're lazy-loading, so the client might need some more # But we're lazy-loading, so the client might need some more
# member events to understand the events in this timeline. # member events to understand the events in this timeline.
# So we fish out all the member events corresponding to the # So we fish out all the member events corresponding to the
@ -1616,10 +1650,24 @@ class SyncHandler(object):
) )
summary = {} summary = {}
# we include a summary in room responses when we're lazy loading
# members (as the client otherwise doesn't have enough info to form
# the name itself).
if ( if (
sync_config.filter_collection.lazy_load_members() and sync_config.filter_collection.lazy_load_members() and
( (
# we recalulate the summary:
# if there are membership changes in the timeline, or
# if membership has changed during a gappy sync, or
# if this is an initial sync.
any(ev.type == EventTypes.Member for ev in batch.events) or any(ev.type == EventTypes.Member for ev in batch.events) or
(
# XXX: this may include false positives in the form of LL
# members which have snuck into state
batch.limited and
any(t == EventTypes.Member for (t, k) in state)
) or
since_token is None since_token is None
) )
): ):
@ -1649,6 +1697,16 @@ class SyncHandler(object):
unread_notifications["highlight_count"] = notifs["highlight_count"] unread_notifications["highlight_count"] = notifs["highlight_count"]
sync_result_builder.joined.append(room_sync) sync_result_builder.joined.append(room_sync)
if batch.limited:
user_id = sync_result_builder.sync_config.user.to_string()
logger.info(
"Incremental syncing room %s for user %s with %d state events" % (
room_id,
user_id,
len(state),
)
)
elif room_builder.rtype == "archived": elif room_builder.rtype == "archived":
room_sync = ArchivedSyncResult( room_sync = ArchivedSyncResult(
room_id=room_id, room_id=room_id,

View File

@ -929,6 +929,10 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
txn, self.get_users_in_room, (room_id,) txn, self.get_users_in_room, (room_id,)
) )
self._invalidate_cache_and_stream(
txn, self.get_room_summary, (room_id,)
)
self._invalidate_cache_and_stream( self._invalidate_cache_and_stream(
txn, self.get_current_state_ids, (room_id,) txn, self.get_current_state_ids, (room_id,)
) )

View File

@ -51,6 +51,12 @@ ProfileInfo = namedtuple(
"ProfileInfo", ("avatar_url", "display_name") "ProfileInfo", ("avatar_url", "display_name")
) )
# "members" points to a truncated list of (user_id, event_id) tuples for users of
# a given membership type, suitable for use in calculating heroes for a room.
# "count" points to the total numberr of users of a given membership type.
MemberSummary = namedtuple(
"MemberSummary", ("members", "count")
)
_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
@ -82,6 +88,65 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return [to_ascii(r[0]) for r in txn] return [to_ascii(r[0]) for r in txn]
return self.runInteraction("get_users_in_room", f) return self.runInteraction("get_users_in_room", f)
@cached(max_entries=100000)
def get_room_summary(self, room_id):
""" Get the details of a room roughly suitable for use by the room
summary extension to /sync. Useful when lazy loading room members.
Args:
room_id (str): The room ID to query
Returns:
Deferred[dict[str, MemberSummary]:
dict of membership states, pointing to a MemberSummary named tuple.
"""
def _get_room_summary_txn(txn):
# first get counts.
# We do this all in one transaction to keep the cache small.
# FIXME: get rid of this when we have room_stats
sql = """
SELECT count(*), m.membership FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ?
GROUP BY m.membership
"""
txn.execute(sql, (room_id,))
res = {}
for count, membership in txn:
summary = res.setdefault(to_ascii(membership), MemberSummary([], count))
# we order by membership and then fairly arbitrarily by event_id so
# heroes are consistent
sql = """
SELECT m.user_id, m.membership, m.event_id
FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ?
ORDER BY
CASE m.membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
m.event_id ASC
LIMIT ?
"""
# 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6))
for user_id, membership, event_id in txn:
summary = res[to_ascii(membership)]
# we will always have a summary for this membership type at this
# point given the summary currently contains the counts.
members = summary.members
members.append((to_ascii(user_id), to_ascii(event_id)))
return res
return self.runInteraction("get_room_summary", _get_room_summary_txn)
@cached() @cached()
def get_invited_rooms_for_user(self, user_id): def get_invited_rooms_for_user(self, user_id):
""" Get all the rooms the user is invited to """ Get all the rooms the user is invited to