Remove useless async job to delete device messages on sync (#16491)
parent
9be4db29f2
commit
eee6474bce
|
@ -0,0 +1 @@
|
||||||
|
Remove useless async job to delete device messages on sync, since we only deliver (and hence delete) up to 100 device messages at a time.
|
|
@ -40,7 +40,6 @@ from synapse.api.filtering import FilterCollection
|
||||||
from synapse.api.presence import UserPresenceState
|
from synapse.api.presence import UserPresenceState
|
||||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME
|
|
||||||
from synapse.handlers.relations import BundledAggregations
|
from synapse.handlers.relations import BundledAggregations
|
||||||
from synapse.logging import issue9533_logger
|
from synapse.logging import issue9533_logger
|
||||||
from synapse.logging.context import current_context
|
from synapse.logging.context import current_context
|
||||||
|
@ -363,36 +362,15 @@ class SyncHandler:
|
||||||
# (since we now know that the device has received them)
|
# (since we now know that the device has received them)
|
||||||
if since_token is not None:
|
if since_token is not None:
|
||||||
since_stream_id = since_token.to_device_key
|
since_stream_id = since_token.to_device_key
|
||||||
# Fast path: delete a limited number of to-device messages up front.
|
|
||||||
# We do this to avoid the overhead of scheduling a task for every
|
|
||||||
# sync.
|
|
||||||
device_deletion_limit = 100
|
|
||||||
deleted = await self.store.delete_messages_for_device(
|
deleted = await self.store.delete_messages_for_device(
|
||||||
sync_config.user.to_string(),
|
sync_config.user.to_string(),
|
||||||
sync_config.device_id,
|
sync_config.device_id,
|
||||||
since_stream_id,
|
since_stream_id,
|
||||||
limit=device_deletion_limit,
|
|
||||||
)
|
)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Deleted %d to-device messages up to %d", deleted, since_stream_id
|
"Deleted %d to-device messages up to %d", deleted, since_stream_id
|
||||||
)
|
)
|
||||||
|
|
||||||
# If we hit the limit, schedule a background task to delete the rest.
|
|
||||||
if deleted >= device_deletion_limit:
|
|
||||||
await self._task_scheduler.schedule_task(
|
|
||||||
DELETE_DEVICE_MSGS_TASK_NAME,
|
|
||||||
resource_id=sync_config.device_id,
|
|
||||||
params={
|
|
||||||
"user_id": sync_config.user.to_string(),
|
|
||||||
"device_id": sync_config.device_id,
|
|
||||||
"up_to_stream_id": since_stream_id,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
logger.debug(
|
|
||||||
"Deletion of to-device messages up to %d scheduled",
|
|
||||||
since_stream_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
if timeout == 0 or since_token is None or full_state:
|
if timeout == 0 or since_token is None or full_state:
|
||||||
# we are going to return immediately, so don't bother calling
|
# we are going to return immediately, so don't bother calling
|
||||||
# notifier.wait_for_events.
|
# notifier.wait_for_events.
|
||||||
|
|
|
@ -450,7 +450,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||||
user_id: str,
|
user_id: str,
|
||||||
device_id: Optional[str],
|
device_id: Optional[str],
|
||||||
up_to_stream_id: int,
|
up_to_stream_id: int,
|
||||||
limit: int,
|
limit: Optional[int] = None,
|
||||||
) -> int:
|
) -> int:
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
|
@ -481,11 +481,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||||
ROW_ID_NAME = self.database_engine.row_id_name
|
ROW_ID_NAME = self.database_engine.row_id_name
|
||||||
|
|
||||||
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
|
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
|
||||||
|
limit_statement = "" if limit is None else f"LIMIT {limit}"
|
||||||
sql = f"""
|
sql = f"""
|
||||||
DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
|
DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
|
||||||
SELECT {ROW_ID_NAME} FROM device_inbox
|
SELECT {ROW_ID_NAME} FROM device_inbox
|
||||||
WHERE user_id = ? AND device_id = ? AND stream_id <= ?
|
WHERE user_id = ? AND device_id = ? AND stream_id <= ?
|
||||||
LIMIT {limit}
|
{limit_statement}
|
||||||
)
|
)
|
||||||
"""
|
"""
|
||||||
txn.execute(sql, (user_id, device_id, up_to_stream_id))
|
txn.execute(sql, (user_id, device_id, up_to_stream_id))
|
||||||
|
|
Loading…
Reference in New Issue