diff --git a/changelog.d/7117.bugfix b/changelog.d/7117.bugfix new file mode 100644 index 0000000000..1896d7ad49 --- /dev/null +++ b/changelog.d/7117.bugfix @@ -0,0 +1 @@ +Fix a bug which meant that groups updates were not correctly replicated between workers. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index cdc078cf11..136babe6ce 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -65,12 +65,23 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.replication.tcp.streams._base import ( +from synapse.replication.tcp.streams import ( + AccountDataStream, DeviceListsStream, + GroupServerStream, + PresenceStream, + PushersStream, + PushRulesStream, ReceiptsStream, + TagAccountDataStream, ToDeviceStream, + TypingStream, +) +from synapse.replication.tcp.streams.events import ( + EventsStream, + EventsStreamEventRow, + EventsStreamRow, ) -from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.client.v1 import events from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet @@ -626,7 +637,7 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler): if self.send_handler: self.send_handler.process_replication_rows(stream_name, token, rows) - if stream_name == "events": + if stream_name == EventsStream.NAME: # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. for row in rows: @@ -649,44 +660,44 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler): ) await self.pusher_pool.on_new_notifications(token, token) - elif stream_name == "push_rules": + elif stream_name == PushRulesStream.NAME: self.notifier.on_new_event( "push_rules_key", token, users=[row.user_id for row in rows] ) - elif stream_name in ("account_data", "tag_account_data"): + elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME): self.notifier.on_new_event( "account_data_key", token, users=[row.user_id for row in rows] ) - elif stream_name == "receipts": + elif stream_name == ReceiptsStream.NAME: self.notifier.on_new_event( "receipt_key", token, rooms=[row.room_id for row in rows] ) await self.pusher_pool.on_new_receipts( token, token, {row.room_id for row in rows} ) - elif stream_name == "typing": + elif stream_name == TypingStream.NAME: self.typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( "typing_key", token, rooms=[row.room_id for row in rows] ) - elif stream_name == "to_device": + elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: self.notifier.on_new_event("to_device_key", token, users=entities) - elif stream_name == "device_lists": + elif stream_name == DeviceListsStream.NAME: all_room_ids = set() for row in rows: if row.entity.startswith("@"): room_ids = await self.store.get_rooms_for_user(row.entity) all_room_ids.update(room_ids) self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) - elif stream_name == "presence": + elif stream_name == PresenceStream.NAME: await self.presence_handler.process_replication_rows(token, rows) - elif stream_name == "receipts": + elif stream_name == GroupServerStream.NAME: self.notifier.on_new_event( "groups_key", token, users=[row.user_id for row in rows] ) - elif stream_name == "pushers": + elif stream_name == PushersStream.NAME: for row in rows: if row.deleted: self.stop_pusher(row.user_id, row.app_id, row.pushkey) diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 5f52264e84..29199f5b46 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -24,27 +24,61 @@ Each stream is defined by the following information: current_token: The function that returns the current token for the stream update_function: The function that returns a list of updates between two tokens """ - -from . import _base, events, federation +from synapse.replication.tcp.streams._base import ( + AccountDataStream, + BackfillStream, + CachesStream, + DeviceListsStream, + GroupServerStream, + PresenceStream, + PublicRoomsStream, + PushersStream, + PushRulesStream, + ReceiptsStream, + TagAccountDataStream, + ToDeviceStream, + TypingStream, + UserSignatureStream, +) +from synapse.replication.tcp.streams.events import EventsStream +from synapse.replication.tcp.streams.federation import FederationStream STREAMS_MAP = { stream.NAME: stream for stream in ( - events.EventsStream, - _base.BackfillStream, - _base.PresenceStream, - _base.TypingStream, - _base.ReceiptsStream, - _base.PushRulesStream, - _base.PushersStream, - _base.CachesStream, - _base.PublicRoomsStream, - _base.DeviceListsStream, - _base.ToDeviceStream, - federation.FederationStream, - _base.TagAccountDataStream, - _base.AccountDataStream, - _base.GroupServerStream, - _base.UserSignatureStream, + EventsStream, + BackfillStream, + PresenceStream, + TypingStream, + ReceiptsStream, + PushRulesStream, + PushersStream, + CachesStream, + PublicRoomsStream, + DeviceListsStream, + ToDeviceStream, + FederationStream, + TagAccountDataStream, + AccountDataStream, + GroupServerStream, + UserSignatureStream, ) } + +__all__ = [ + "STREAMS_MAP", + "BackfillStream", + "PresenceStream", + "TypingStream", + "ReceiptsStream", + "PushRulesStream", + "PushersStream", + "CachesStream", + "PublicRoomsStream", + "DeviceListsStream", + "ToDeviceStream", + "TagAccountDataStream", + "AccountDataStream", + "GroupServerStream", + "UserSignatureStream", +]