diff --git a/changelog.d/11806.bugfix b/changelog.d/11806.bugfix new file mode 100644 index 0000000000..e4beaf103b --- /dev/null +++ b/changelog.d/11806.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse 1.40.0 that caused Synapse to fail to process incoming federation traffic after handling a large amount of events in a v1 room. \ No newline at end of file diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 270b30800b..a556f17dac 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1432,7 +1432,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas if room_version.event_format == EventFormatVersions.V1: for prev_event_tuple in prev_events: - if not isinstance(prev_event_tuple, list) or len(prev_events) != 2: + if ( + not isinstance(prev_event_tuple, list) + or len(prev_event_tuple) != 2 + ): logger.info("Invalid prev_events for %s", event_id) break diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 632bbc9de7..2bc89512f8 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -12,10 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Tuple, Union + import attr from parameterized import parameterized -from synapse.api.room_versions import RoomVersions +from synapse.api.room_versions import ( + KNOWN_ROOM_VERSIONS, + EventFormatVersions, + RoomVersion, +) from synapse.events import _EventInternalMetadata from synapse.util import json_encoder @@ -506,11 +512,21 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): ) self.assertSetEqual(difference, set()) - def test_prune_inbound_federation_queue(self): - "Test that pruning of inbound federation queues work" + @parameterized.expand( + [(room_version,) for room_version in KNOWN_ROOM_VERSIONS.values()] + ) + def test_prune_inbound_federation_queue(self, room_version: RoomVersion): + """Test that pruning of inbound federation queues work""" room_id = "some_room_id" + def prev_event_format(prev_event_id: str) -> Union[Tuple[str, dict], str]: + """Account for differences in prev_events format across room versions""" + if room_version.event_format == EventFormatVersions.V1: + return prev_event_id, {} + + return prev_event_id + # Insert a bunch of events that all reference the previous one. self.get_success( self.store.db_pool.simple_insert_many( @@ -529,7 +545,9 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): room_id, 0, f"$fake_event_id_{i + 1}", - json_encoder.encode({"prev_events": [f"$fake_event_id_{i}"]}), + json_encoder.encode( + {"prev_events": [prev_event_format(f"$fake_event_id_{i}")]} + ), "{}", ) for i in range(500) @@ -541,12 +559,12 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): # Calling prune once should return True, i.e. a prune happen. The second # time it shouldn't. pruned = self.get_success( - self.store.prune_staged_events_in_room(room_id, RoomVersions.V6) + self.store.prune_staged_events_in_room(room_id, room_version) ) self.assertTrue(pruned) pruned = self.get_success( - self.store.prune_staged_events_in_room(room_id, RoomVersions.V6) + self.store.prune_staged_events_in_room(room_id, room_version) ) self.assertFalse(pruned)