Add multi-user device resync in handler

pull/14716/head
Olivier Wilkinson (reivilibre) 2022-12-20 14:32:52 +00:00
parent 7d2261f922
commit 55f46d499b
1 changed files with 58 additions and 0 deletions

View File

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
from http import HTTPStatus
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
@ -33,6 +34,7 @@ from synapse.api.errors import (
Codes, Codes,
FederationDeniedError, FederationDeniedError,
HttpResponseException, HttpResponseException,
InvalidAPICallError,
RequestSendFailed, RequestSendFailed,
SynapseError, SynapseError,
) )
@ -45,6 +47,7 @@ from synapse.types import (
JsonDict, JsonDict,
StreamKeyType, StreamKeyType,
StreamToken, StreamToken,
UserID,
get_domain_from_id, get_domain_from_id,
get_verify_key_from_cross_signing_key, get_verify_key_from_cross_signing_key,
) )
@ -893,12 +896,41 @@ class DeviceListWorkerUpdater:
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
from synapse.replication.http.devices import ( from synapse.replication.http.devices import (
ReplicationMultiUserDevicesResyncRestServlet,
ReplicationUserDevicesResyncRestServlet, ReplicationUserDevicesResyncRestServlet,
) )
self._user_device_resync_client = ( self._user_device_resync_client = (
ReplicationUserDevicesResyncRestServlet.make_client(hs) ReplicationUserDevicesResyncRestServlet.make_client(hs)
) )
self._multi_user_device_resync_client = (
ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
)
async def multi_user_device_resync(
self, user_ids: List[str], mark_failed_as_stale: bool = True
) -> Dict[str, Optional[JsonDict]]:
"""
Like `user_device_resync` but operates on multiple users **from the same origin**
at once.
Returns:
Dict from User ID to the same Dict as `user_device_resync`.
"""
# TODO(BUG): mark_failed_as_stale is not sent.
try:
return await self._multi_user_device_resync_client(user_ids=user_ids)
except SynapseError as err:
if not (
err.code == HTTPStatus.NOT_FOUND and err.errcode == Codes.UNRECOGNIZED
):
raise
# Fall back to single requests
result: Dict[str, Optional[JsonDict]] = {}
for user_id in user_ids:
result[user_id] = await self._user_device_resync_client(user_id=user_id)
return result
async def user_device_resync( async def user_device_resync(
self, user_id: str, mark_failed_as_stale: bool = True self, user_id: str, mark_failed_as_stale: bool = True
@ -914,6 +946,7 @@ class DeviceListWorkerUpdater:
request: request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
""" """
# TODO(BUG): mark_failed_as_stale is not sent.
return await self._user_device_resync_client(user_id=user_id) return await self._user_device_resync_client(user_id=user_id)
@ -1160,6 +1193,31 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
# Allow future calls to retry resyncinc out of sync device lists. # Allow future calls to retry resyncinc out of sync device lists.
self._resync_retry_in_progress = False self._resync_retry_in_progress = False
async def multi_user_device_resync(
self, user_ids: List[str], mark_failed_as_stale: bool = True
) -> Dict[str, Optional[JsonDict]]:
"""
Like `user_device_resync` but operates on multiple users **from the same origin**
at once.
Returns:
Dict from User ID to the same Dict as `user_device_resync`.
"""
if not user_ids:
return {}
origins = {UserID.from_string(user_id).domain for user_id in user_ids}
if len(origins) != 1:
raise InvalidAPICallError(f"Only one origin permitted, got {origins!r}")
result = {}
# TODO(Perf): Actually batch these up
for user_id in user_ids:
result[user_id] = await self.user_device_resync(user_id)
return result
async def user_device_resync( async def user_device_resync(
self, user_id: str, mark_failed_as_stale: bool = True self, user_id: str, mark_failed_as_stale: bool = True
) -> Optional[JsonDict]: ) -> Optional[JsonDict]: