Faster joins: Fix incompatibility with restricted joins (#14882)
* Avoid clearing out forward extremities when doing a second remote join When joining a restricted room where the local homeserver does not have a user able to issue invites, we perform a second remote join. We want to avoid clearing out forward extremities in this case because the forward extremities we have are up to date and clearing out forward extremities creates a window in which the room can get bricked if Synapse crashes. Signed-off-by: Sean Quah <seanq@matrix.org> * Do a full join when doing a second remote join into a full state room We cannot persist a partial state join event into a joined full state room, so we perform a full state join for such rooms instead. As a future optimization, we could always perform a partial state join and compute or retrieve the full state ourselves if necessary. Signed-off-by: Sean Quah <seanq@matrix.org> * Add lock around partial state flag for rooms Signed-off-by: Sean Quah <seanq@matrix.org> * Preserve partial state info when doing a second partial state join Signed-off-by: Sean Quah <seanq@matrix.org> * Add newsfile * Add a TODO(faster_joins) marker Signed-off-by: Sean Quah <seanq@matrix.org>pull/14896/head
parent
f075f6ae2b
commit
d329a566df
|
@ -0,0 +1 @@
|
||||||
|
Faster joins: Fix incompatibility with joins into restricted rooms where no local users have the ability to invite.
|
|
@ -1157,6 +1157,11 @@ class FederationClient(FederationBase):
|
||||||
"members_omitted was set, but no servers were listed in the room"
|
"members_omitted was set, but no servers were listed in the room"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if response.members_omitted and not partial_state:
|
||||||
|
raise InvalidResponseError(
|
||||||
|
"members_omitted was set, but we asked for full state"
|
||||||
|
)
|
||||||
|
|
||||||
return SendJoinResult(
|
return SendJoinResult(
|
||||||
event=event,
|
event=event,
|
||||||
state=signed_state,
|
state=signed_state,
|
||||||
|
|
|
@ -48,7 +48,6 @@ from synapse.api.errors import (
|
||||||
FederationError,
|
FederationError,
|
||||||
FederationPullAttemptBackoffError,
|
FederationPullAttemptBackoffError,
|
||||||
HttpResponseException,
|
HttpResponseException,
|
||||||
LimitExceededError,
|
|
||||||
NotFoundError,
|
NotFoundError,
|
||||||
RequestSendFailed,
|
RequestSendFailed,
|
||||||
SynapseError,
|
SynapseError,
|
||||||
|
@ -182,6 +181,12 @@ class FederationHandler:
|
||||||
self._partial_state_syncs_maybe_needing_restart: Dict[
|
self._partial_state_syncs_maybe_needing_restart: Dict[
|
||||||
str, Tuple[Optional[str], Collection[str]]
|
str, Tuple[Optional[str], Collection[str]]
|
||||||
] = {}
|
] = {}
|
||||||
|
# A lock guarding the partial state flag for rooms.
|
||||||
|
# When the lock is held for a given room, no other concurrent code may
|
||||||
|
# partial state or un-partial state the room.
|
||||||
|
self._is_partial_state_room_linearizer = Linearizer(
|
||||||
|
name="_is_partial_state_room_linearizer"
|
||||||
|
)
|
||||||
|
|
||||||
# if this is the main process, fire off a background process to resume
|
# if this is the main process, fire off a background process to resume
|
||||||
# any partial-state-resync operations which were in flight when we
|
# any partial-state-resync operations which were in flight when we
|
||||||
|
@ -599,7 +604,23 @@ class FederationHandler:
|
||||||
|
|
||||||
self._federation_event_handler.room_queues[room_id] = []
|
self._federation_event_handler.room_queues[room_id] = []
|
||||||
|
|
||||||
await self._clean_room_for_join(room_id)
|
is_host_joined = await self.store.is_host_joined(room_id, self.server_name)
|
||||||
|
|
||||||
|
if not is_host_joined:
|
||||||
|
# We may have old forward extremities lying around if the homeserver left
|
||||||
|
# the room completely in the past. Clear them out.
|
||||||
|
#
|
||||||
|
# Note that this check-then-clear is subject to races where
|
||||||
|
# * the homeserver is in the room and stops being in the room just after
|
||||||
|
# the check. We won't reset the forward extremities, but that's okay,
|
||||||
|
# since they will be almost up to date.
|
||||||
|
# * the homeserver is not in the room and starts being in the room just
|
||||||
|
# after the check. This can't happen, since `RoomMemberHandler` has a
|
||||||
|
# linearizer lock which prevents concurrent remote joins into the same
|
||||||
|
# room.
|
||||||
|
# In short, the races either have an acceptable outcome or should be
|
||||||
|
# impossible.
|
||||||
|
await self._clean_room_for_join(room_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Try the host we successfully got a response to /make_join/
|
# Try the host we successfully got a response to /make_join/
|
||||||
|
@ -611,92 +632,116 @@ class FederationHandler:
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
ret = await self.federation_client.send_join(
|
async with self._is_partial_state_room_linearizer.queue(room_id):
|
||||||
host_list, event, room_version_obj
|
already_partial_state_room = await self.store.is_partial_state_room(
|
||||||
)
|
room_id
|
||||||
|
)
|
||||||
|
|
||||||
event = ret.event
|
ret = await self.federation_client.send_join(
|
||||||
origin = ret.origin
|
host_list,
|
||||||
state = ret.state
|
event,
|
||||||
auth_chain = ret.auth_chain
|
room_version_obj,
|
||||||
auth_chain.sort(key=lambda e: e.depth)
|
# Perform a full join when we are already in the room and it is a
|
||||||
|
# full state room, since we are not allowed to persist a partial
|
||||||
|
# state join event in a full state room. In the future, we could
|
||||||
|
# optimize this by always performing a partial state join and
|
||||||
|
# computing the state ourselves or retrieving it from the remote
|
||||||
|
# homeserver if necessary.
|
||||||
|
#
|
||||||
|
# There's a race where we leave the room, then perform a full join
|
||||||
|
# anyway. This should end up being fast anyway, since we would
|
||||||
|
# already have the full room state and auth chain persisted.
|
||||||
|
partial_state=not is_host_joined or already_partial_state_room,
|
||||||
|
)
|
||||||
|
|
||||||
logger.debug("do_invite_join auth_chain: %s", auth_chain)
|
event = ret.event
|
||||||
logger.debug("do_invite_join state: %s", state)
|
origin = ret.origin
|
||||||
|
state = ret.state
|
||||||
|
auth_chain = ret.auth_chain
|
||||||
|
auth_chain.sort(key=lambda e: e.depth)
|
||||||
|
|
||||||
logger.debug("do_invite_join event: %s", event)
|
logger.debug("do_invite_join auth_chain: %s", auth_chain)
|
||||||
|
logger.debug("do_invite_join state: %s", state)
|
||||||
|
|
||||||
# if this is the first time we've joined this room, it's time to add
|
logger.debug("do_invite_join event: %s", event)
|
||||||
# a row to `rooms` with the correct room version. If there's already a
|
|
||||||
# row there, we should override it, since it may have been populated
|
|
||||||
# based on an invite request which lied about the room version.
|
|
||||||
#
|
|
||||||
# federation_client.send_join has already checked that the room
|
|
||||||
# version in the received create event is the same as room_version_obj,
|
|
||||||
# so we can rely on it now.
|
|
||||||
#
|
|
||||||
await self.store.upsert_room_on_join(
|
|
||||||
room_id=room_id,
|
|
||||||
room_version=room_version_obj,
|
|
||||||
state_events=state,
|
|
||||||
)
|
|
||||||
|
|
||||||
if ret.partial_state:
|
# if this is the first time we've joined this room, it's time to add
|
||||||
# Mark the room as having partial state.
|
# a row to `rooms` with the correct room version. If there's already a
|
||||||
# The background process is responsible for unmarking this flag,
|
# row there, we should override it, since it may have been populated
|
||||||
# even if the join fails.
|
# based on an invite request which lied about the room version.
|
||||||
await self.store.store_partial_state_room(
|
#
|
||||||
|
# federation_client.send_join has already checked that the room
|
||||||
|
# version in the received create event is the same as room_version_obj,
|
||||||
|
# so we can rely on it now.
|
||||||
|
#
|
||||||
|
await self.store.upsert_room_on_join(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
servers=ret.servers_in_room,
|
room_version=room_version_obj,
|
||||||
device_lists_stream_id=self.store.get_device_stream_token(),
|
state_events=state,
|
||||||
joined_via=origin,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
if ret.partial_state and not already_partial_state_room:
|
||||||
max_stream_id = (
|
# Mark the room as having partial state.
|
||||||
await self._federation_event_handler.process_remote_join(
|
# The background process is responsible for unmarking this flag,
|
||||||
origin,
|
# even if the join fails.
|
||||||
room_id,
|
# TODO(faster_joins):
|
||||||
auth_chain,
|
# We may want to reset the partial state info if it's from an
|
||||||
state,
|
# old, failed partial state join.
|
||||||
event,
|
# https://github.com/matrix-org/synapse/issues/13000
|
||||||
room_version_obj,
|
await self.store.store_partial_state_room(
|
||||||
partial_state=ret.partial_state,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
except PartialStateConflictError as e:
|
|
||||||
# The homeserver was already in the room and it is no longer partial
|
|
||||||
# stated. We ought to be doing a local join instead. Turn the error into
|
|
||||||
# a 429, as a hint to the client to try again.
|
|
||||||
# TODO(faster_joins): `_should_perform_remote_join` suggests that we may
|
|
||||||
# do a remote join for restricted rooms even if we have full state.
|
|
||||||
logger.error(
|
|
||||||
"Room %s was un-partial stated while processing remote join.",
|
|
||||||
room_id,
|
|
||||||
)
|
|
||||||
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
|
|
||||||
else:
|
|
||||||
# Record the join event id for future use (when we finish the full
|
|
||||||
# join). We have to do this after persisting the event to keep foreign
|
|
||||||
# key constraints intact.
|
|
||||||
if ret.partial_state:
|
|
||||||
await self.store.write_partial_state_rooms_join_event_id(
|
|
||||||
room_id, event.event_id
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
# Always kick off the background process that asynchronously fetches
|
|
||||||
# state for the room.
|
|
||||||
# If the join failed, the background process is responsible for
|
|
||||||
# cleaning up — including unmarking the room as a partial state room.
|
|
||||||
if ret.partial_state:
|
|
||||||
# Kick off the process of asynchronously fetching the state for this
|
|
||||||
# room.
|
|
||||||
self._start_partial_state_room_sync(
|
|
||||||
initial_destination=origin,
|
|
||||||
other_destinations=ret.servers_in_room,
|
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
|
servers=ret.servers_in_room,
|
||||||
|
device_lists_stream_id=self.store.get_device_stream_token(),
|
||||||
|
joined_via=origin,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
max_stream_id = (
|
||||||
|
await self._federation_event_handler.process_remote_join(
|
||||||
|
origin,
|
||||||
|
room_id,
|
||||||
|
auth_chain,
|
||||||
|
state,
|
||||||
|
event,
|
||||||
|
room_version_obj,
|
||||||
|
partial_state=ret.partial_state,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except PartialStateConflictError:
|
||||||
|
# This should be impossible, since we hold the lock on the room's
|
||||||
|
# partial statedness.
|
||||||
|
logger.error(
|
||||||
|
"Room %s was un-partial stated while processing remote join.",
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
# Record the join event id for future use (when we finish the full
|
||||||
|
# join). We have to do this after persisting the event to keep
|
||||||
|
# foreign key constraints intact.
|
||||||
|
if ret.partial_state and not already_partial_state_room:
|
||||||
|
# TODO(faster_joins):
|
||||||
|
# We may want to reset the partial state info if it's from
|
||||||
|
# an old, failed partial state join.
|
||||||
|
# https://github.com/matrix-org/synapse/issues/13000
|
||||||
|
await self.store.write_partial_state_rooms_join_event_id(
|
||||||
|
room_id, event.event_id
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
# Always kick off the background process that asynchronously fetches
|
||||||
|
# state for the room.
|
||||||
|
# If the join failed, the background process is responsible for
|
||||||
|
# cleaning up — including unmarking the room as a partial state
|
||||||
|
# room.
|
||||||
|
if ret.partial_state:
|
||||||
|
# Kick off the process of asynchronously fetching the state for
|
||||||
|
# this room.
|
||||||
|
self._start_partial_state_room_sync(
|
||||||
|
initial_destination=origin,
|
||||||
|
other_destinations=ret.servers_in_room,
|
||||||
|
room_id=room_id,
|
||||||
|
)
|
||||||
|
|
||||||
# We wait here until this instance has seen the events come down
|
# We wait here until this instance has seen the events come down
|
||||||
# replication (if we're using replication) as the below uses caches.
|
# replication (if we're using replication) as the below uses caches.
|
||||||
await self._replication.wait_for_stream_position(
|
await self._replication.wait_for_stream_position(
|
||||||
|
@ -1778,6 +1823,12 @@ class FederationHandler:
|
||||||
`initial_destination` is unavailable
|
`initial_destination` is unavailable
|
||||||
room_id: room to be resynced
|
room_id: room to be resynced
|
||||||
"""
|
"""
|
||||||
|
# Assume that we run on the main process for now.
|
||||||
|
# TODO(faster_joins,multiple workers)
|
||||||
|
# When moving the sync to workers, we need to ensure that
|
||||||
|
# * `_start_partial_state_room_sync` still prevents duplicate resyncs
|
||||||
|
# * `_is_partial_state_room_linearizer` correctly guards partial state flags
|
||||||
|
# for rooms between the workers doing remote joins and resync.
|
||||||
assert not self.config.worker.worker_app
|
assert not self.config.worker.worker_app
|
||||||
|
|
||||||
# TODO(faster_joins): do we need to lock to avoid races? What happens if other
|
# TODO(faster_joins): do we need to lock to avoid races? What happens if other
|
||||||
|
@ -1815,8 +1866,10 @@ class FederationHandler:
|
||||||
logger.info("Handling any pending device list updates")
|
logger.info("Handling any pending device list updates")
|
||||||
await self._device_handler.handle_room_un_partial_stated(room_id)
|
await self._device_handler.handle_room_un_partial_stated(room_id)
|
||||||
|
|
||||||
logger.info("Clearing partial-state flag for %s", room_id)
|
async with self._is_partial_state_room_linearizer.queue(room_id):
|
||||||
success = await self.store.clear_partial_state_room(room_id)
|
logger.info("Clearing partial-state flag for %s", room_id)
|
||||||
|
success = await self.store.clear_partial_state_room(room_id)
|
||||||
|
|
||||||
if success:
|
if success:
|
||||||
logger.info("State resync complete for %s", room_id)
|
logger.info("State resync complete for %s", room_id)
|
||||||
self._storage_controllers.state.notify_room_un_partial_stated(
|
self._storage_controllers.state.notify_room_un_partial_stated(
|
||||||
|
|
Loading…
Reference in New Issue