From e5c67d04dbe5ed45d659e826a5dfcd5044a4e374 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2020 16:11:35 +0100 Subject: [PATCH] Add option to move event persistence off master (#7517) --- changelog.d/7517.feature | 1 + scripts/synapse_port_db | 3 + synapse/app/generic_worker.py | 53 +++++++- synapse/config/logger.py | 1 + synapse/config/workers.py | 30 ++++- synapse/handlers/federation.py | 16 ++- synapse/handlers/message.py | 12 +- synapse/handlers/presence.py | 6 + synapse/handlers/room.py | 7 ++ synapse/handlers/room_member.py | 39 ++++-- synapse/replication/http/__init__.py | 4 +- synapse/replication/http/_base.py | 3 + synapse/replication/http/membership.py | 40 +++++- synapse/replication/http/presence.py | 116 ++++++++++++++++++ synapse/replication/tcp/handler.py | 10 ++ synapse/rest/admin/rooms.py | 11 +- synapse/storage/data_stores/__init__.py | 6 +- synapse/storage/data_stores/main/devices.py | 30 +++-- synapse/storage/data_stores/main/events.py | 34 ++++- .../storage/data_stores/main/events_worker.py | 2 +- .../storage/data_stores/main/roommember.py | 25 ---- synapse/storage/persist_events.py | 6 + 22 files changed, 382 insertions(+), 73 deletions(-) create mode 100644 changelog.d/7517.feature create mode 100644 synapse/replication/http/presence.py diff --git a/changelog.d/7517.feature b/changelog.d/7517.feature new file mode 100644 index 0000000000..6062f0061c --- /dev/null +++ b/changelog.d/7517.feature @@ -0,0 +1 @@ +Add option to move event persistence off master. diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index acd9ac4b75..9a0fbc61d8 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -196,6 +196,9 @@ class MockHomeserver: def get_reactor(self): return reactor + def get_instance_name(self): + return "master" + class Porter(object): def __init__(self, **kwargs): diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index a37520000a..2906b93f6a 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -39,7 +39,11 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.federation import send_queue from synapse.federation.transport.server import TransportLayerServer -from synapse.handlers.presence import BasePresenceHandler, get_interested_parties +from synapse.handlers.presence import ( + BasePresenceHandler, + PresenceState, + get_interested_parties, +) from synapse.http.server import JsonResource, OptionsResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseSite @@ -47,6 +51,10 @@ from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource +from synapse.replication.http.presence import ( + ReplicationBumpPresenceActiveTime, + ReplicationPresenceSetState, +) from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -247,6 +255,9 @@ class GenericWorkerPresence(BasePresenceHandler): # but we haven't notified the master of that yet self.users_going_offline = {} + self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) + self._set_state_client = ReplicationPresenceSetState.make_client(hs) + self._send_stop_syncing_loop = self.clock.looping_call( self.send_stop_syncing, UPDATE_SYNCING_USERS_MS ) @@ -304,10 +315,6 @@ class GenericWorkerPresence(BasePresenceHandler): self.users_going_offline.pop(user_id, None) self.send_user_sync(user_id, False, last_sync_ms) - def set_state(self, user, state, ignore_status_msg=False): - # TODO Hows this supposed to work? - return defer.succeed(None) - async def user_syncing( self, user_id: str, affect_presence: bool ) -> ContextManager[None]: @@ -386,6 +393,42 @@ class GenericWorkerPresence(BasePresenceHandler): if count > 0 ] + async def set_state(self, target_user, state, ignore_status_msg=False): + """Set the presence state of the user. + """ + presence = state["presence"] + + valid_presence = ( + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, + ) + if presence not in valid_presence: + raise SynapseError(400, "Invalid presence state") + + user_id = target_user.to_string() + + # If presence is disabled, no-op + if not self.hs.config.use_presence: + return + + # Proxy request to master + await self._set_state_client( + user_id=user_id, state=state, ignore_status_msg=ignore_status_msg + ) + + async def bump_presence_active_time(self, user): + """We've seen the user do something that indicates they're interacting + with the app. + """ + # If presence is disabled, no-op + if not self.hs.config.use_presence: + return + + # Proxy request to master + user_id = user.to_string() + await self._bump_active_client(user_id=user_id) + class GenericWorkerTyping(object): def __init__(self, hs): diff --git a/synapse/config/logger.py b/synapse/config/logger.py index a25c70e928..49f6c32beb 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -257,5 +257,6 @@ def setup_logging( logging.warning("***** STARTING SERVER *****") logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse)) logging.info("Server hostname: %s", config.server_name) + logging.info("Instance name: %s", hs.get_instance_name()) return logger diff --git a/synapse/config/workers.py b/synapse/config/workers.py index c80c338584..ed06b91a54 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -15,7 +15,7 @@ import attr -from ._base import Config +from ._base import Config, ConfigError @attr.s @@ -27,6 +27,17 @@ class InstanceLocationConfig: port = attr.ib(type=int) +@attr.s +class WriterLocations: + """Specifies the instances that write various streams. + + Attributes: + events: The instance that writes to the event and backfill streams. + """ + + events = attr.ib(default="master", type=str) + + class WorkerConfig(Config): """The workers are processes run separately to the main synapse process. They have their own pid_file and listener configuration. They use the @@ -83,11 +94,26 @@ class WorkerConfig(Config): bind_addresses.append("") # A map from instance name to host/port of their HTTP replication endpoint. - instance_map = config.get("instance_map", {}) or {} + instance_map = config.get("instance_map") or {} self.instance_map = { name: InstanceLocationConfig(**c) for name, c in instance_map.items() } + # Map from type of streams to source, c.f. WriterLocations. + writers = config.get("stream_writers") or {} + self.writers = WriterLocations(**writers) + + # Check that the configured writer for events also appears in + # `instance_map`. + if ( + self.writers.events != "master" + and self.writers.events not in self.instance_map + ): + raise ConfigError( + "Instance %r is configured to write events but does not appear in `instance_map` config." + % (self.writers.events,) + ) + def read_arguments(self, args): # We support a bunch of command line arguments that override options in # the config. A lot of these options have a worker_* prefix when running diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e354c803db..75ec90d267 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -126,11 +126,10 @@ class FederationHandler(BaseHandler): self._server_notices_mxid = hs.config.server_notices_mxid self.config = hs.config self.http_client = hs.get_simple_http_client() + self._instance_name = hs.get_instance_name() self._replication = hs.get_replication_data_handler() - self._send_events_to_master = ReplicationFederationSendEventsRestServlet.make_client( - hs - ) + self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client( hs ) @@ -1243,6 +1242,10 @@ class FederationHandler(BaseHandler): content: The event content to use for the join event. """ + # TODO: We should be able to call this on workers, but the upgrading of + # room stuff after join currently doesn't work on workers. + assert self.config.worker.worker_app is None + logger.debug("Joining %s to %s", joinee, room_id) origin, event, room_version_obj = await self._make_and_verify_event( @@ -1314,7 +1317,7 @@ class FederationHandler(BaseHandler): # # TODO: Currently the events stream is written to from master await self._replication.wait_for_stream_position( - "master", "events", max_stream_id + self.config.worker.writers.events, "events", max_stream_id ) # Check whether this room is the result of an upgrade of a room we already know @@ -2854,8 +2857,9 @@ class FederationHandler(BaseHandler): backfilled: Whether these events are a result of backfilling or not """ - if self.config.worker_app: - result = await self._send_events_to_master( + if self.config.worker.writers.events != self._instance_name: + result = await self._send_events( + instance_name=self.config.worker.writers.events, store=self.store, event_and_contexts=event_and_contexts, backfilled=backfilled, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f445e2aa2a..ea25f0515a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -366,10 +366,11 @@ class EventCreationHandler(object): self.notifier = hs.get_notifier() self.config = hs.config self.require_membership_for_aliases = hs.config.require_membership_for_aliases + self._instance_name = hs.get_instance_name() self.room_invite_state_types = self.hs.config.room_invite_state_types - self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs) + self.send_event = ReplicationSendEventRestServlet.make_client(hs) # This is only used to get at ratelimit function, and maybe_kick_guest_users self.base_handler = BaseHandler(hs) @@ -835,8 +836,9 @@ class EventCreationHandler(object): success = False try: # If we're a worker we need to hit out to the master. - if self.config.worker_app: - result = await self.send_event_to_master( + if self.config.worker.writers.events != self._instance_name: + result = await self.send_event( + instance_name=self.config.worker.writers.events, event_id=event.event_id, store=self.store, requester=requester, @@ -902,9 +904,9 @@ class EventCreationHandler(object): """Called when we have fully built the event, have already calculated the push actions for the event, and checked auth. - This should only be run on master. + This should only be run on the instance in charge of persisting events. """ - assert not self.config.worker_app + assert self.config.worker.writers.events == self._instance_name if ratelimit: # We check if this is a room admin redacting an event so that we diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9ea11c0754..3594f3b00f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -193,6 +193,12 @@ class BasePresenceHandler(abc.ABC): ) -> None: """Set the presence state of the user. """ + @abc.abstractmethod + async def bump_presence_active_time(self, user: UserID): + """We've seen the user do something that indicates they're interacting + with the app. + """ + class PresenceHandler(BasePresenceHandler): def __init__(self, hs: "synapse.server.HomeServer"): diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 2698a129ca..61db3ccc43 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -89,6 +89,8 @@ class RoomCreationHandler(BaseHandler): self.room_member_handler = hs.get_room_member_handler() self.config = hs.config + self._replication = hs.get_replication_data_handler() + # linearizer to stop two upgrades happening at once self._upgrade_linearizer = Linearizer("room_upgrade_linearizer") @@ -752,6 +754,11 @@ class RoomCreationHandler(BaseHandler): if room_alias: result["room_alias"] = room_alias.to_string() + # Always wait for room creation to progate before returning + await self._replication.wait_for_stream_position( + self.hs.config.worker.writers.events, "events", last_stream_id + ) + return result, last_stream_id async def _send_events_for_new_room( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 691b6705b2..0f7af982f0 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -26,6 +26,9 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.replication.http.membership import ( + ReplicationLocallyRejectInviteRestServlet, +) from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room, user_left_room @@ -44,11 +47,6 @@ class RoomMemberHandler(object): __metaclass__ = abc.ABCMeta def __init__(self, hs): - """ - - Args: - hs (synapse.server.HomeServer): - """ self.hs = hs self.store = hs.get_datastore() self.auth = hs.get_auth() @@ -71,6 +69,17 @@ class RoomMemberHandler(object): self._enable_lookup = hs.config.enable_3pid_lookup self.allow_per_room_profiles = self.config.allow_per_room_profiles + self._event_stream_writer_instance = hs.config.worker.writers.events + self._is_on_event_persistence_instance = ( + self._event_stream_writer_instance == hs.get_instance_name() + ) + if self._is_on_event_persistence_instance: + self.persist_event_storage = hs.get_storage().persistence + else: + self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client( + hs + ) + # This is only used to get at ratelimit function, and # maybe_kick_guest_users. It's fine there are multiple of these as # it doesn't store state. @@ -121,6 +130,22 @@ class RoomMemberHandler(object): """ raise NotImplementedError() + async def locally_reject_invite(self, user_id: str, room_id: str) -> int: + """Mark the invite has having been rejected even though we failed to + create a leave event for it. + """ + if self._is_on_event_persistence_instance: + return await self.persist_event_storage.locally_reject_invite( + user_id, room_id + ) + else: + result = await self._locally_reject_client( + instance_name=self._event_stream_writer_instance, + user_id=user_id, + room_id=room_id, + ) + return result["stream_id"] + @abc.abstractmethod async def _user_joined_room(self, target: UserID, room_id: str) -> None: """Notifies distributor on master process that the user has joined the @@ -1015,9 +1040,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): # logger.warning("Failed to reject invite: %s", e) - stream_id = await self.store.locally_reject_invite( - target.to_string(), room_id - ) + stream_id = await self.locally_reject_invite(target.to_string(), room_id) return None, stream_id async def _user_joined_room(self, target: UserID, room_id: str) -> None: diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index a909744e93..19b69e0e11 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -19,6 +19,7 @@ from synapse.replication.http import ( federation, login, membership, + presence, register, send_event, streams, @@ -35,10 +36,11 @@ class ReplicationRestResource(JsonResource): def register_servlets(self, hs): send_event.register_servlets(hs, self) federation.register_servlets(hs, self) + presence.register_servlets(hs, self) + membership.register_servlets(hs, self) # The following can't currently be instantiated on workers. if hs.config.worker.worker_app is None: - membership.register_servlets(hs, self) login.register_servlets(hs, self) register.register_servlets(hs, self) devices.register_servlets(hs, self) diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index c3136a4eb9..793cef6c26 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -142,6 +142,7 @@ class ReplicationEndpoint(object): """ clock = hs.get_clock() client = hs.get_simple_http_client() + local_instance_name = hs.get_instance_name() master_host = hs.config.worker_replication_host master_port = hs.config.worker_replication_http_port @@ -151,6 +152,8 @@ class ReplicationEndpoint(object): @trace(opname="outgoing_replication_request") @defer.inlineCallbacks def send_request(instance_name="master", **kwargs): + if instance_name == local_instance_name: + raise Exception("Trying to send HTTP request to self") if instance_name == "master": host = master_host port = master_port diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 050fd34562..a7174c4a8f 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -14,12 +14,16 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint from synapse.types import Requester, UserID from synapse.util.distributor import user_joined_room, user_left_room +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -106,6 +110,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): self.federation_handler = hs.get_handlers().federation_handler self.store = hs.get_datastore() self.clock = hs.get_clock() + self.member_handler = hs.get_room_member_handler() @staticmethod def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content): @@ -149,12 +154,44 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): # logger.warning("Failed to reject invite: %s", e) - stream_id = await self.store.locally_reject_invite(user_id, room_id) + stream_id = await self.member_handler.locally_reject_invite( + user_id, room_id + ) event_id = None return 200, {"event_id": event_id, "stream_id": stream_id} +class ReplicationLocallyRejectInviteRestServlet(ReplicationEndpoint): + """Rejects the invite for the user and room locally. + + Request format: + + POST /_synapse/replication/locally_reject_invite/:room_id/:user_id + + {} + """ + + NAME = "locally_reject_invite" + PATH_ARGS = ("room_id", "user_id") + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.member_handler = hs.get_room_member_handler() + + @staticmethod + def _serialize_payload(room_id, user_id): + return {} + + async def _handle_request(self, request, room_id, user_id): + logger.info("locally_reject_invite: %s out of room: %s", user_id, room_id) + + stream_id = await self.member_handler.locally_reject_invite(user_id, room_id) + + return 200, {"stream_id": stream_id} + + class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): """Notifies that a user has joined or left the room @@ -208,3 +245,4 @@ def register_servlets(hs, http_server): ReplicationRemoteJoinRestServlet(hs).register(http_server) ReplicationRemoteRejectInviteRestServlet(hs).register(http_server) ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server) + ReplicationLocallyRejectInviteRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py new file mode 100644 index 0000000000..ea1b33331b --- /dev/null +++ b/synapse/replication/http/presence.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING + +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint +from synapse.types import UserID + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class ReplicationBumpPresenceActiveTime(ReplicationEndpoint): + """We've seen the user do something that indicates they're interacting + with the app. + + The POST looks like: + + POST /_synapse/replication/bump_presence_active_time/ + + 200 OK + + {} + """ + + NAME = "bump_presence_active_time" + PATH_ARGS = ("user_id",) + METHOD = "POST" + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._presence_handler = hs.get_presence_handler() + + @staticmethod + def _serialize_payload(user_id): + return {} + + async def _handle_request(self, request, user_id): + await self._presence_handler.bump_presence_active_time( + UserID.from_string(user_id) + ) + + return ( + 200, + {}, + ) + + +class ReplicationPresenceSetState(ReplicationEndpoint): + """Set the presence state for a user. + + The POST looks like: + + POST /_synapse/replication/presence_set_state/ + + { + "state": { ... }, + "ignore_status_msg": false, + } + + 200 OK + + {} + """ + + NAME = "presence_set_state" + PATH_ARGS = ("user_id",) + METHOD = "POST" + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._presence_handler = hs.get_presence_handler() + + @staticmethod + def _serialize_payload(user_id, state, ignore_status_msg=False): + return { + "state": state, + "ignore_status_msg": ignore_status_msg, + } + + async def _handle_request(self, request, user_id): + content = parse_json_object_from_request(request) + + await self._presence_handler.set_state( + UserID.from_string(user_id), content["state"], content["ignore_status_msg"] + ) + + return ( + 200, + {}, + ) + + +def register_servlets(hs, http_server): + ReplicationBumpPresenceActiveTime(hs).register(http_server) + ReplicationPresenceSetState(hs).register(http_server) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index acfa66a7a8..03300e5336 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -38,7 +38,9 @@ from synapse.replication.tcp.commands import ( from synapse.replication.tcp.protocol import AbstractConnection from synapse.replication.tcp.streams import ( STREAMS_MAP, + BackfillStream, CachesStream, + EventsStream, FederationStream, Stream, ) @@ -87,6 +89,14 @@ class ReplicationCommandHandler: self._streams_to_replicate.append(stream) continue + if isinstance(stream, (EventsStream, BackfillStream)): + # Only add EventStream and BackfillStream as a source on the + # instance in charge of event persistence. + if hs.config.worker.writers.events == hs.get_instance_name(): + self._streams_to_replicate.append(stream) + + continue + # Only add any other streams if we're on master. if hs.config.worker_app is not None: continue diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 0a13e1ed34..8173baef8f 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -100,7 +100,9 @@ class ShutdownRoomRestServlet(RestServlet): # we try and auto join below. # # TODO: Currently the events stream is written to from master - await self._replication.wait_for_stream_position("master", "events", stream_id) + await self._replication.wait_for_stream_position( + self.hs.config.worker.writers.events, "events", stream_id + ) users = await self.state.get_current_users_in_room(room_id) kicked_users = [] @@ -113,7 +115,7 @@ class ShutdownRoomRestServlet(RestServlet): try: target_requester = create_requester(user_id) - await self.room_member_handler.update_membership( + _, stream_id = await self.room_member_handler.update_membership( requester=target_requester, target=target_requester.user, room_id=room_id, @@ -123,6 +125,11 @@ class ShutdownRoomRestServlet(RestServlet): require_consent=False, ) + # Wait for leave to come in over replication before trying to forget. + await self._replication.wait_for_stream_position( + self.hs.config.worker.writers.events, "events", stream_id + ) + await self.room_member_handler.forget(target_requester.user, room_id) await self.room_member_handler.update_membership( diff --git a/synapse/storage/data_stores/__init__.py b/synapse/storage/data_stores/__init__.py index 791961b296..599ee470d4 100644 --- a/synapse/storage/data_stores/__init__.py +++ b/synapse/storage/data_stores/__init__.py @@ -66,9 +66,9 @@ class DataStores(object): self.main = main_store_class(database, db_conn, hs) - # If we're on a process that can persist events (currently - # master), also instantiate a `PersistEventsStore` - if hs.config.worker.worker_app is None: + # If we're on a process that can persist events also + # instantiate a `PersistEventsStore` + if hs.config.worker.writers.events == hs.get_instance_name(): self.persist_events = PersistEventsStore( hs, database, self.main ) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 0e8378714a..417ac8dc7c 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -689,6 +689,25 @@ class DeviceWorkerStore(SQLBaseStore): desc="make_remote_user_device_cache_as_stale", ) + def mark_remote_user_device_list_as_unsubscribed(self, user_id): + """Mark that we no longer track device lists for remote user. + """ + + def _mark_remote_user_device_list_as_unsubscribed_txn(txn): + self.db.simple_delete_txn( + txn, + table="device_lists_remote_extremeties", + keyvalues={"user_id": user_id}, + ) + self._invalidate_cache_and_stream( + txn, self.get_device_list_last_stream_id_for_remote, (user_id,) + ) + + return self.db.runInteraction( + "mark_remote_user_device_list_as_unsubscribed", + _mark_remote_user_device_list_as_unsubscribed_txn, + ) + class DeviceBackgroundUpdateStore(SQLBaseStore): def __init__(self, database: Database, db_conn, hs): @@ -969,17 +988,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): desc="update_device", ) - @defer.inlineCallbacks - def mark_remote_user_device_list_as_unsubscribed(self, user_id): - """Mark that we no longer track device lists for remote user. - """ - yield self.db.simple_delete( - table="device_lists_remote_extremeties", - keyvalues={"user_id": user_id}, - desc="mark_remote_user_device_list_as_unsubscribed", - ) - self.get_device_list_last_stream_id_for_remote.invalidate((user_id,)) - def update_remote_device_list_cache_entry( self, user_id, device_id, content, stream_id ): diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index a97f8b3934..a6572571b4 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -138,10 +138,10 @@ class PersistEventsStore: self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator - # This should only exist on master for now + # This should only exist on instances that are configured to write assert ( - hs.config.worker.worker_app is None - ), "Can only instantiate PersistEventsStore on master" + hs.config.worker.writers.events == hs.get_instance_name() + ), "Can only instantiate EventsStore on master" @_retry_on_integrity_error @defer.inlineCallbacks @@ -1590,3 +1590,31 @@ class PersistEventsStore: if not ev.internal_metadata.is_outlier() ], ) + + async def locally_reject_invite(self, user_id: str, room_id: str) -> int: + """Mark the invite has having been rejected even though we failed to + create a leave event for it. + """ + + sql = ( + "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" + " room_id = ? AND invitee = ? AND locally_rejected is NULL" + " AND replaced_by is NULL" + ) + + def f(txn, stream_ordering): + txn.execute(sql, (stream_ordering, True, room_id, user_id)) + + # We also clear this entry from `local_current_membership`. + # Ideally we'd point to a leave event, but we don't have one, so + # nevermind. + self.db.simple_delete_txn( + txn, + table="local_current_membership", + keyvalues={"room_id": room_id, "user_id": user_id}, + ) + + with self._stream_id_gen.get_next() as stream_ordering: + await self.db.runInteraction("locally_reject_invite", f, stream_ordering) + + return stream_ordering diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index b880a71782..213d69100a 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -76,7 +76,7 @@ class EventsWorkerStore(SQLBaseStore): def __init__(self, database: Database, db_conn, hs): super(EventsWorkerStore, self).__init__(database, db_conn, hs) - if hs.config.worker_app is None: + if hs.config.worker.writers.events == hs.get_instance_name(): # We are the process in charge of generating stream ids for events, # so instantiate ID generators based on the database self._stream_id_gen = StreamIdGenerator( diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py index 7c5ca81ae0..137ebac833 100644 --- a/synapse/storage/data_stores/main/roommember.py +++ b/synapse/storage/data_stores/main/roommember.py @@ -1046,31 +1046,6 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): def __init__(self, database: Database, db_conn, hs): super(RoomMemberStore, self).__init__(database, db_conn, hs) - @defer.inlineCallbacks - def locally_reject_invite(self, user_id, room_id): - sql = ( - "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" - " room_id = ? AND invitee = ? AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - def f(txn, stream_ordering): - txn.execute(sql, (stream_ordering, True, room_id, user_id)) - - # We also clear this entry from `local_current_membership`. - # Ideally we'd point to a leave event, but we don't have one, so - # nevermind. - self.db.simple_delete_txn( - txn, - table="local_current_membership", - keyvalues={"room_id": room_id, "user_id": user_id}, - ) - - with self._stream_id_gen.get_next() as stream_ordering: - yield self.db.runInteraction("locally_reject_invite", f, stream_ordering) - - return stream_ordering - def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 12e1ffb9a2..f159400a87 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -786,3 +786,9 @@ class EventsPersistenceStorage(object): for user_id in left_users: await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id) + + async def locally_reject_invite(self, user_id: str, room_id: str) -> int: + """Mark the invite has having been rejected even though we failed to + create a leave event for it. + """ + return await self.persist_events_store.locally_reject_invite(user_id, room_id)