Improve performance of delete device messages query (#16492)
parent
8841db4d27
commit
bcff01b406
|
@ -0,0 +1 @@
|
||||||
|
Improve performance of delete device messages query, cf issue [16479](https://github.com/matrix-org/synapse/issues/16479).
|
|
@ -592,6 +592,8 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Delete device messages asynchronously and in batches using the task scheduler
|
# Delete device messages asynchronously and in batches using the task scheduler
|
||||||
|
# We specify an upper stream id to avoid deleting non delivered messages
|
||||||
|
# if an user re-uses a device ID.
|
||||||
await self._task_scheduler.schedule_task(
|
await self._task_scheduler.schedule_task(
|
||||||
DELETE_DEVICE_MSGS_TASK_NAME,
|
DELETE_DEVICE_MSGS_TASK_NAME,
|
||||||
resource_id=device_id,
|
resource_id=device_id,
|
||||||
|
|
|
@ -478,18 +478,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||||
log_kv({"message": "No changes in cache since last check"})
|
log_kv({"message": "No changes in cache since last check"})
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
ROW_ID_NAME = self.database_engine.row_id_name
|
|
||||||
|
|
||||||
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
|
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
|
||||||
limit_statement = "" if limit is None else f"LIMIT {limit}"
|
limit_statement = "" if limit is None else f"LIMIT {limit}"
|
||||||
sql = f"""
|
sql = f"""
|
||||||
DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
|
DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
|
||||||
SELECT {ROW_ID_NAME} FROM device_inbox
|
SELECT MAX(stream_id) FROM (
|
||||||
|
SELECT stream_id FROM device_inbox
|
||||||
WHERE user_id = ? AND device_id = ? AND stream_id <= ?
|
WHERE user_id = ? AND device_id = ? AND stream_id <= ?
|
||||||
|
ORDER BY stream_id
|
||||||
{limit_statement}
|
{limit_statement}
|
||||||
|
) AS q1
|
||||||
)
|
)
|
||||||
"""
|
"""
|
||||||
txn.execute(sql, (user_id, device_id, up_to_stream_id))
|
txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
|
||||||
return txn.rowcount
|
return txn.rowcount
|
||||||
|
|
||||||
count = await self.db_pool.runInteraction(
|
count = await self.db_pool.runInteraction(
|
||||||
|
|
Loading…
Reference in New Issue