Fix logic for dropping old events in fed queue (#11806)
Co-authored-by: Brendan Abolivier <babolivier@matrix.org> Co-authored-by: Richard van der Hoff <richard@matrix.org>release-v1.51
parent
2d295a4be9
commit
dc671d3ea7
|
@ -0,0 +1 @@
|
||||||
|
Fix a bug introduced in Synapse 1.40.0 that caused Synapse to fail to process incoming federation traffic after handling a large amount of events in a v1 room.
|
|
@ -1432,7 +1432,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
|
|
||||||
if room_version.event_format == EventFormatVersions.V1:
|
if room_version.event_format == EventFormatVersions.V1:
|
||||||
for prev_event_tuple in prev_events:
|
for prev_event_tuple in prev_events:
|
||||||
if not isinstance(prev_event_tuple, list) or len(prev_events) != 2:
|
if (
|
||||||
|
not isinstance(prev_event_tuple, list)
|
||||||
|
or len(prev_event_tuple) != 2
|
||||||
|
):
|
||||||
logger.info("Invalid prev_events for %s", event_id)
|
logger.info("Invalid prev_events for %s", event_id)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
|
@ -12,10 +12,16 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
from typing import Tuple, Union
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
from parameterized import parameterized
|
from parameterized import parameterized
|
||||||
|
|
||||||
from synapse.api.room_versions import RoomVersions
|
from synapse.api.room_versions import (
|
||||||
|
KNOWN_ROOM_VERSIONS,
|
||||||
|
EventFormatVersions,
|
||||||
|
RoomVersion,
|
||||||
|
)
|
||||||
from synapse.events import _EventInternalMetadata
|
from synapse.events import _EventInternalMetadata
|
||||||
from synapse.util import json_encoder
|
from synapse.util import json_encoder
|
||||||
|
|
||||||
|
@ -506,11 +512,21 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
self.assertSetEqual(difference, set())
|
self.assertSetEqual(difference, set())
|
||||||
|
|
||||||
def test_prune_inbound_federation_queue(self):
|
@parameterized.expand(
|
||||||
"Test that pruning of inbound federation queues work"
|
[(room_version,) for room_version in KNOWN_ROOM_VERSIONS.values()]
|
||||||
|
)
|
||||||
|
def test_prune_inbound_federation_queue(self, room_version: RoomVersion):
|
||||||
|
"""Test that pruning of inbound federation queues work"""
|
||||||
|
|
||||||
room_id = "some_room_id"
|
room_id = "some_room_id"
|
||||||
|
|
||||||
|
def prev_event_format(prev_event_id: str) -> Union[Tuple[str, dict], str]:
|
||||||
|
"""Account for differences in prev_events format across room versions"""
|
||||||
|
if room_version.event_format == EventFormatVersions.V1:
|
||||||
|
return prev_event_id, {}
|
||||||
|
|
||||||
|
return prev_event_id
|
||||||
|
|
||||||
# Insert a bunch of events that all reference the previous one.
|
# Insert a bunch of events that all reference the previous one.
|
||||||
self.get_success(
|
self.get_success(
|
||||||
self.store.db_pool.simple_insert_many(
|
self.store.db_pool.simple_insert_many(
|
||||||
|
@ -529,7 +545,9 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
|
||||||
room_id,
|
room_id,
|
||||||
0,
|
0,
|
||||||
f"$fake_event_id_{i + 1}",
|
f"$fake_event_id_{i + 1}",
|
||||||
json_encoder.encode({"prev_events": [f"$fake_event_id_{i}"]}),
|
json_encoder.encode(
|
||||||
|
{"prev_events": [prev_event_format(f"$fake_event_id_{i}")]}
|
||||||
|
),
|
||||||
"{}",
|
"{}",
|
||||||
)
|
)
|
||||||
for i in range(500)
|
for i in range(500)
|
||||||
|
@ -541,12 +559,12 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
|
||||||
# Calling prune once should return True, i.e. a prune happen. The second
|
# Calling prune once should return True, i.e. a prune happen. The second
|
||||||
# time it shouldn't.
|
# time it shouldn't.
|
||||||
pruned = self.get_success(
|
pruned = self.get_success(
|
||||||
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
|
self.store.prune_staged_events_in_room(room_id, room_version)
|
||||||
)
|
)
|
||||||
self.assertTrue(pruned)
|
self.assertTrue(pruned)
|
||||||
|
|
||||||
pruned = self.get_success(
|
pruned = self.get_success(
|
||||||
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
|
self.store.prune_staged_events_in_room(room_id, room_version)
|
||||||
)
|
)
|
||||||
self.assertFalse(pruned)
|
self.assertFalse(pruned)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue