Batch up the DB writes when marking failures
parent
3d7451e04f
commit
712144e768
|
@ -1212,9 +1212,18 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
|
|||
raise InvalidAPICallError(f"Only one origin permitted, got {origins!r}")
|
||||
|
||||
result = {}
|
||||
failed = set()
|
||||
# TODO(Perf): Actually batch these up
|
||||
for user_id in user_ids:
|
||||
result[user_id] = await self.user_device_resync(user_id)
|
||||
user_result, user_failed = await self._user_device_resync_returning_failed(
|
||||
user_id
|
||||
)
|
||||
result[user_id] = user_result
|
||||
if user_failed:
|
||||
failed.add(user_id)
|
||||
|
||||
if mark_failed_as_stale:
|
||||
await self.store.mark_remote_users_device_caches_as_stale(failed)
|
||||
|
||||
return result
|
||||
|
||||
|
@ -1226,7 +1235,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
|
|||
if failed and mark_failed_as_stale:
|
||||
# Mark the remote user's device list as stale so we know we need to retry
|
||||
# it later.
|
||||
await self.store.mark_remote_user_device_cache_as_stale(user_id)
|
||||
await self.store.mark_remote_users_device_caches_as_stale((user_id,))
|
||||
|
||||
return result
|
||||
|
||||
|
|
|
@ -195,7 +195,7 @@ class DeviceMessageHandler:
|
|||
sender_user_id,
|
||||
unknown_devices,
|
||||
)
|
||||
await self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
|
||||
await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,))
|
||||
|
||||
# Immediately attempt a resync in the background
|
||||
run_in_background(self._user_device_resync, user_id=sender_user_id)
|
||||
|
|
|
@ -1423,7 +1423,7 @@ class FederationEventHandler:
|
|||
"""
|
||||
|
||||
try:
|
||||
await self._store.mark_remote_user_device_cache_as_stale(sender)
|
||||
await self._store.mark_remote_users_device_caches_as_stale((sender,))
|
||||
|
||||
# Immediately attempt a resync in the background
|
||||
if self._config.worker.worker_app:
|
||||
|
|
|
@ -54,7 +54,7 @@ from synapse.storage.util.id_generators import (
|
|||
AbstractStreamIdTracker,
|
||||
StreamIdGenerator,
|
||||
)
|
||||
from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
|
||||
from synapse.types import JsonDict, StrCollection, get_verify_key_from_cross_signing_key
|
||||
from synapse.util import json_decoder, json_encoder
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
|
@ -1062,16 +1062,30 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
|||
|
||||
return {row["user_id"] for row in rows}
|
||||
|
||||
async def mark_remote_user_device_cache_as_stale(self, user_id: str) -> None:
|
||||
async def mark_remote_users_device_caches_as_stale(
|
||||
self, user_ids: StrCollection
|
||||
) -> None:
|
||||
"""Records that the server has reason to believe the cache of the devices
|
||||
for the remote users is out of date.
|
||||
"""
|
||||
await self.db_pool.simple_upsert(
|
||||
table="device_lists_remote_resync",
|
||||
keyvalues={"user_id": user_id},
|
||||
values={},
|
||||
insertion_values={"added_ts": self._clock.time_msec()},
|
||||
desc="mark_remote_user_device_cache_as_stale",
|
||||
|
||||
def _mark_remote_users_device_caches_as_stale_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> None:
|
||||
# TODO add insertion_values support to simple_upsert_many and use
|
||||
# that!
|
||||
for user_id in user_ids:
|
||||
self.db_pool.simple_upsert_txn(
|
||||
txn,
|
||||
table="device_lists_remote_resync",
|
||||
keyvalues={"user_id": user_id},
|
||||
values={},
|
||||
insertion_values={"added_ts": self._clock.time_msec()},
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"mark_remote_users_device_caches_as_stale",
|
||||
_mark_remote_users_device_caches_as_stale_txn,
|
||||
)
|
||||
|
||||
async def mark_remote_user_device_cache_as_valid(self, user_id: str) -> None:
|
||||
|
|
|
@ -77,6 +77,10 @@ JsonMapping = Mapping[str, Any]
|
|||
# A JSON-serialisable object.
|
||||
JsonSerializable = object
|
||||
|
||||
# Collection[str] that does not include str itself; str being a Sequence[str]
|
||||
# is very misleading and results in bugs.
|
||||
StrCollection = Union[Tuple[str, ...], List[str], Set[str]]
|
||||
|
||||
|
||||
# Note that this seems to require inheriting *directly* from Interface in order
|
||||
# for mypy-zope to realize it is an interface.
|
||||
|
|
Loading…
Reference in New Issue