Increase DB/CPU perf of `_is_server_still_joined` check. (#6936)
* Increase DB/CPU perf of `_is_server_still_joined` check. For rooms with large amount of state a single user leaving could cause us to go and load a lot of membership events and then pull out membership state in a large number of batches. * Newsfile * Update synapse/storage/persist_events.py Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> * Fix adding if too soon * Update docstring * Review comments * Woops typo Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>pull/6951/head
parent
5e4a438556
commit
0d0bc35792
|
@ -0,0 +1 @@
|
||||||
|
Increase DB/CPU perf of `_is_server_still_joined` check.
|
|
@ -868,6 +868,37 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||||
desc="get_membership_from_event_ids",
|
desc="get_membership_from_event_ids",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def is_local_host_in_room_ignoring_users(
|
||||||
|
self, room_id: str, ignore_users: Collection[str]
|
||||||
|
) -> bool:
|
||||||
|
"""Check if there are any local users, excluding those in the given
|
||||||
|
list, in the room.
|
||||||
|
"""
|
||||||
|
|
||||||
|
clause, args = make_in_list_sql_clause(
|
||||||
|
self.database_engine, "user_id", ignore_users
|
||||||
|
)
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
SELECT 1 FROM local_current_membership
|
||||||
|
WHERE
|
||||||
|
room_id = ? AND membership = ?
|
||||||
|
AND NOT (%s)
|
||||||
|
LIMIT 1
|
||||||
|
""" % (
|
||||||
|
clause,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _is_local_host_in_room_ignoring_users_txn(txn):
|
||||||
|
txn.execute(sql, (room_id, Membership.JOIN, *args))
|
||||||
|
|
||||||
|
return bool(txn.fetchone())
|
||||||
|
|
||||||
|
return await self.db.runInteraction(
|
||||||
|
"is_local_host_in_room_ignoring_users",
|
||||||
|
_is_local_host_in_room_ignoring_users_txn,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
|
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
|
||||||
def __init__(self, database: Database, db_conn, hs):
|
def __init__(self, database: Database, db_conn, hs):
|
||||||
|
|
|
@ -727,6 +727,7 @@ class EventsPersistenceStorage(object):
|
||||||
|
|
||||||
# Check if any of the given events are a local join that appear in the
|
# Check if any of the given events are a local join that appear in the
|
||||||
# current state
|
# current state
|
||||||
|
events_to_check = [] # Event IDs that aren't an event we're persisting
|
||||||
for (typ, state_key), event_id in delta.to_insert.items():
|
for (typ, state_key), event_id in delta.to_insert.items():
|
||||||
if typ != EventTypes.Member or not self.is_mine_id(state_key):
|
if typ != EventTypes.Member or not self.is_mine_id(state_key):
|
||||||
continue
|
continue
|
||||||
|
@ -736,8 +737,33 @@ class EventsPersistenceStorage(object):
|
||||||
if event.membership == Membership.JOIN:
|
if event.membership == Membership.JOIN:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# There's been a change of membership but we don't have a local join
|
# The event is not in `ev_ctx_rm`, so we need to pull it out of
|
||||||
# event in the new events, so we need to check the full state.
|
# the DB.
|
||||||
|
events_to_check.append(event_id)
|
||||||
|
|
||||||
|
# Check if any of the changes that we don't have events for are joins.
|
||||||
|
if events_to_check:
|
||||||
|
rows = await self.main_store.get_membership_from_event_ids(events_to_check)
|
||||||
|
is_still_joined = any(row["membership"] == Membership.JOIN for row in rows)
|
||||||
|
if is_still_joined:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# None of the new state events are local joins, so we check the database
|
||||||
|
# to see if there are any other local users in the room. We ignore users
|
||||||
|
# whose state has changed as we've already their new state above.
|
||||||
|
users_to_ignore = [
|
||||||
|
state_key
|
||||||
|
for _, state_key in itertools.chain(delta.to_insert, delta.to_delete)
|
||||||
|
if self.is_mine_id(state_key)
|
||||||
|
]
|
||||||
|
|
||||||
|
if await self.main_store.is_local_host_in_room_ignoring_users(
|
||||||
|
room_id, users_to_ignore
|
||||||
|
):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# The server will leave the room, so we go and find out which remote
|
||||||
|
# users will still be joined when we leave.
|
||||||
if current_state is None:
|
if current_state is None:
|
||||||
current_state = await self.main_store.get_current_state_ids(room_id)
|
current_state = await self.main_store.get_current_state_ids(room_id)
|
||||||
current_state = dict(current_state)
|
current_state = dict(current_state)
|
||||||
|
@ -746,19 +772,6 @@ class EventsPersistenceStorage(object):
|
||||||
|
|
||||||
current_state.update(delta.to_insert)
|
current_state.update(delta.to_insert)
|
||||||
|
|
||||||
event_ids = [
|
|
||||||
event_id
|
|
||||||
for (typ, state_key,), event_id in current_state.items()
|
|
||||||
if typ == EventTypes.Member and self.is_mine_id(state_key)
|
|
||||||
]
|
|
||||||
|
|
||||||
rows = await self.main_store.get_membership_from_event_ids(event_ids)
|
|
||||||
is_still_joined = any(row["membership"] == Membership.JOIN for row in rows)
|
|
||||||
if is_still_joined:
|
|
||||||
return True
|
|
||||||
|
|
||||||
# The server will leave the room, so we go and find out which remote
|
|
||||||
# users will still be joined when we leave.
|
|
||||||
remote_event_ids = [
|
remote_event_ids = [
|
||||||
event_id
|
event_id
|
||||||
for (typ, state_key,), event_id in current_state.items()
|
for (typ, state_key,), event_id in current_state.items()
|
||||||
|
|
Loading…
Reference in New Issue