Ensure fed-sender catchup does not block for full state (#15248)

* Reproduce bad scenario in test
* Avoid catchup optimisation for partial state rooms

pull/15262/head
parent 4bb26c95a9
commit c071cd5a0e
@@ -0,0 +1 @@
+Fix a rare bug introduced in Synapse 1.73 where events could remain unsent to other homeservers after a faster-join to a room.
@@ -497,8 +497,8 @@ class PerDestinationQueue:
             #
             # Note: `catchup_pdus` will have exactly one PDU per room.
             for pdu in catchup_pdus:
-                # The PDU from the DB will be the last PDU in the room from
-                # *this server* that wasn't sent to the remote. However, other
+                # The PDU from the DB will be the newest PDU in the room from
+                # *this server* that we tried---but were unable---to send to the remote.
                 # servers may have sent lots of events since then, and we want
                 # to try and tell the remote only about the *latest* events in
                 # the room. This is so that it doesn't get inundated by events
@@ -516,6 +516,11 @@ class PerDestinationQueue:
                     # If the event is in the extremities, then great! We can just
                     # use that without having to do further checks.
                     room_catchup_pdus = [pdu]
+                elif await self._store.is_partial_state_room(pdu.room_id):
+                    # We can't be sure which events the destination should
+                    # see using only partial state. Avoid doing so, and just retry
+                    # sending the newest PDU the remote is missing from us.
+                    room_catchup_pdus = [pdu]
                 else:
                     # If not, fetch the extremities and figure out which we can
                     # send.
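Taken together, the two hunks above turn the per-room decision in `PerDestinationQueue._catch_up_transmission_loop` from a two-way into a three-way branch. The following condensed sketch shows the resulting control flow; the extremity lookup via `get_prev_events_for_room` reflects the surrounding Synapse code as I read it, and the `else` body is elided, so treat this as an illustration rather than the verbatim method:

    # Condensed sketch of the post-fix loop body (illustrative, not verbatim).
    for pdu in catchup_pdus:
        extrems = await self._store.get_prev_events_for_room(pdu.room_id)
        if pdu.event_id in extrems:
            # Our missing PDU is still a forward extremity: just send it.
            room_catchup_pdus = [pdu]
        elif await self._store.is_partial_state_room(pdu.room_id):
            # New branch: with only partial state we cannot safely work out
            # which extremities the destination may see, and trying to could
            # block on a full-state resync. Fall back to our own newest PDU.
            room_catchup_pdus = [pdu]
        else:
            # Full state: filter the current extremities to those the
            # destination is allowed to see, and send those instead.
            ...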
@@ -1,4 +1,5 @@
-from typing import Callable, List, Optional, Tuple
+from typing import Callable, Collection, List, Optional, Tuple
+from unittest import mock
 from unittest.mock import Mock

 from twisted.test.proto_helpers import MemoryReactor
@@ -500,3 +501,87 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
         self.assertEqual(len(sent_pdus), 1)
         self.assertEqual(sent_pdus[0].event_id, event_2.event_id)
         self.assertFalse(per_dest_queue._catching_up)
+
+    def test_catch_up_is_not_blocked_by_remote_event_in_partial_state_room(
+        self,
+    ) -> None:
+        """Detects (part of?) https://github.com/matrix-org/synapse/issues/15220."""
+        # ARRANGE:
+        # - a local user (u1)
+        # - a room which contains u1 and two remote users, @u2:host2 and @u3:other
+        # - events in that room such that
+        #   - history visibility is restricted
+        #   - u1 sent message events e1 and e2
+        #   - afterwards, u3 sent a remote event e3
+        # - catchup to begin for host2; last successfully sent event was e1
+        per_dest_queue, sent_pdus = self.make_fake_destination_queue()
+
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room = self.helper.create_room_as("u1", tok=u1_token)
+        self.helper.send_state(
+            room_id=room,
+            event_type="m.room.history_visibility",
+            body={"history_visibility": "joined"},
+            tok=u1_token,
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room, "@u2:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room, "@u3:other", "join")
+        )
+
+        # create some events
+        event_id_1 = self.helper.send(room, "hello", tok=u1_token)["event_id"]
+        event_id_2 = self.helper.send(room, "world", tok=u1_token)["event_id"]
+        # pretend that u3 changes their displayname
+        event_id_3 = self.get_success(
+            event_injection.inject_member_event(self.hs, room, "@u3:other", "join")
+        ).event_id
+
+        # destination_rooms should already be populated, but let us pretend that we already
+        # sent (successfully) up to and including event id 1
+        event_1 = self.get_success(self.hs.get_datastores().main.get_event(event_id_1))
+        assert event_1.internal_metadata.stream_ordering is not None
+        self.get_success(
+            self.hs.get_datastores().main.set_destination_last_successful_stream_ordering(
+                "host2", event_1.internal_metadata.stream_ordering
+            )
+        )
+
+        # also fetch event 2 so we can compare its stream ordering to the sender's
+        # last_successful_stream_ordering later
+        event_2 = self.get_success(self.hs.get_datastores().main.get_event(event_id_2))
+
+        # Mock event 3 as having partial state
+        self.get_success(
+            event_injection.mark_event_as_partial_state(self.hs, event_id_3, room)
+        )
+
+        # Fail the test if we block on full state for event 3.
+        async def mock_await_full_state(event_ids: Collection[str]) -> None:
+            if event_id_3 in event_ids:
+                raise AssertionError("Tried to await full state for event_id_3")
+
+        # ACT
+        with mock.patch.object(
+            self.hs.get_storage_controllers().state._partial_state_events_tracker,
+            "await_full_state",
+            mock_await_full_state,
+        ):
+            self.get_success(per_dest_queue._catch_up_transmission_loop())
+
+        # ASSERT
+        # We should have:
+        # - not sent event 3: it's not ours, and the room is partial stated
+        # - fallen back to sending event 2: it's the most recent event in the room
+        #   we tried to send to host2
+        # - completed catch-up
+        self.assertEqual(len(sent_pdus), 1)
+        self.assertEqual(sent_pdus[0].event_id, event_id_2)
+        self.assertFalse(per_dest_queue._catching_up)
+        self.assertEqual(
+            per_dest_queue._last_successful_stream_ordering,
+            event_2.internal_metadata.stream_ordering,
+        )
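A note on the testing technique above: instead of waiting to observe whether `_catch_up_transmission_loop` blocks (which would be slow and flaky), the test patches `await_full_state` with a coroutine that raises, so any attempt to block on full state fails immediately and attributably. Below is a minimal, self-contained sketch of the same pattern; the `Tracker` class is a stand-in of my own, not Synapse's `_partial_state_events_tracker`:

    import asyncio
    from typing import Collection
    from unittest import mock


    class Tracker:
        """Stand-in for a component whose awaited call may block indefinitely."""

        async def await_full_state(self, event_ids: Collection[str]) -> None:
            await asyncio.sleep(3600)  # stands in for "block until resync completes"


    async def main() -> None:
        tracker = Tracker()

        async def fail_fast(event_ids: Collection[str]) -> None:
            # Raising converts a silent hang into an immediate test failure.
            raise AssertionError(f"tried to await full state for {event_ids!r}")

        with mock.patch.object(tracker, "await_full_state", fail_fast):
            try:
                await tracker.await_full_state(["$event3"])
            except AssertionError as e:
                print(f"caught as expected: {e}")


    asyncio.run(main())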
@@ -102,3 +102,34 @@ async def create_event(
     context = await unpersisted_context.persist(event)

     return event, context
+
+
+async def mark_event_as_partial_state(
+    hs: synapse.server.HomeServer,
+    event_id: str,
+    room_id: str,
+) -> None:
+    """
+    (Falsely) mark an event as having partial state.
+
+    Naughty, but occasionally useful when checking that partial state doesn't
+    block something from happening.
+
+    If the event already has partial state, this insert will fail (event_id is unique
+    in this table).
+    """
+    store = hs.get_datastores().main
+    await store.db_pool.simple_upsert(
+        table="partial_state_rooms",
+        keyvalues={"room_id": room_id},
+        values={},
+        insertion_values={"room_id": room_id},
+    )
+
+    await store.db_pool.simple_insert(
+        table="partial_state_events",
+        values={
+            "room_id": room_id,
+            "event_id": event_id,
+        },
+    )
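One non-obvious detail in the helper: the `simple_upsert` passes empty `values` so that an already-flagged room is left untouched (only `insertion_values` applies, and only when the row is created), whereas the second write is a plain `simple_insert` on purpose, so double-marking an event trips the unique `event_id` constraint loudly, as the docstring warns. An annotated restatement of the two calls (the comments are my reading of the `simple_upsert` contract, not text from the commit):

    await store.db_pool.simple_upsert(
        table="partial_state_rooms",
        keyvalues={"room_id": room_id},         # match an existing flag row...
        values={},                              # ...changing nothing if found...
        insertion_values={"room_id": room_id},  # ...else create the flag row.
    )

    await store.db_pool.simple_insert(  # deliberately not an upsert: a repeat
        table="partial_state_events",   # call for the same event_id raises.
        values={"room_id": room_id, "event_id": event_id},
    )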