Track unconverted device list outbound pokes using a position instead (#14516)

When a local device list change is added to
`device_lists_changes_in_room`, the `converted_to_destinations` flag is
set to `FALSE` and the `_handle_new_device_update_async` background
process is started. This background process looks for unconverted rows
in `device_lists_changes_in_room`, copies them to
`device_lists_outbound_pokes` and updates the flag.

To update the `converted_to_destinations` flag, the database performs a
`DELETE` and `INSERT` internally, which fragments the table. To avoid
this, track unconverted rows using a `(stream ID, room ID)` position
instead of the flag.

From now on, the `converted_to_destinations` column indicates rows that
need converting to outbound pokes, but does not indicate whether the
conversion has already taken place.

Closes #14037.

Signed-off-by: Sean Quah <seanq@matrix.org>
pull/14526/head
Sean Quah 2022-11-22 16:46:52 +00:00 committed by GitHub
parent 7eb7460042
commit 9cae44f49e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 158 additions and 49 deletions

1
changelog.d/14516.misc Normal file
View File

@ -0,0 +1 @@
Refactor conversion of device list changes in room to outbound pokes to track unconverted rows using a `(stream ID, room ID)` position instead of updating the `converted_to_destinations` flag on every row.

View File

@ -682,13 +682,33 @@ class DeviceHandler(DeviceWorkerHandler):
hosts_already_sent_to: Set[str] = set() hosts_already_sent_to: Set[str] = set()
try: try:
stream_id, room_id = await self.store.get_device_change_last_converted_pos()
while True: while True:
self._handle_new_device_update_new_data = False self._handle_new_device_update_new_data = False
rows = await self.store.get_uncoverted_outbound_room_pokes() max_stream_id = self.store.get_device_stream_token()
rows = await self.store.get_uncoverted_outbound_room_pokes(
stream_id, room_id
)
if not rows: if not rows:
# If the DB returned nothing then there is nothing left to # If the DB returned nothing then there is nothing left to
# do, *unless* a new device list update happened during the # do, *unless* a new device list update happened during the
# DB query. # DB query.
# Advance `(stream_id, room_id)`.
# `max_stream_id` comes from *before* the query for unconverted
# rows, which means that any unconverted rows must have a larger
# stream ID.
if max_stream_id > stream_id:
stream_id, room_id = max_stream_id, ""
await self.store.set_device_change_last_converted_pos(
stream_id, room_id
)
else:
assert max_stream_id == stream_id
# Avoid moving `room_id` backwards.
pass
if self._handle_new_device_update_new_data: if self._handle_new_device_update_new_data:
continue continue
else: else:
@ -718,7 +738,6 @@ class DeviceHandler(DeviceWorkerHandler):
user_id=user_id, user_id=user_id,
device_id=device_id, device_id=device_id,
room_id=room_id, room_id=room_id,
stream_id=stream_id,
hosts=hosts, hosts=hosts,
context=opentracing_context, context=opentracing_context,
) )
@ -752,6 +771,12 @@ class DeviceHandler(DeviceWorkerHandler):
hosts_already_sent_to.update(hosts) hosts_already_sent_to.update(hosts)
current_stream_id = stream_id current_stream_id = stream_id
# Advance `(stream_id, room_id)`.
_, _, room_id, stream_id, _ = rows[-1]
await self.store.set_device_change_last_converted_pos(
stream_id, room_id
)
finally: finally:
self._handle_new_device_update_is_processing = False self._handle_new_device_update_is_processing = False
@ -834,7 +859,6 @@ class DeviceHandler(DeviceWorkerHandler):
user_id=user_id, user_id=user_id,
device_id=device_id, device_id=device_id,
room_id=room_id, room_id=room_id,
stream_id=None,
hosts=potentially_changed_hosts, hosts=potentially_changed_hosts,
context=None, context=None,
) )

View File

@ -2075,13 +2075,14 @@ class DatabasePool:
retcols: Collection[str], retcols: Collection[str],
allow_none: bool = False, allow_none: bool = False,
) -> Optional[Dict[str, Any]]: ) -> Optional[Dict[str, Any]]:
select_sql = "SELECT %s FROM %s WHERE %s" % ( select_sql = "SELECT %s FROM %s" % (", ".join(retcols), table)
", ".join(retcols),
table,
" AND ".join("%s = ?" % (k,) for k in keyvalues),
)
if keyvalues:
select_sql += " WHERE %s" % (" AND ".join("%s = ?" % k for k in keyvalues),)
txn.execute(select_sql, list(keyvalues.values())) txn.execute(select_sql, list(keyvalues.values()))
else:
txn.execute(select_sql)
row = txn.fetchone() row = txn.fetchone()
if not row: if not row:

View File

@ -2008,27 +2008,48 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
) )
async def get_uncoverted_outbound_room_pokes( async def get_uncoverted_outbound_room_pokes(
self, limit: int = 10 self, start_stream_id: int, start_room_id: str, limit: int = 10
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]: ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
"""Get device list changes by room that have not yet been handled and """Get device list changes by room that have not yet been handled and
written to `device_lists_outbound_pokes`. written to `device_lists_outbound_pokes`.
Args:
start_stream_id: Together with `start_room_id`, indicates the position after
which to return device list changes.
start_room_id: Together with `start_stream_id`, indicates the position after
which to return device list changes.
limit: The maximum number of device list changes to return.
Returns: Returns:
A list of user ID, device ID, room ID, stream ID and optional opentracing context. A list of user ID, device ID, room ID, stream ID and optional opentracing
context, in order of ascending (stream ID, room ID).
""" """
sql = """ sql = """
SELECT user_id, device_id, room_id, stream_id, opentracing_context SELECT user_id, device_id, room_id, stream_id, opentracing_context
FROM device_lists_changes_in_room FROM device_lists_changes_in_room
WHERE NOT converted_to_destinations WHERE
ORDER BY stream_id (stream_id, room_id) > (?, ?) AND
stream_id <= ? AND
NOT converted_to_destinations
ORDER BY stream_id ASC, room_id ASC
LIMIT ? LIMIT ?
""" """
def get_uncoverted_outbound_room_pokes_txn( def get_uncoverted_outbound_room_pokes_txn(
txn: LoggingTransaction, txn: LoggingTransaction,
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]: ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
txn.execute(sql, (limit,)) txn.execute(
sql,
(
start_stream_id,
start_room_id,
# Avoid returning rows if there may be uncommitted device list
# changes with smaller stream IDs.
self._device_list_id_gen.get_current_token(),
limit,
),
)
return [ return [
( (
@ -2050,21 +2071,18 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
user_id: str, user_id: str,
device_id: str, device_id: str,
room_id: str, room_id: str,
stream_id: Optional[int],
hosts: Collection[str], hosts: Collection[str],
context: Optional[Dict[str, str]], context: Optional[Dict[str, str]],
) -> None: ) -> None:
"""Queue the device update to be sent to the given set of hosts, """Queue the device update to be sent to the given set of hosts,
calculated from the room ID. calculated from the room ID.
Marks the associated row in `device_lists_changes_in_room` as handled,
if `stream_id` is provided.
""" """
if not hosts:
return
def add_device_list_outbound_pokes_txn( def add_device_list_outbound_pokes_txn(
txn: LoggingTransaction, stream_ids: List[int] txn: LoggingTransaction, stream_ids: List[int]
) -> None: ) -> None:
if hosts:
self._add_device_outbound_poke_to_stream_txn( self._add_device_outbound_poke_to_stream_txn(
txn, txn,
user_id=user_id, user_id=user_id,
@ -2074,27 +2092,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
context=context, context=context,
) )
if stream_id:
self.db_pool.simple_update_txn(
txn,
table="device_lists_changes_in_room",
keyvalues={
"user_id": user_id,
"device_id": device_id,
"stream_id": stream_id,
"room_id": room_id,
},
updatevalues={"converted_to_destinations": True},
)
if not hosts:
# If there are no hosts then we don't try and generate stream IDs.
return await self.db_pool.runInteraction(
"add_device_list_outbound_pokes",
add_device_list_outbound_pokes_txn,
[],
)
async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
return await self.db_pool.runInteraction( return await self.db_pool.runInteraction(
"add_device_list_outbound_pokes", "add_device_list_outbound_pokes",
@ -2156,3 +2153,37 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
"get_pending_remote_device_list_updates_for_room", "get_pending_remote_device_list_updates_for_room",
get_pending_remote_device_list_updates_for_room_txn, get_pending_remote_device_list_updates_for_room_txn,
) )
async def get_device_change_last_converted_pos(self) -> Tuple[int, str]:
"""
Get the position of the last row in `device_list_changes_in_room` that has been
converted to `device_lists_outbound_pokes`.
Rows with a strictly greater position where `converted_to_destinations` is
`FALSE` have not been converted.
"""
row = await self.db_pool.simple_select_one(
table="device_lists_changes_converted_stream_position",
keyvalues={},
retcols=["stream_id", "room_id"],
desc="get_device_change_last_converted_pos",
)
return row["stream_id"], row["room_id"]
async def set_device_change_last_converted_pos(
self,
stream_id: int,
room_id: str,
) -> None:
"""
Set the position of the last row in `device_list_changes_in_room` that has been
converted to `device_lists_outbound_pokes`.
"""
await self.db_pool.simple_update_one(
table="device_lists_changes_converted_stream_position",
keyvalues={},
updatevalues={"stream_id": stream_id, "room_id": room_id},
desc="set_device_change_last_converted_pos",
)

View File

@ -0,0 +1,53 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Prior to this schema delta, we tracked the set of unconverted rows in
-- `device_lists_changes_in_room` using the `converted_to_destinations` flag. When rows
-- were converted to `device_lists_outbound_pokes`, the `converted_to_destinations` flag
-- would be set.
--
-- After this schema delta, the `converted_to_destinations` is still populated like
-- before, but the set of unconverted rows is determined by the `stream_id` in the new
-- `device_lists_changes_converted_stream_position` table.
--
-- If rolled back, Synapse will re-send all device list changes that happened since the
-- schema delta.
CREATE TABLE IF NOT EXISTS device_lists_changes_converted_stream_position(
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
-- The (stream id, room id) of the last row in `device_lists_changes_in_room` that
-- has been converted to `device_lists_outbound_pokes`. Rows with a strictly larger
-- (stream id, room id) where `converted_to_destinations` is `FALSE` have not been
-- converted.
stream_id BIGINT NOT NULL,
-- `room_id` may be an empty string, which compares less than all valid room IDs.
room_id TEXT NOT NULL,
CHECK (Lock='X')
);
INSERT INTO device_lists_changes_converted_stream_position (stream_id, room_id) VALUES (
(
SELECT COALESCE(
-- The last converted stream id is the smallest unconverted stream id minus
-- one.
MIN(stream_id) - 1,
-- If there is no unconverted stream id, the last converted stream id is the
-- largest stream id.
-- Otherwise, pick 1, since stream ids start at 2.
(SELECT COALESCE(MAX(stream_id), 1) FROM device_lists_changes_in_room)
) FROM device_lists_changes_in_room WHERE NOT converted_to_destinations
),
''
);

View File

@ -28,7 +28,7 @@ class DeviceStoreTestCase(HomeserverTestCase):
""" """
for device_id in device_ids: for device_id in device_ids:
stream_id = self.get_success( self.get_success(
self.store.add_device_change_to_streams( self.store.add_device_change_to_streams(
user_id, [device_id], ["!some:room"] user_id, [device_id], ["!some:room"]
) )
@ -39,7 +39,6 @@ class DeviceStoreTestCase(HomeserverTestCase):
user_id=user_id, user_id=user_id,
device_id=device_id, device_id=device_id,
room_id="!some:room", room_id="!some:room",
stream_id=stream_id,
hosts=[host], hosts=[host],
context={}, context={},
) )