Speed up deleting to-device messages task (#16318)
							parent
							
								
									39dc5de399
								
							
						
					
					
						commit
						e9e2904eb2
					
				|  | @ -0,0 +1 @@ | |||
| Speed up task to delete to-device messages. | ||||
|  | @ -388,7 +388,8 @@ class DeviceWorkerHandler: | |||
|             "Trying handling device list state for partial join: not supported on workers." | ||||
|         ) | ||||
| 
 | ||||
|     DEVICE_MSGS_DELETE_BATCH_LIMIT = 100 | ||||
|     DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000 | ||||
|     DEVICE_MSGS_DELETE_SLEEP_MS = 1000 | ||||
| 
 | ||||
|     async def _delete_device_messages( | ||||
|         self, | ||||
|  | @ -400,19 +401,19 @@ class DeviceWorkerHandler: | |||
|         device_id = task.params["device_id"] | ||||
|         up_to_stream_id = task.params["up_to_stream_id"] | ||||
| 
 | ||||
|         res = await self.store.delete_messages_for_device( | ||||
|             user_id=user_id, | ||||
|             device_id=device_id, | ||||
|             up_to_stream_id=up_to_stream_id, | ||||
|             limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, | ||||
|         ) | ||||
|         # Delete the messages in batches to avoid too much DB load. | ||||
|         while True: | ||||
|             res = await self.store.delete_messages_for_device( | ||||
|                 user_id=user_id, | ||||
|                 device_id=device_id, | ||||
|                 up_to_stream_id=up_to_stream_id, | ||||
|                 limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, | ||||
|             ) | ||||
| 
 | ||||
|         if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: | ||||
|             return TaskStatus.COMPLETE, None, None | ||||
|         else: | ||||
|             # There is probably still device messages to be deleted, let's keep the task active and it will be run | ||||
|             # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running). | ||||
|             return TaskStatus.ACTIVE, None, None | ||||
|             if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: | ||||
|                 return TaskStatus.COMPLETE, None, None | ||||
| 
 | ||||
|             await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0) | ||||
| 
 | ||||
| 
 | ||||
| class DeviceHandler(DeviceWorkerHandler): | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Erik Johnston
						Erik Johnston