From 8cb8df55e9bb5de27d9e6570441560eb81db36df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Jul 2018 15:22:02 +0100 Subject: [PATCH 01/29] Split MessageHandler into read only and writers This will let us call the read only parts from workers, and so be able to move some APIs off of master, e.g. the `/state` API. --- synapse/handlers/__init__.py | 2 - synapse/handlers/message.py | 281 +++++++++++++++++--------------- synapse/rest/client/v1/admin.py | 8 +- synapse/rest/client/v1/room.py | 20 +-- synapse/server.py | 14 +- 5 files changed, 176 insertions(+), 149 deletions(-) diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 4b9923d8c0..0bad3e0a2e 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -17,7 +17,6 @@ from .admin import AdminHandler from .directory import DirectoryHandler from .federation import FederationHandler from .identity import IdentityHandler -from .message import MessageHandler from .register import RegistrationHandler from .room import RoomContextHandler from .search import SearchHandler @@ -44,7 +43,6 @@ class Handlers(object): def __init__(self, hs): self.registration_handler = RegistrationHandler(hs) - self.message_handler = MessageHandler(hs) self.federation_handler = FederationHandler(hs) self.directory_handler = DirectoryHandler(hs) self.admin_handler = AdminHandler(hs) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a39b852ceb..3c6f9860d5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -75,12 +75,159 @@ class PurgeStatus(object): } -class MessageHandler(BaseHandler): +class MessageHandler(object): + """Contains some read only APIs to get state about a room + """ def __init__(self, hs): - super(MessageHandler, self).__init__(hs) - self.hs = hs + self.auth = hs.get_auth() + self.clock = hs.get_clock() self.state = hs.get_state_handler() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_room_data(self, user_id=None, room_id=None, + event_type=None, state_key="", is_guest=False): + """ Get data from a room. + + Args: + event : The room path event + Returns: + The path data content. + Raises: + SynapseError if something went wrong. + """ + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + + if membership == Membership.JOIN: + data = yield self.state.get_current_state( + room_id, event_type, state_key + ) + elif membership == Membership.LEAVE: + key = (event_type, state_key) + room_state = yield self.store.get_state_for_events( + [membership_event_id], [key] + ) + data = room_state[membership_event_id].get(key) + + defer.returnValue(data) + + @defer.inlineCallbacks + def _check_in_room_or_world_readable(self, room_id, user_id): + try: + # check_user_was_in_room will return the most recent membership + # event for the user if: + # * The user is a non-guest user, and was ever in the room + # * The user is a guest user, and has joined the room + # else it will throw. 
+ member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + defer.returnValue((member_event.membership, member_event.event_id)) + return + except AuthError: + visibility = yield self.state.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if ( + visibility and + visibility.content["history_visibility"] == "world_readable" + ): + defer.returnValue((Membership.JOIN, None)) + return + raise AuthError( + 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN + ) + + @defer.inlineCallbacks + def get_state_events(self, user_id, room_id, is_guest=False): + """Retrieve all state events for a given room. If the user is + joined to the room then return the current state. If the user has + left the room return the state events from when they left. + + Args: + user_id(str): The user requesting state events. + room_id(str): The room ID to get all state events from. + Returns: + A list of dicts representing state events. [{}, {}, {}] + """ + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + + if membership == Membership.JOIN: + room_state = yield self.state.get_current_state(room_id) + elif membership == Membership.LEAVE: + room_state = yield self.store.get_state_for_events( + [membership_event_id], None + ) + room_state = room_state[membership_event_id] + + now = self.clock.time_msec() + defer.returnValue( + [serialize_event(c, now) for c in room_state.values()] + ) + + @defer.inlineCallbacks + def get_joined_members(self, requester, room_id): + """Get all the joined members in the room and their profile information. + + If the user has left the room return the state events from when they left. + + Args: + requester(Requester): The user requesting state events. + room_id(str): The room ID to get all state events from. + Returns: + A dict of user_id to profile info + """ + user_id = requester.user.to_string() + if not requester.app_service: + # We check AS auth after fetching the room membership, as it + # requires us to pull out all joined members anyway. + membership, _ = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + if membership != Membership.JOIN: + raise NotImplementedError( + "Getting joined members after leaving is not implemented" + ) + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + # If this is an AS, double check that they are allowed to see the members. + # This can either be because the AS user is in the room or because there + # is a user in the room that the AS is "interested in" + if requester.app_service and user_id not in users_with_profile: + for uid in users_with_profile: + if requester.app_service.is_interested_in_user(uid): + break + else: + # Loop fell through, AS has no interested users in room + raise AuthError(403, "Appservice not in room") + + defer.returnValue({ + user_id: { + "avatar_url": profile.avatar_url, + "display_name": profile.display_name, + } + for user_id, profile in iteritems(users_with_profile) + }) + + +class PaginationHandler(MessageHandler): + """Handles pagination and purge history requests. + + These are in the same handler due to the fact we need to block clients + paginating during a purge. 
+ + This subclasses MessageHandler to get at _check_in_room_or_world_readable + """ + + def __init__(self, hs): + super(PaginationHandler, self).__init__(hs) + + self.hs = hs + self.store = hs.get_datastore() self.clock = hs.get_clock() self.pagination_lock = ReadWriteLock() @@ -274,134 +421,6 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) - @defer.inlineCallbacks - def get_room_data(self, user_id=None, room_id=None, - event_type=None, state_key="", is_guest=False): - """ Get data from a room. - - Args: - event : The room path event - Returns: - The path data content. - Raises: - SynapseError if something went wrong. - """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if membership == Membership.JOIN: - data = yield self.state_handler.get_current_state( - room_id, event_type, state_key - ) - elif membership == Membership.LEAVE: - key = (event_type, state_key) - room_state = yield self.store.get_state_for_events( - [membership_event_id], [key] - ) - data = room_state[membership_event_id].get(key) - - defer.returnValue(data) - - @defer.inlineCallbacks - def _check_in_room_or_world_readable(self, room_id, user_id): - try: - # check_user_was_in_room will return the most recent membership - # event for the user if: - # * The user is a non-guest user, and was ever in the room - # * The user is a guest user, and has joined the room - # else it will throw. - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - defer.returnValue((member_event.membership, member_event.event_id)) - return - except AuthError: - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if ( - visibility and - visibility.content["history_visibility"] == "world_readable" - ): - defer.returnValue((Membership.JOIN, None)) - return - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) - - @defer.inlineCallbacks - def get_state_events(self, user_id, room_id, is_guest=False): - """Retrieve all state events for a given room. If the user is - joined to the room then return the current state. If the user has - left the room return the state events from when they left. - - Args: - user_id(str): The user requesting state events. - room_id(str): The room ID to get all state events from. - Returns: - A list of dicts representing state events. [{}, {}, {}] - """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if membership == Membership.JOIN: - room_state = yield self.state_handler.get_current_state(room_id) - elif membership == Membership.LEAVE: - room_state = yield self.store.get_state_for_events( - [membership_event_id], None - ) - room_state = room_state[membership_event_id] - - now = self.clock.time_msec() - defer.returnValue( - [serialize_event(c, now) for c in room_state.values()] - ) - - @defer.inlineCallbacks - def get_joined_members(self, requester, room_id): - """Get all the joined members in the room and their profile information. - - If the user has left the room return the state events from when they left. - - Args: - requester(Requester): The user requesting state events. - room_id(str): The room ID to get all state events from. - Returns: - A dict of user_id to profile info - """ - user_id = requester.user.to_string() - if not requester.app_service: - # We check AS auth after fetching the room membership, as it - # requires us to pull out all joined members anyway. 
- membership, _ = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - if membership != Membership.JOIN: - raise NotImplementedError( - "Getting joined members after leaving is not implemented" - ) - - users_with_profile = yield self.state.get_current_user_in_room(room_id) - - # If this is an AS, double check that they are allowed to see the members. - # This can either be because the AS user is in the room or because there - # is a user in the room that the AS is "interested in" - if requester.app_service and user_id not in users_with_profile: - for uid in users_with_profile: - if requester.app_service.is_interested_in_user(uid): - break - else: - # Loop fell through, AS has no interested users in room - raise AuthError(403, "Appservice not in room") - - defer.returnValue({ - user_id: { - "avatar_url": profile.avatar_url, - "display_name": profile.display_name, - } - for user_id, profile in iteritems(users_with_profile) - }) - class EventCreationHandler(object): def __init__(self, hs): diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 2dc50e582b..13fd63a5b2 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -123,7 +123,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): hs (synapse.server.HomeServer) """ super(PurgeHistoryRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.pagination_handler = hs.get_pagination_handler() self.store = hs.get_datastore() @defer.inlineCallbacks @@ -198,7 +198,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): errcode=Codes.BAD_JSON, ) - purge_id = yield self.handlers.message_handler.start_purge_history( + purge_id = yield self.pagination_handler.start_purge_history( room_id, token, delete_local_events=delete_local_events, ) @@ -220,7 +220,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet): hs (synapse.server.HomeServer) """ super(PurgeHistoryStatusRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.pagination_handler = hs.get_pagination_handler() @defer.inlineCallbacks def on_GET(self, request, purge_id): @@ -230,7 +230,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet): if not is_admin: raise AuthError(403, "You are not a server admin") - purge_status = self.handlers.message_handler.get_purge_status(purge_id) + purge_status = self.pagination_handler.get_purge_status(purge_id) if purge_status is None: raise NotFoundError("purge id '%s' not found" % purge_id) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 3d62447854..8b6be9da96 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -90,6 +90,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): self.handlers = hs.get_handlers() self.event_creation_hander = hs.get_event_creation_handler() self.room_member_handler = hs.get_room_member_handler() + self.message_handler = hs.get_message_handler() def register(self, http_server): # /room/$roomid/state/$eventtype @@ -124,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): format = parse_string(request, "format", default="content", allowed_values=["content", "event"]) - msg_handler = self.handlers.message_handler + msg_handler = self.message_handler data = yield msg_handler.get_room_data( user_id=requester.user.to_string(), room_id=room_id, @@ -377,14 +378,13 @@ class RoomMemberListRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomMemberListRestServlet, self).__init__(hs) - self.handlers = 
hs.get_handlers() + self.message_handler = hs.get_message_handler() @defer.inlineCallbacks def on_GET(self, request, room_id): # TODO support Pagination stream API (limit/tokens) requester = yield self.auth.get_user_by_req(request) - handler = self.handlers.message_handler - events = yield handler.get_state_events( + events = yield self.message_handler.get_state_events( room_id=room_id, user_id=requester.user.to_string(), ) @@ -406,7 +406,7 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet): def __init__(self, hs): super(JoinedRoomMemberListRestServlet, self).__init__(hs) - self.message_handler = hs.get_handlers().message_handler + self.message_handler = hs.get_message_handler() @defer.inlineCallbacks def on_GET(self, request, room_id): @@ -427,7 +427,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomMessageListRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.pagination_handler = hs.get_pagination_handler() @defer.inlineCallbacks def on_GET(self, request, room_id): @@ -442,8 +442,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet): event_filter = Filter(json.loads(filter_json)) else: event_filter = None - handler = self.handlers.message_handler - msgs = yield handler.get_messages( + msgs = yield self.pagination_handler.get_messages( room_id=room_id, requester=requester, pagin_config=pagination_config, @@ -460,14 +459,13 @@ class RoomStateRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomStateRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.message_handler = hs.get_message_handler() @defer.inlineCallbacks def on_GET(self, request, room_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) - handler = self.handlers.message_handler # Get all the current state for this room - events = yield handler.get_state_events( + events = yield self.message_handler.get_state_events( room_id=room_id, user_id=requester.user.to_string(), is_guest=requester.is_guest, diff --git a/synapse/server.py b/synapse/server.py index 92bea96c5c..b93bd6d7d9 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -52,7 +52,11 @@ from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.groups_local import GroupsLocalHandler from synapse.handlers.initial_sync import InitialSyncHandler -from synapse.handlers.message import EventCreationHandler +from synapse.handlers.message import ( + EventCreationHandler, + MessageHandler, + PaginationHandler, +) from synapse.handlers.presence import PresenceHandler from synapse.handlers.profile import ProfileHandler from synapse.handlers.read_marker import ReadMarkerHandler @@ -163,6 +167,8 @@ class HomeServer(object): 'federation_registry', 'server_notices_manager', 'server_notices_sender', + 'message_handler', + 'pagination_handler', ] def __init__(self, hostname, reactor=None, **kwargs): @@ -426,6 +432,12 @@ class HomeServer(object): return WorkerServerNoticesSender(self) return ServerNoticesSender(self) + def build_message_handler(self): + return MessageHandler(self) + + def build_pagination_handler(self): + return PaginationHandler(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) From bacdf0cbf9fdbf9bbab2420b86308830ac4e4592 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Jul 2018 15:29:45 +0100 Subject: [PATCH 02/29] Move RoomContextHandler out of 
Handlers This is in preparation for moving GET /context/ to a worker --- synapse/handlers/__init__.py | 2 -- synapse/handlers/room.py | 6 +++++- synapse/rest/client/v1/room.py | 4 ++-- synapse/server.py | 6 +++++- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 0bad3e0a2e..413425fed1 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -18,7 +18,6 @@ from .directory import DirectoryHandler from .federation import FederationHandler from .identity import IdentityHandler from .register import RegistrationHandler -from .room import RoomContextHandler from .search import SearchHandler @@ -48,4 +47,3 @@ class Handlers(object): self.admin_handler = AdminHandler(hs) self.identity_handler = IdentityHandler(hs) self.search_handler = SearchHandler(hs) - self.room_context_handler = RoomContextHandler(hs) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f67512078b..6150b7e226 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -395,7 +395,11 @@ class RoomCreationHandler(BaseHandler): ) -class RoomContextHandler(BaseHandler): +class RoomContextHandler(object): + def __init__(self, hs): + self.hs = hs + self.store = hs.get_datastore() + @defer.inlineCallbacks def get_event_context(self, user, room_id, event_id, limit): """Retrieves events, pagination tokens and state around a given event diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 8b6be9da96..2c9459534e 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -523,7 +523,7 @@ class RoomEventContextServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomEventContextServlet, self).__init__(hs) self.clock = hs.get_clock() - self.handlers = hs.get_handlers() + self.room_context_handler = hs.get_room_context_handler() @defer.inlineCallbacks def on_GET(self, request, room_id, event_id): @@ -531,7 +531,7 @@ class RoomEventContextServlet(ClientV1RestServlet): limit = parse_integer(request, "limit", default=10) - results = yield self.handlers.room_context_handler.get_event_context( + results = yield self.room_context_handler.get_event_context( requester.user, room_id, event_id, diff --git a/synapse/server.py b/synapse/server.py index b93bd6d7d9..a24ea158df 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -61,7 +61,7 @@ from synapse.handlers.presence import PresenceHandler from synapse.handlers.profile import ProfileHandler from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.receipts import ReceiptsHandler -from synapse.handlers.room import RoomCreationHandler +from synapse.handlers.room import RoomContextHandler, RoomCreationHandler from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_member import RoomMemberMasterHandler from synapse.handlers.room_member_worker import RoomMemberWorkerHandler @@ -169,6 +169,7 @@ class HomeServer(object): 'server_notices_sender', 'message_handler', 'pagination_handler', + 'room_context_handler', ] def __init__(self, hostname, reactor=None, **kwargs): @@ -438,6 +439,9 @@ class HomeServer(object): def build_pagination_handler(self): return PaginationHandler(self) + def build_room_context_handler(self): + return RoomContextHandler(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) From 9b596177aeb12211e86cce27aa409bc596909ed0 Mon Sep 17 00:00:00 2001 From: Erik Johnston 
Date: Wed, 18 Jul 2018 15:30:43 +0100 Subject: [PATCH 03/29] Add some room read only APIs to client_reader --- synapse/app/client_reader.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index b0ea26dcb4..398bb36602 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -40,7 +40,13 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.rest.client.v1.room import PublicRoomListRestServlet +from synapse.rest.client.v1.room import ( + JoinedRoomMemberListRestServlet, + PublicRoomListRestServlet, + RoomEventContextServlet, + RoomMemberListRestServlet, + RoomStateRestServlet, +) from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree @@ -82,7 +88,13 @@ class ClientReaderServer(HomeServer): resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) + PublicRoomListRestServlet(self).register(resource) + RoomMemberListRestServlet(self).register(resource) + JoinedRoomMemberListRestServlet(self).register(resource) + RoomStateRestServlet(self).register(resource) + RoomEventContextServlet(self).register(resource) + resources.update({ "/_matrix/client/r0": resource, "/_matrix/client/unstable": resource, From 3a993a660d0d05578c189591980a9383812d7627 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Jul 2018 15:39:49 +0100 Subject: [PATCH 04/29] Newsfile --- changelog.d/3555.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3555.feature diff --git a/changelog.d/3555.feature b/changelog.d/3555.feature new file mode 100644 index 0000000000..ea4a85e0ae --- /dev/null +++ b/changelog.d/3555.feature @@ -0,0 +1 @@ +Add support for client_reader to handle more APIs From 0ecf68aedc09f4037208b613b692a8a98c78b3ea Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jul 2018 15:30:59 +0100 Subject: [PATCH 05/29] Move check_in_room_or_world_readable to Auth --- synapse/api/auth.py | 34 +++++++++++++++++++++++++++++++ synapse/handlers/message.py | 40 ++++++------------------------------- 2 files changed, 40 insertions(+), 34 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index bc629832d9..bf9efb170a 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -737,3 +737,37 @@ class Auth(object): ) return query_params[0] + + @defer.inlineCallbacks + def check_in_room_or_world_readable(self, room_id, user_id): + """Checks that the user is or was in the room or the room is world + readable. If it isn't then an exception is raised. + + Returns: + Deferred[tuple[str, str|None]]: Resolves to the current membership of + the user in the room and the membership event ID of the user. If + the user is not in the room and never has been, then + `(Membership.JOIN, None)` is returned. + """ + + try: + # check_user_was_in_room will return the most recent membership + # event for the user if: + # * The user is a non-guest user, and was ever in the room + # * The user is a guest user, and has joined the room + # else it will throw. 
+ member_event = yield self.check_user_was_in_room(room_id, user_id) + defer.returnValue((member_event.membership, member_event.event_id)) + except AuthError: + visibility = yield self.state.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if ( + visibility and + visibility.content["history_visibility"] == "world_readable" + ): + defer.returnValue((Membership.JOIN, None)) + return + raise AuthError( + 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN + ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3c6f9860d5..c1489cd066 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -97,7 +97,7 @@ class MessageHandler(object): Raises: SynapseError if something went wrong. """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( + membership, membership_event_id = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) @@ -114,31 +114,6 @@ class MessageHandler(object): defer.returnValue(data) - @defer.inlineCallbacks - def _check_in_room_or_world_readable(self, room_id, user_id): - try: - # check_user_was_in_room will return the most recent membership - # event for the user if: - # * The user is a non-guest user, and was ever in the room - # * The user is a guest user, and has joined the room - # else it will throw. - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - defer.returnValue((member_event.membership, member_event.event_id)) - return - except AuthError: - visibility = yield self.state.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if ( - visibility and - visibility.content["history_visibility"] == "world_readable" - ): - defer.returnValue((Membership.JOIN, None)) - return - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) - @defer.inlineCallbacks def get_state_events(self, user_id, room_id, is_guest=False): """Retrieve all state events for a given room. If the user is @@ -151,7 +126,7 @@ class MessageHandler(object): Returns: A list of dicts representing state events. [{}, {}, {}] """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( + membership, membership_event_id = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) @@ -184,7 +159,7 @@ class MessageHandler(object): if not requester.app_service: # We check AS auth after fetching the room membership, as it # requires us to pull out all joined members anyway. - membership, _ = yield self._check_in_room_or_world_readable( + membership, _ = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) if membership != Membership.JOIN: @@ -214,19 +189,16 @@ class MessageHandler(object): }) -class PaginationHandler(MessageHandler): +class PaginationHandler(object): """Handles pagination and purge history requests. These are in the same handler due to the fact we need to block clients paginating during a purge. 
- - This subclasses MessageHandler to get at _check_in_room_or_world_readable """ def __init__(self, hs): - super(PaginationHandler, self).__init__(hs) - self.hs = hs + self.auth = hs.get_auth() self.store = hs.get_datastore() self.clock = hs.get_clock() @@ -349,7 +321,7 @@ class PaginationHandler(MessageHandler): source_config = pagin_config.get_source_config("room") with (yield self.pagination_lock.read(room_id)): - membership, member_event_id = yield self._check_in_room_or_world_readable( + membership, member_event_id = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) From 5c88bb722f57af1c77f34d77152689425ab95eba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jul 2018 15:32:23 +0100 Subject: [PATCH 06/29] Move PaginationHandler to its own file --- synapse/handlers/message.py | 242 +----------------------------- synapse/handlers/pagination.py | 265 +++++++++++++++++++++++++++++++++ synapse/server.py | 7 +- 3 files changed, 269 insertions(+), 245 deletions(-) create mode 100644 synapse/handlers/pagination.py diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c1489cd066..ba3c4642bc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer from twisted.internet.defer import succeed -from twisted.python.failure import Failure from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError @@ -32,49 +31,17 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import send_event_to_master -from synapse.types import RoomAlias, RoomStreamToken, UserID -from synapse.util.async import Limiter, ReadWriteLock +from synapse.types import RoomAlias, UserID +from synapse.util.async import Limiter from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func -from synapse.util.stringutils import random_string -from synapse.visibility import filter_events_for_client from ._base import BaseHandler logger = logging.getLogger(__name__) -class PurgeStatus(object): - """Object tracking the status of a purge request - - This class contains information on the progress of a purge request, for - return by get_purge_status. - - Attributes: - status (int): Tracks whether this request has completed. One of - STATUS_{ACTIVE,COMPLETE,FAILED} - """ - - STATUS_ACTIVE = 0 - STATUS_COMPLETE = 1 - STATUS_FAILED = 2 - - STATUS_TEXT = { - STATUS_ACTIVE: "active", - STATUS_COMPLETE: "complete", - STATUS_FAILED: "failed", - } - - def __init__(self): - self.status = PurgeStatus.STATUS_ACTIVE - - def asdict(self): - return { - "status": PurgeStatus.STATUS_TEXT[self.status] - } - - class MessageHandler(object): """Contains some read only APIs to get state about a room """ @@ -189,211 +156,6 @@ class MessageHandler(object): }) -class PaginationHandler(object): - """Handles pagination and purge history requests. - - These are in the same handler due to the fact we need to block clients - paginating during a purge. 
- """ - - def __init__(self, hs): - self.hs = hs - self.auth = hs.get_auth() - self.store = hs.get_datastore() - self.clock = hs.get_clock() - - self.pagination_lock = ReadWriteLock() - self._purges_in_progress_by_room = set() - # map from purge id to PurgeStatus - self._purges_by_id = {} - - def start_purge_history(self, room_id, token, - delete_local_events=False): - """Start off a history purge on a room. - - Args: - room_id (str): The room to purge from - - token (str): topological token to delete events before - delete_local_events (bool): True to delete local events as well as - remote ones - - Returns: - str: unique ID for this purge transaction. - """ - if room_id in self._purges_in_progress_by_room: - raise SynapseError( - 400, - "History purge already in progress for %s" % (room_id, ), - ) - - purge_id = random_string(16) - - # we log the purge_id here so that it can be tied back to the - # request id in the log lines. - logger.info("[purge] starting purge_id %s", purge_id) - - self._purges_by_id[purge_id] = PurgeStatus() - run_in_background( - self._purge_history, - purge_id, room_id, token, delete_local_events, - ) - return purge_id - - @defer.inlineCallbacks - def _purge_history(self, purge_id, room_id, token, - delete_local_events): - """Carry out a history purge on a room. - - Args: - purge_id (str): The id for this purge - room_id (str): The room to purge from - token (str): topological token to delete events before - delete_local_events (bool): True to delete local events as well as - remote ones - - Returns: - Deferred - """ - self._purges_in_progress_by_room.add(room_id) - try: - with (yield self.pagination_lock.write(room_id)): - yield self.store.purge_history( - room_id, token, delete_local_events, - ) - logger.info("[purge] complete") - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE - except Exception: - logger.error("[purge] failed: %s", Failure().getTraceback().rstrip()) - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED - finally: - self._purges_in_progress_by_room.discard(room_id) - - # remove the purge from the list 24 hours after it completes - def clear_purge(): - del self._purges_by_id[purge_id] - self.hs.get_reactor().callLater(24 * 3600, clear_purge) - - def get_purge_status(self, purge_id): - """Get the current status of an active purge - - Args: - purge_id (str): purge_id returned by start_purge_history - - Returns: - PurgeStatus|None - """ - return self._purges_by_id.get(purge_id) - - @defer.inlineCallbacks - def get_messages(self, requester, room_id=None, pagin_config=None, - as_client_event=True, event_filter=None): - """Get messages in a room. - - Args: - requester (Requester): The user requesting messages. - room_id (str): The room they want messages from. - pagin_config (synapse.api.streams.PaginationConfig): The pagination - config rules to apply, if any. - as_client_event (bool): True to get events in client-server format. 
- event_filter (Filter): Filter to apply to results or None - Returns: - dict: Pagination API results - """ - user_id = requester.user.to_string() - - if pagin_config.from_token: - room_token = pagin_config.from_token.room_key - else: - pagin_config.from_token = ( - yield self.hs.get_event_sources().get_current_token_for_room( - room_id=room_id - ) - ) - room_token = pagin_config.from_token.room_key - - room_token = RoomStreamToken.parse(room_token) - - pagin_config.from_token = pagin_config.from_token.copy_and_replace( - "room_key", str(room_token) - ) - - source_config = pagin_config.get_source_config("room") - - with (yield self.pagination_lock.read(room_id)): - membership, member_event_id = yield self.auth.check_in_room_or_world_readable( - room_id, user_id - ) - - if source_config.direction == 'b': - # if we're going backwards, we might need to backfill. This - # requires that we have a topo token. - if room_token.topological: - max_topo = room_token.topological - else: - max_topo = yield self.store.get_max_topological_token( - room_id, room_token.stream - ) - - if membership == Membership.LEAVE: - # If they have left the room then clamp the token to be before - # they left the room, to save the effort of loading from the - # database. - leave_token = yield self.store.get_topological_token_for_event( - member_event_id - ) - leave_token = RoomStreamToken.parse(leave_token) - if leave_token.topological < max_topo: - source_config.from_key = str(leave_token) - - yield self.hs.get_handlers().federation_handler.maybe_backfill( - room_id, max_topo - ) - - events, next_key = yield self.store.paginate_room_events( - room_id=room_id, - from_key=source_config.from_key, - to_key=source_config.to_key, - direction=source_config.direction, - limit=source_config.limit, - event_filter=event_filter, - ) - - next_token = pagin_config.from_token.copy_and_replace( - "room_key", next_key - ) - - if not events: - defer.returnValue({ - "chunk": [], - "start": pagin_config.from_token.to_string(), - "end": next_token.to_string(), - }) - - if event_filter: - events = event_filter.filter(events) - - events = yield filter_events_for_client( - self.store, - user_id, - events, - is_peeking=(member_event_id is None), - ) - - time_now = self.clock.time_msec() - - chunk = { - "chunk": [ - serialize_event(e, time_now, as_client_event) - for e in events - ], - "start": pagin_config.from_token.to_string(), - "end": next_token.to_string(), - } - - defer.returnValue(chunk) - - class EventCreationHandler(object): def __init__(self, hs): self.hs = hs diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py new file mode 100644 index 0000000000..b2849783ed --- /dev/null +++ b/synapse/handlers/pagination.py @@ -0,0 +1,265 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 - 2016 OpenMarket Ltd +# Copyright 2017 - 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import logging + +from twisted.internet import defer +from twisted.python.failure import Failure + +from synapse.api.constants import Membership +from synapse.api.errors import SynapseError +from synapse.events.utils import serialize_event +from synapse.types import RoomStreamToken +from synapse.util.async import ReadWriteLock +from synapse.util.logcontext import run_in_background +from synapse.util.stringutils import random_string +from synapse.visibility import filter_events_for_client + +logger = logging.getLogger(__name__) + + +class PurgeStatus(object): + """Object tracking the status of a purge request + + This class contains information on the progress of a purge request, for + return by get_purge_status. + + Attributes: + status (int): Tracks whether this request has completed. One of + STATUS_{ACTIVE,COMPLETE,FAILED} + """ + + STATUS_ACTIVE = 0 + STATUS_COMPLETE = 1 + STATUS_FAILED = 2 + + STATUS_TEXT = { + STATUS_ACTIVE: "active", + STATUS_COMPLETE: "complete", + STATUS_FAILED: "failed", + } + + def __init__(self): + self.status = PurgeStatus.STATUS_ACTIVE + + def asdict(self): + return { + "status": PurgeStatus.STATUS_TEXT[self.status] + } + + +class PaginationHandler(object): + """Handles pagination and purge history requests. + + These are in the same handler due to the fact we need to block clients + paginating during a purge. + """ + + def __init__(self, hs): + self.hs = hs + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + self.pagination_lock = ReadWriteLock() + self._purges_in_progress_by_room = set() + # map from purge id to PurgeStatus + self._purges_by_id = {} + + def start_purge_history(self, room_id, token, + delete_local_events=False): + """Start off a history purge on a room. + + Args: + room_id (str): The room to purge from + + token (str): topological token to delete events before + delete_local_events (bool): True to delete local events as well as + remote ones + + Returns: + str: unique ID for this purge transaction. + """ + if room_id in self._purges_in_progress_by_room: + raise SynapseError( + 400, + "History purge already in progress for %s" % (room_id, ), + ) + + purge_id = random_string(16) + + # we log the purge_id here so that it can be tied back to the + # request id in the log lines. + logger.info("[purge] starting purge_id %s", purge_id) + + self._purges_by_id[purge_id] = PurgeStatus() + run_in_background( + self._purge_history, + purge_id, room_id, token, delete_local_events, + ) + return purge_id + + @defer.inlineCallbacks + def _purge_history(self, purge_id, room_id, token, + delete_local_events): + """Carry out a history purge on a room. 
+ + Args: + purge_id (str): The id for this purge + room_id (str): The room to purge from + token (str): topological token to delete events before + delete_local_events (bool): True to delete local events as well as + remote ones + + Returns: + Deferred + """ + self._purges_in_progress_by_room.add(room_id) + try: + with (yield self.pagination_lock.write(room_id)): + yield self.store.purge_history( + room_id, token, delete_local_events, + ) + logger.info("[purge] complete") + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE + except Exception: + logger.error("[purge] failed: %s", Failure().getTraceback().rstrip()) + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED + finally: + self._purges_in_progress_by_room.discard(room_id) + + # remove the purge from the list 24 hours after it completes + def clear_purge(): + del self._purges_by_id[purge_id] + self.hs.get_reactor().callLater(24 * 3600, clear_purge) + + def get_purge_status(self, purge_id): + """Get the current status of an active purge + + Args: + purge_id (str): purge_id returned by start_purge_history + + Returns: + PurgeStatus|None + """ + return self._purges_by_id.get(purge_id) + + @defer.inlineCallbacks + def get_messages(self, requester, room_id=None, pagin_config=None, + as_client_event=True, event_filter=None): + """Get messages in a room. + + Args: + requester (Requester): The user requesting messages. + room_id (str): The room they want messages from. + pagin_config (synapse.api.streams.PaginationConfig): The pagination + config rules to apply, if any. + as_client_event (bool): True to get events in client-server format. + event_filter (Filter): Filter to apply to results or None + Returns: + dict: Pagination API results + """ + user_id = requester.user.to_string() + + if pagin_config.from_token: + room_token = pagin_config.from_token.room_key + else: + pagin_config.from_token = ( + yield self.hs.get_event_sources().get_current_token_for_room( + room_id=room_id + ) + ) + room_token = pagin_config.from_token.room_key + + room_token = RoomStreamToken.parse(room_token) + + pagin_config.from_token = pagin_config.from_token.copy_and_replace( + "room_key", str(room_token) + ) + + source_config = pagin_config.get_source_config("room") + + with (yield self.pagination_lock.read(room_id)): + membership, member_event_id = yield self.auth.check_in_room_or_world_readable( + room_id, user_id + ) + + if source_config.direction == 'b': + # if we're going backwards, we might need to backfill. This + # requires that we have a topo token. + if room_token.topological: + max_topo = room_token.topological + else: + max_topo = yield self.store.get_max_topological_token( + room_id, room_token.stream + ) + + if membership == Membership.LEAVE: + # If they have left the room then clamp the token to be before + # they left the room, to save the effort of loading from the + # database. 
+ leave_token = yield self.store.get_topological_token_for_event( + member_event_id + ) + leave_token = RoomStreamToken.parse(leave_token) + if leave_token.topological < max_topo: + source_config.from_key = str(leave_token) + + yield self.hs.get_handlers().federation_handler.maybe_backfill( + room_id, max_topo + ) + + events, next_key = yield self.store.paginate_room_events( + room_id=room_id, + from_key=source_config.from_key, + to_key=source_config.to_key, + direction=source_config.direction, + limit=source_config.limit, + event_filter=event_filter, + ) + + next_token = pagin_config.from_token.copy_and_replace( + "room_key", next_key + ) + + if not events: + defer.returnValue({ + "chunk": [], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + }) + + if event_filter: + events = event_filter.filter(events) + + events = yield filter_events_for_client( + self.store, + user_id, + events, + is_peeking=(member_event_id is None), + ) + + time_now = self.clock.time_msec() + + chunk = { + "chunk": [ + serialize_event(e, time_now, as_client_event) + for e in events + ], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + } + + defer.returnValue(chunk) diff --git a/synapse/server.py b/synapse/server.py index a24ea158df..83eacccc29 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -52,11 +52,8 @@ from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.groups_local import GroupsLocalHandler from synapse.handlers.initial_sync import InitialSyncHandler -from synapse.handlers.message import ( - EventCreationHandler, - MessageHandler, - PaginationHandler, -) +from synapse.handlers.message import EventCreationHandler, MessageHandler +from synapse.handlers.pagination import PaginationHandler from synapse.handlers.presence import PresenceHandler from synapse.handlers.profile import ProfileHandler from synapse.handlers.read_marker import ReadMarkerHandler From 4fc52b10378b090fcf96747d8a5f2b261c2d3543 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 13:20:43 +0100 Subject: [PATCH 07/29] Update docs/workers.rst --- docs/workers.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/workers.rst b/docs/workers.rst index 1d521b9ec5..c5b37c3ded 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -206,6 +206,10 @@ Handles client API endpoints. 
It can handle REST endpoints matching the following regular expressions:: ^/_matrix/client/(api/v1|r0|unstable)/publicRooms$ + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$ + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$ + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$ + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$ ``synapse.app.user_dir`` ~~~~~~~~~~~~~~~~~~~~~~~~ From dae6dc1e776cc6198dc1f71575876fc16693c170 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 23 Jul 2018 17:13:34 +0100 Subject: [PATCH 08/29] Remove redundant checks on room forgottenness Fixes #3550 --- changelog.d/3350.misc | 1 + synapse/storage/push_rule.py | 13 ------------- synapse/storage/roommember.py | 15 --------------- synapse/visibility.py | 19 +------------------ 4 files changed, 2 insertions(+), 46 deletions(-) create mode 100644 changelog.d/3350.misc diff --git a/changelog.d/3350.misc b/changelog.d/3350.misc new file mode 100644 index 0000000000..3713cd6d63 --- /dev/null +++ b/changelog.d/3350.misc @@ -0,0 +1 @@ +Remove redundant checks on who_forgot_in_room \ No newline at end of file diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index be655d287b..d25b39ec02 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -21,7 +21,6 @@ from canonicaljson import json from twisted.internet import defer -from synapse.api.constants import EventTypes from synapse.push.baserules import list_with_base_rules from synapse.storage.appservice import ApplicationServiceWorkerStore from synapse.storage.pusher import PusherWorkerStore @@ -247,18 +246,6 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore, if uid in local_users_in_room: user_ids.add(uid) - forgotten = yield self.who_forgot_in_room( - event.room_id, on_invalidate=cache_context.invalidate, - ) - - for row in forgotten: - user_id = row["user_id"] - event_id = row["event_id"] - - mem_id = current_state_ids.get((EventTypes.Member, user_id), None) - if event_id == mem_id: - user_ids.discard(user_id) - rules_by_user = yield self.bulk_get_push_rules( user_ids, on_invalidate=cache_context.invalidate, ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 02a802bed9..9eb5b18144 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -458,18 +458,6 @@ class RoomMemberWorkerStore(EventsWorkerStore): def _get_joined_hosts_cache(self, room_id): return _JoinedHostsCache(self, room_id) - @cached() - def who_forgot_in_room(self, room_id): - return self._simple_select_list( - table="room_memberships", - retcols=("user_id", "event_id"), - keyvalues={ - "room_id": room_id, - "forgotten": 1, - }, - desc="who_forgot" - ) - class RoomMemberStore(RoomMemberWorkerStore): def __init__(self, db_conn, hs): @@ -578,9 +566,6 @@ class RoomMemberStore(RoomMemberWorkerStore): txn.execute(sql, (user_id, room_id)) txn.call_after(self.did_forget.invalidate, (user_id, room_id)) - self._invalidate_cache_and_stream( - txn, self.who_forgot_in_room, (room_id,) - ) return self.runInteraction("forget_membership", f) @cachedInlineCallbacks(num_args=2) diff --git a/synapse/visibility.py b/synapse/visibility.py index 9b97ea2b83..e0b2eccceb 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -23,7 +23,6 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.events.utils import prune_event from synapse.types import get_domain_from_id -from synapse.util.logcontext import 
make_deferred_yieldable, preserve_fn logger = logging.getLogger(__name__) @@ -75,19 +74,6 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, types=types, ) - forgotten = yield make_deferred_yieldable(defer.gatherResults([ - defer.maybeDeferred( - preserve_fn(store.who_forgot_in_room), - room_id, - ) - for room_id in frozenset(e.room_id for e in events) - ], consumeErrors=True)) - - # Set of membership event_ids that have been forgotten - event_id_forgotten = frozenset( - row["event_id"] for rows in forgotten for row in rows - ) - ignore_dict_content = yield store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id, ) @@ -176,10 +162,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if membership is None: membership_event = state.get((EventTypes.Member, user_id), None) if membership_event: - # XXX why do we do this? - # https://github.com/matrix-org/synapse/issues/3350 - if membership_event.event_id not in event_id_forgotten: - membership = membership_event.membership + membership = membership_event.membership # if the user was a member of the room at the time of the event, # they can see it. From 5c705f70c9489427a7985ea10ec60552965b9a1c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 23 Jul 2018 19:00:16 +0100 Subject: [PATCH 09/29] Fixes and optimisations for resolve_state_groups First of all, fix the logic which looks for identical input state groups so that we actually use them. This turned out to be most easily done by factoring the relevant code out to a separate function so that we could do an early return. Secondly, avoid building the whole `conflicted_state` dict (which was only ever used as a boolean flag). Thirdly, replace the construction of the `state` dict (which mapped from keys to events that set them), with an optimistic construction of the resolution result assuming there will be no conflicts. This should be no slower than building the old `state` dict, and: - in the conflicted case, we'll short-cut it, saving part of the work - in the unconflicted case, it saves rebuilding the resolution from the `state` dict. Finally, do a couple of s/values/itervalues/. --- synapse/state.py | 143 +++++++++++++++++++++++++++++------------------ 1 file changed, 89 insertions(+), 54 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index 32125c95df..033f55d967 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -471,69 +471,39 @@ class StateResolutionHandler(object): "Resolving state for %s with %d groups", room_id, len(state_groups_ids) ) - # build a map from state key to the event_ids which set that state. - # dict[(str, str), set[str]) - state = {} + # start by assuming we won't have any conflicted state, and build up the new + # state map by iterating through the state groups. If we discover a conflict, + # we give up and instead use `resolve_events_with_factory`. + # + # XXX: is this actually worthwhile, or should we just let + # resolve_events_with_factory do it? + new_state = {} + conflicted_state = False for st in itervalues(state_groups_ids): for key, e_id in iteritems(st): - state.setdefault(key, set()).add(e_id) - - # build a map from state key to the event_ids which set that state, - # including only those where there are state keys in conflict. 
- conflicted_state = { - k: list(v) - for k, v in iteritems(state) - if len(v) > 1 - } + if key in new_state: + conflicted_state = True + break + new_state[key] = e_id + if conflicted_state: + break if conflicted_state: logger.info("Resolving conflicted state for %r", room_id) with Measure(self.clock, "state._resolve_events"): new_state = yield resolve_events_with_factory( - list(state_groups_ids.values()), + list(itervalues(state_groups_ids)), event_map=event_map, state_map_factory=state_map_factory, ) - else: - new_state = { - key: e_ids.pop() for key, e_ids in iteritems(state) - } + + # if the new state matches any of the input state groups, we can + # use that state group again. Otherwise we will generate a state_id + # which will be used as a cache key for future resolutions, but + # not get persisted. with Measure(self.clock, "state.create_group_ids"): - # if the new state matches any of the input state groups, we can - # use that state group again. Otherwise we will generate a state_id - # which will be used as a cache key for future resolutions, but - # not get persisted. - state_group = None - new_state_event_ids = frozenset(itervalues(new_state)) - for sg, events in iteritems(state_groups_ids): - if new_state_event_ids == frozenset(e_id for e_id in events): - state_group = sg - break - - # TODO: We want to create a state group for this set of events, to - # increase cache hits, but we need to make sure that it doesn't - # end up as a prev_group without being added to the database - - prev_group = None - delta_ids = None - for old_group, old_ids in iteritems(state_groups_ids): - if not set(new_state) - set(old_ids): - n_delta_ids = { - k: v - for k, v in iteritems(new_state) - if old_ids.get(k) != v - } - if not delta_ids or len(n_delta_ids) < len(delta_ids): - prev_group = old_group - delta_ids = n_delta_ids - - cache = _StateCacheEntry( - state=new_state, - state_group=state_group, - prev_group=prev_group, - delta_ids=delta_ids, - ) + cache = _make_state_cache_entry(new_state, state_groups_ids) if self._state_cache is not None: self._state_cache[group_names] = cache @@ -541,6 +511,70 @@ class StateResolutionHandler(object): defer.returnValue(cache) +def _make_state_cache_entry( + new_state, + state_groups_ids, +): + """Given a resolved state, and a set of input state groups, pick one to base + a new state group on (if any), and return an appropriately-constructed + _StateCacheEntry. + + Args: + new_state (dict[(str, str), str]): resolved state map (mapping from + (type, state_key) to event_id) + + state_groups_ids (dict[int, dict[(str, str), str]]): + map from state group id to the state in that state group + (where 'state' is a map from state key to event id) + + Returns: + _StateCacheEntry + """ + # if the new state matches any of the input state groups, we can + # use that state group again. Otherwise we will generate a state_id + # which will be used as a cache key for future resolutions, but + # not get persisted. + + # first look for exact matches + new_state_event_ids = set(itervalues(new_state)) + for sg, state in iteritems(state_groups_ids): + if len(new_state_event_ids) != len(state): + continue + + old_state_event_ids = set(itervalues(state)) + if new_state_event_ids == old_state_event_ids: + # got an exact match. 
+ return _StateCacheEntry( + state=new_state, + state_group=sg, + ) + + # TODO: We want to create a state group for this set of events, to + # increase cache hits, but we need to make sure that it doesn't + # end up as a prev_group without being added to the database + + # failing that, look for the closest match. + prev_group = None + delta_ids = None + + for old_group, old_state in iteritems(state_groups_ids): + n_delta_ids = { + k: v + for k, v in iteritems(new_state) + if old_state.get(k) != v + } + if not delta_ids or len(n_delta_ids) < len(delta_ids): + prev_group = old_group + delta_ids = n_delta_ids + + return _StateCacheEntry( + state=new_state, + state_group=None, + prev_group=prev_group, + delta_ids=delta_ids, + ) + + def _ordered_events(events): def key_func(e): return -int(e.depth), hashlib.sha1(e.event_id.encode()).hexdigest() @@ -582,7 +616,7 @@ def _seperate(state_sets): with them in different state sets. Args: - state_sets(list[dict[(str, str), str]]): + state_sets(iterable[dict[(str, str), str]]): List of dicts of (type, state_key) -> event_id, which are the different state groups to resolve. @@ -596,10 +630,11 @@ def _seperate(state_sets): conflicted_state is a dict mapping (type, state_key) to a set of event ids for conflicted state keys. """ - unconflicted_state = dict(state_sets[0]) + state_set_iterator = iter(state_sets) + unconflicted_state = dict(next(state_set_iterator)) conflicted_state = {} - for state_set in state_sets[1:]: + for state_set in state_set_iterator: for key, value in iteritems(state_set): # Check if there is an unconflicted entry for the state key. unconflicted_value = unconflicted_state.get(key) From cc99256e90c620b01ae255db304170479bfe8523 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 23 Jul 2018 19:10:50 +0100 Subject: [PATCH 10/29] newsfile --- changelog.d/3586.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3586.misc diff --git a/changelog.d/3586.misc b/changelog.d/3586.misc new file mode 100644 index 0000000000..e853e2481b --- /dev/null +++ b/changelog.d/3586.misc @@ -0,0 +1 @@ +Fixes and optimisations for resolve_state_groups From c1f80effbe17b1572161cc50838e60b495fb45a4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 23 Jul 2018 22:06:50 +0100 Subject: [PATCH 11/29] Handle delta_ids being None in _update_context_for_auth_events it's easier to create the new state group as a delta from the existing one. 
(There's an outside chance this will help with https://github.com/matrix-org/synapse/issues/3364) --- synapse/events/snapshot.py | 3 ++- synapse/handlers/federation.py | 13 ++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 189212b0fa..368b5f6ae4 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -249,7 +249,7 @@ class EventContext(object): @defer.inlineCallbacks def update_state(self, state_group, prev_state_ids, current_state_ids, - delta_ids): + prev_group, delta_ids): """Replace the state in the context """ @@ -260,6 +260,7 @@ class EventContext(object): self.state_group = state_group self._prev_state_ids = prev_state_ids + self.prev_group = prev_group self._current_state_ids = current_state_ids self.delta_ids = delta_ids diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 14654d59f1..145c1a21d4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1980,10 +1980,6 @@ class FederationHandler(BaseHandler): current_state_ids.update(state_updates) - if context.delta_ids is not None: - delta_ids = dict(context.delta_ids) - delta_ids.update(state_updates) - prev_state_ids = yield context.get_prev_state_ids(self.store) prev_state_ids = dict(prev_state_ids) @@ -1991,11 +1987,13 @@ class FederationHandler(BaseHandler): k: a.event_id for k, a in iteritems(auth_events) }) + # create a new state group as a delta from the existing one. + prev_group = context.state_group state_group = yield self.store.store_state_group( event.event_id, event.room_id, - prev_group=context.prev_group, - delta_ids=delta_ids, + prev_group=prev_group, + delta_ids=state_updates, current_state_ids=current_state_ids, ) @@ -2003,7 +2001,8 @@ class FederationHandler(BaseHandler): state_group=state_group, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, - delta_ids=delta_ids, + prev_group=prev_group, + delta_ids=state_updates, ) @defer.inlineCallbacks From ce0c18dec5d04626018b8e401273c8f30e88b0c7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 23 Jul 2018 22:13:19 +0100 Subject: [PATCH 12/29] Improve logging for exceptions handling PDUs when we get an exception handling a federation PDU, log the whole stacktrace. 
--- synapse/federation/federation_server.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 48f26db67c..e501251b6e 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -24,6 +24,7 @@ from prometheus_client import Counter from twisted.internet import defer from twisted.internet.abstract import isIPAddress +from twisted.python import failure from synapse.api.constants import EventTypes from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError @@ -186,8 +187,12 @@ class FederationServer(FederationBase): logger.warn("Error handling PDU %s: %s", event_id, e) pdu_results[event_id] = {"error": str(e)} except Exception as e: + f = failure.Failure() pdu_results[event_id] = {"error": str(e)} - logger.exception("Failed to handle PDU %s", event_id) + logger.error( + "Failed to handle PDU %s: %s", + event_id, f.getTraceback().rstrip(), + ) yield async.concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), @@ -203,8 +208,8 @@ class FederationServer(FederationBase): ) pdu_failures = getattr(transaction, "pdu_failures", []) - for failure in pdu_failures: - logger.info("Got failure %r", failure) + for fail in pdu_failures: + logger.info("Got failure %r", fail) response = { "pdus": pdu_results, From d8709df7391901ac13ed57375b986d1b9de583ea Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 23 Jul 2018 22:16:22 +0100 Subject: [PATCH 13/29] changelog --- changelog.d/3587.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3587.misc diff --git a/changelog.d/3587.misc b/changelog.d/3587.misc new file mode 100644 index 0000000000..75a3479910 --- /dev/null +++ b/changelog.d/3587.misc @@ -0,0 +1 @@ +Improve logging for exceptions when handling PDUs \ No newline at end of file From 1938cffaea465375d5dd410baa94a6b9b6ded0cc Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 23 Jul 2018 23:48:19 +0100 Subject: [PATCH 14/29] Add some measure blocks to persist_events ... 
to help us figure out where 40% of CPU is going --- synapse/storage/events.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c2910094d0..d13d099d1e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -417,19 +417,23 @@ class EventsStore(EventsWorkerStore): logger.info( "Calculating state delta for room %s", room_id, ) - current_state = yield self._get_new_state_after_events( - room_id, - ev_ctx_rm, - latest_event_ids, - new_latest_event_ids, - ) + + with Measure("persist_events.get_new_state_after_events"): + current_state = yield self._get_new_state_after_events( + room_id, + ev_ctx_rm, + latest_event_ids, + new_latest_event_ids, + ) + if current_state is not None: current_state_for_room[room_id] = current_state - delta = yield self._calculate_state_delta( - room_id, current_state, - ) - if delta is not None: - state_delta_for_room[room_id] = delta + with Measure("persist_events.calculate_state_delta"): + delta = yield self._calculate_state_delta( + room_id, current_state, + ) + if delta is not None: + state_delta_for_room[room_id] = delta yield self.runInteraction( "persist_events", From 69fb5dbdabf50dffe34e2a0d012b5a227fc16c55 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 24 Jul 2018 00:04:44 +0100 Subject: [PATCH 15/29] fix idiocy --- synapse/storage/events.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d13d099d1e..392935cacf 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -418,7 +418,10 @@ class EventsStore(EventsWorkerStore): "Calculating state delta for room %s", room_id, ) - with Measure("persist_events.get_new_state_after_events"): + with Measure( + self._clock, + "persist_events.get_new_state_after_events", + ): current_state = yield self._get_new_state_after_events( room_id, ev_ctx_rm, @@ -428,7 +431,10 @@ class EventsStore(EventsWorkerStore): if current_state is not None: current_state_for_room[room_id] = current_state - with Measure("persist_events.calculate_state_delta"): + with Measure( + self._clock, + "persist_events.calculate_state_delta", + ): delta = yield self._calculate_state_delta( room_id, current_state, ) From 30957a941a9a48b9afcfadbabdd4118b08abd81f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 24 Jul 2018 00:24:38 +0100 Subject: [PATCH 16/29] changelog --- changelog.d/3590.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3590.misc diff --git a/changelog.d/3590.misc b/changelog.d/3590.misc new file mode 100644 index 0000000000..0f1688fd0f --- /dev/null +++ b/changelog.d/3590.misc @@ -0,0 +1 @@ +Add some measure blocks to persist_events From 8dff6e0322718ec9c446465c1e10ab331a417b8a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 24 Jul 2018 00:37:17 +0100 Subject: [PATCH 17/29] Logcontext fixes Fix some random logcontext leaks. 
--- synapse/handlers/initial_sync.py | 28 +++++++++++++++------------- synapse/storage/events.py | 5 +++-- synapse/storage/pusher.py | 2 +- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index fb11716eb8..50b13d8820 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -387,19 +387,21 @@ class InitialSyncHandler(BaseHandler): receipts = [] defer.returnValue(receipts) - presence, receipts, (messages, token) = yield defer.gatherResults( - [ - run_in_background(get_presence), - run_in_background(get_receipts), - run_in_background( - self.store.get_recent_events_for_room, - room_id, - limit=limit, - end_token=now_token.room_key, - ) - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) + presence, receipts, (messages, token) = yield make_deferred_yieldable( + defer.gatherResults( + [ + run_in_background(get_presence), + run_in_background(get_receipts), + run_in_background( + self.store.get_recent_events_for_room, + room_id, + limit=limit, + end_token=now_token.room_key, + ) + ], + consumeErrors=True, + ).addErrback(unwrapFirstError), + ) messages = yield filter_events_for_client( self.store, user_id, messages, is_peeking=is_peeking, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c2910094d0..c06dbb3768 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -39,7 +39,7 @@ from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util.async import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.frozenutils import frozendict_json_encoder -from synapse.util.logcontext import make_deferred_yieldable +from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -147,7 +147,8 @@ class _EventPeristenceQueue(object): # callbacks on the deferred. try: ret = yield per_item_callback(item) - item.deferred.callback(ret) + with PreserveLoggingContext(): + item.deferred.callback(ret) except Exception: item.deferred.errback() finally: diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index cc273a57b2..8443bd4c1b 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore): ) if newly_inserted: - self.runInteraction( + yield self.runInteraction( "add_pusher", self._invalidate_cache_and_stream, self.get_if_user_has_pusher, (user_id,) From c6e66821a991c45b8b6a78f00f52ae65304225ea Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 24 Jul 2018 00:38:39 +0100 Subject: [PATCH 18/29] Changelog --- changelog.d/3591.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3591.misc diff --git a/changelog.d/3591.misc b/changelog.d/3591.misc new file mode 100644 index 0000000000..f0137766a0 --- /dev/null +++ b/changelog.d/3591.misc @@ -0,0 +1 @@ +Fix some random logcontext leaks. 
\ No newline at end of file From cf2d15c6a953d42207fb2c8fe5dc57ee7fdae7ce Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 24 Jul 2018 00:57:48 +0100 Subject: [PATCH 19/29] another couple of logcontext leaks --- synapse/handlers/appservice.py | 5 ++++- synapse/handlers/initial_sync.py | 10 ++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index ec9fe01a5a..ee41aed69e 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -23,6 +23,7 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.metrics import Measure @@ -106,7 +107,9 @@ class ApplicationServicesHandler(object): yield self._check_user_exists(event.state_key) if not self.started_scheduler: - self.scheduler.start().addErrback(log_failure) + def start_scheduler(): + return self.scheduler.start().addErrback(log_failure) + run_as_background_process("as_scheduler", start_scheduler) self.started_scheduler = True # Fork off pushes to these services diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 50b13d8820..40e7580a61 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler): try: if event.membership == Membership.JOIN: room_end_token = now_token.room_key - deferred_room_state = self.state_handler.get_current_state( - event.room_id + deferred_room_state = run_in_background( + self.state_handler.get_current_state, + event.room_id, ) elif event.membership == Membership.LEAVE: room_end_token = "s%d" % (event.stream_ordering,) - deferred_room_state = self.store.get_state_for_events( - [event.event_id], None + deferred_room_state = run_in_background( + self.store.get_state_for_events, + [event.event_id], None, ) deferred_room_state.addCallback( lambda states: states[event.event_id] From ff5426f6b8263b416f412725a6d7be2fac284824 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 10:55:11 +0100 Subject: [PATCH 20/29] Speed up _calculate_state_delta --- synapse/storage/events.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c2910094d0..1b075e6cc6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -644,21 +644,14 @@ class EventsStore(EventsWorkerStore): """ existing_state = yield self.get_current_state_ids(room_id) - existing_events = set(itervalues(existing_state)) - new_events = set(ev_id for ev_id in itervalues(current_state)) - changed_events = existing_events ^ new_events - - if not changed_events: - return - to_delete = { key: ev_id for key, ev_id in iteritems(existing_state) - if ev_id in changed_events + if ev_id != current_state.get(key) } - events_to_insert = (new_events - existing_events) + to_insert = { key: ev_id for key, ev_id in iteritems(current_state) - if ev_id in events_to_insert + if ev_id != existing_state.get(key) } defer.returnValue((to_delete, to_insert)) From 2581eb3e1d9f7d767315432f2c2e381f31a8716c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 11:14:28 +0100 Subject: [PATCH 21/29] Newsfile --- changelog.d/3592.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3592.misc diff 
--git a/changelog.d/3592.misc b/changelog.d/3592.misc new file mode 100644 index 0000000000..60129569c2 --- /dev/null +++ b/changelog.d/3592.misc @@ -0,0 +1 @@ +Speed up calculating state deltas in persist_event loop From 0fa73e4a63d6cd5d402ea5213eac6b766e650321 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 11:19:23 +0100 Subject: [PATCH 22/29] Remove unnecessary if --- synapse/storage/events.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 1b075e6cc6..cc8ab5165b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -428,8 +428,7 @@ class EventsStore(EventsWorkerStore): delta = yield self._calculate_state_delta( room_id, current_state, ) - if delta is not None: - state_delta_for_room[room_id] = delta + state_delta_for_room[room_id] = delta yield self.runInteraction( "persist_events", From 223341205edf725c5583a73d5680245a24790452 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 11:59:16 +0100 Subject: [PATCH 23/29] Don't require to_delete to have event_ids --- synapse/storage/events.py | 91 +++++++++++++++++++++++---------------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 19c05dc8d6..87d0b78a2d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ import logging from collections import OrderedDict, deque, namedtuple from functools import wraps -from six import iteritems, itervalues +from six import iteritems from six.moves import range from canonicaljson import json @@ -347,8 +347,9 @@ class EventsStore(EventsWorkerStore): # state in each room after adding these events current_state_for_room = {} - # map room_id->(to_delete, to_insert) where each entry is - # a map (type,key)->event_id giving the state delta in each + # map room_id->(to_delete, to_insert) where to_delete is a list + # of type/state keys to remove from current state, and to_insert + # is a map (type,key)->event_id giving the state delta in each # room state_delta_for_room = {} @@ -647,17 +648,16 @@ class EventsStore(EventsWorkerStore): Assumes that we are only persisting events for one room at a time. Returns: - 2-tuple (to_delete, to_insert) where both are state dicts, - i.e. (type, state_key) -> event_id. `to_delete` are the entries to - first be deleted from current_state_events, `to_insert` are entries - to insert. + tuple[list, dict] (to_delete, to_insert): where to_delete are the + type/state_keys to remove from current_state_events and `to_insert` + are the updates to current_state_events. """ existing_state = yield self.get_current_state_ids(room_id) - to_delete = { - key: ev_id for key, ev_id in iteritems(existing_state) - if ev_id != current_state.get(key) - } + to_delete = [ + key for key, ev_id in iteritems(existing_state) + if key not in current_state + ] to_insert = { key: ev_id for key, ev_id in iteritems(current_state) @@ -684,10 +684,10 @@ class EventsStore(EventsWorkerStore): delete_existing (bool): True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. - state_delta_for_room (dict[str, (list[str], list[str])]): + state_delta_for_room (dict[str, (list, dict)]): The current-state delta for each room. 
For each room, a tuple - (to_delete, to_insert), being a list of event ids to be removed - from the current state, and a list of event ids to be added to + (to_delete, to_insert), being a list of type/state keys to be + removed from the current state, and a state set to be added to the current state. new_forward_extremeties (dict[str, list[str]]): The new forward extremities for each room. For each room, a @@ -765,9 +765,46 @@ class EventsStore(EventsWorkerStore): def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple + + # First we add entries to the current_state_delta_stream. We + # do this before updating the current_state_events table so + # that we can use it to calculate the `prev_event_id`. (This + # allows us to not have to pull out the existing state + # unnecessarily). + sql = """ + INSERT INTO current_state_delta_stream + (stream_id, room_id, type, state_key, event_id, prev_event_id) + SELECT ?, ?, ?, ?, ?, ( + SELECT event_id FROM current_state_events + WHERE room_id = ? AND type = ? AND state_key = ? + ) + """ + txn.executemany(sql, ( + ( + max_stream_order, room_id, etype, state_key, None, + room_id, etype, state_key, + ) + for etype, state_key in to_delete + # We sanity check that we're deleting rather than updating + if (etype, state_key) not in to_insert + )) + txn.executemany(sql, ( + ( + max_stream_order, room_id, etype, state_key, ev_id, + room_id, etype, state_key, + ) + for (etype, state_key), ev_id in iteritems(to_insert) + )) + + # Now we actually update the current_state_events table + txn.executemany( - "DELETE FROM current_state_events WHERE event_id = ?", - [(ev_id,) for ev_id in itervalues(to_delete)], + "DELETE FROM current_state_events" + " WHERE room_id = ? AND type = ? AND state_key = ?", + ( + (room_id, etype, state_key) + for etype, state_key in itertools.chain(to_delete, to_insert) + ), ) self._simple_insert_many_txn( @@ -784,25 +821,6 @@ class EventsStore(EventsWorkerStore): ], ) - state_deltas = {key: None for key in to_delete} - state_deltas.update(to_insert) - - self._simple_insert_many_txn( - txn, - table="current_state_delta_stream", - values=[ - { - "stream_id": max_stream_order, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": ev_id, - "prev_event_id": to_delete.get(key, None), - } - for key, ev_id in iteritems(state_deltas) - ] - ) - txn.call_after( self._curr_state_delta_stream_cache.entity_has_changed, room_id, max_stream_order, @@ -816,7 +834,8 @@ class EventsStore(EventsWorkerStore): # and which we have added, then we invlidate the caches for all # those users. members_changed = set( - state_key for ev_type, state_key in state_deltas + state_key + for ev_type, state_key in itertools.chain(to_delete, to_insert) if ev_type == EventTypes.Member ) From a79410e7b8dce3fb8b4b40901a66fd9fde9c47a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 11:13:47 +0100 Subject: [PATCH 24/29] Have _get_new_state_after_events return delta If we have a delta from the existing to new current state, then we can reuse that rather than manually working it out by fetching both lots of state. 
--- synapse/storage/events.py | 63 +++++++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 87d0b78a2d..6d2aae9c4a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -344,7 +344,9 @@ class EventsStore(EventsWorkerStore): new_forward_extremeties = {} # map room_id->(type,state_key)->event_id tracking the full - # state in each room after adding these events + # state in each room after adding these events. + # This is simply used to prefill the get_current_state_ids + # cache current_state_for_room = {} # map room_id->(to_delete, to_insert) where to_delete is a list @@ -419,28 +421,40 @@ class EventsStore(EventsWorkerStore): logger.info( "Calculating state delta for room %s", room_id, ) - with Measure( - self._clock, - "persist_events.get_new_state_after_events", + self._clock, + "persist_events.get_new_state_after_events", ): - current_state = yield self._get_new_state_after_events( + res = yield self._get_new_state_after_events( room_id, ev_ctx_rm, latest_event_ids, new_latest_event_ids, ) + current_state, delta_ids = res - if current_state is not None: - current_state_for_room[room_id] = current_state + # If either are not None then there has been a change, + # and we need to work out the delta (or use that + # given) + if delta_ids is not None: + # If there is a delta we know that we've + # only added or replaced state, never + # removed keys entirely. + state_delta_for_room[room_id] = ([], delta_ids) + elif current_state is not None: with Measure( - self._clock, - "persist_events.calculate_state_delta", + self._clock, + "persist_events.calculate_state_delta", ): delta = yield self._calculate_state_delta( room_id, current_state, ) - state_delta_for_room[room_id] = delta + state_delta_for_room[room_id] = delta + + # If we have the current_state then lets prefill + # the cache with it. + if current_state is not None: + current_state_for_room[room_id] = current_state yield self.runInteraction( "persist_events", @@ -539,9 +553,10 @@ class EventsStore(EventsWorkerStore): the new forward extremities for the room. Returns: - Deferred[dict[(str,str), str]|None]: - None if there are no changes to the room state, or - a dict of (type, state_key) -> event_id]. + Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]: + Returns a tuple of two state maps, the first being the full new current + state and the second being the delta to the existing current state. + If both are None then there has been no change. """ if not new_latest_event_ids: @@ -549,6 +564,9 @@ class EventsStore(EventsWorkerStore): # map from state_group to ((type, key) -> event_id) state map state_groups_map = {} + + state_group_deltas = {} + for ev, ctx in events_context: if ctx.state_group is None: # I don't think this can happen, but let's double-check @@ -567,6 +585,9 @@ class EventsStore(EventsWorkerStore): if current_state_ids is not None: state_groups_map[ctx.state_group] = current_state_ids + if ctx.prev_group: + state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids + # We need to map the event_ids to their state groups. First, let's # check if the event is one we're persisting, in which case we can # pull the state group from its context. @@ -608,7 +629,7 @@ class EventsStore(EventsWorkerStore): # If they old and new groups are the same then we don't need to do # anything. 
if old_state_groups == new_state_groups: - return + defer.returnValue((None, None)) # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. @@ -620,7 +641,17 @@ class EventsStore(EventsWorkerStore): if len(new_state_groups) == 1: # If there is only one state group, then we know what the current # state is. - defer.returnValue(state_groups_map[new_state_groups.pop()]) + new_state_group = new_state_groups.pop() + + delta_ids = None + if len(old_state_groups) == 1: + old_state_group = old_state_groups.pop() + + delta_ids = state_group_deltas.get( + (old_state_group, new_state_group,), None + ) + + defer.returnValue((state_groups_map[new_state_group], delta_ids)) # Ok, we need to defer to the state handler to resolve our state sets. @@ -639,7 +670,7 @@ class EventsStore(EventsWorkerStore): room_id, state_groups, events_map, get_events ) - defer.returnValue(res.state) + defer.returnValue((res.state, None)) @defer.inlineCallbacks def _calculate_state_delta(self, room_id, current_state): From 811ac73a42db16db7211a7517ad5651a1f6aee71 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 13:40:42 +0100 Subject: [PATCH 25/29] Don't fetch state from the database unless needed --- synapse/storage/events.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6d2aae9c4a..eb8bbe13ab 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -631,6 +631,24 @@ class EventsStore(EventsWorkerStore): if old_state_groups == new_state_groups: defer.returnValue((None, None)) + if len(new_state_groups) == 1: + # If there is only one state group, then we know what the current + # state is. + if len(old_state_groups) == 1: + new_state_group = next(iter(new_state_groups)) + old_state_group = next(iter(old_state_groups)) + + delta_ids = state_group_deltas.get( + (old_state_group, new_state_group,), None + ) + if delta_ids is not None: + # We have a delta from the existing to new current state, + # so lets just return that. If we happen to already have + # the current state in memory then lets also return that, + # but it doesn't matter if we don't. + new_state = state_groups_map.get(new_state_group) + defer.returnValue((new_state, delta_ids)) + # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. missing_state = new_state_groups - set(state_groups_map) @@ -639,19 +657,7 @@ class EventsStore(EventsWorkerStore): state_groups_map.update(group_to_state) if len(new_state_groups) == 1: - # If there is only one state group, then we know what the current - # state is. - new_state_group = new_state_groups.pop() - - delta_ids = None - if len(old_state_groups) == 1: - old_state_group = old_state_groups.pop() - - delta_ids = state_group_deltas.get( - (old_state_group, new_state_group,), None - ) - - defer.returnValue((state_groups_map[new_state_group], delta_ids)) + defer.returnValue((state_groups_map[new_state_groups.pop()], None)) # Ok, we need to defer to the state handler to resolve our state sets. 
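
The fast path added in the two patches above hinges on each event context recording a delta from its prev_group. The following standalone sketch (invented state group ids and event ids, plain synchronous Python rather than the actual Deferred-based store code) illustrates the bookkeeping and the lookup it enables:

# Map from (prev state group, new state group) -> delta state dict,
# filled in while iterating over the (event, context) pairs being
# persisted, roughly as the patch does with ctx.prev_group / ctx.delta_ids.
state_group_deltas = {
    (101, 102): {("m.room.topic", ""): "$new_topic_event"},
}

old_state_groups = {101}  # groups referenced by the old forward extremities
new_state_groups = {102}  # groups referenced by the new forward extremities

if len(new_state_groups) == 1 and len(old_state_groups) == 1:
    old_sg = next(iter(old_state_groups))
    new_sg = next(iter(new_state_groups))
    delta_ids = state_group_deltas.get((old_sg, new_sg))
    if delta_ids is not None:
        # The room's current state can be updated from the delta alone,
        # without loading the full state of group 102 from the database.
        print("delta:", delta_ids)
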
From f33c596533e46335a3fc4d02236087412615ee1a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 14:28:04 +0100 Subject: [PATCH 26/29] Newsfile --- changelog.d/3595.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3595.misc diff --git a/changelog.d/3595.misc b/changelog.d/3595.misc new file mode 100644 index 0000000000..85903504cc --- /dev/null +++ b/changelog.d/3595.misc @@ -0,0 +1 @@ +Attempt to reduce amount of state pulled out of DB during persist_events From ed0dd68731fa1549b4ea9b2dab08c2afb6ccb64e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 14:31:38 +0100 Subject: [PATCH 27/29] Fixup comment (and indent) --- synapse/storage/events.py | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index eb8bbe13ab..76cfbc90fe 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -565,6 +565,7 @@ class EventsStore(EventsWorkerStore): # map from state_group to ((type, key) -> event_id) state map state_groups_map = {} + # Map from (prev state group, new state group) -> delta state dict state_group_deltas = {} for ev, ctx in events_context: @@ -631,23 +632,24 @@ class EventsStore(EventsWorkerStore): if old_state_groups == new_state_groups: defer.returnValue((None, None)) - if len(new_state_groups) == 1: - # If there is only one state group, then we know what the current - # state is. - if len(old_state_groups) == 1: - new_state_group = next(iter(new_state_groups)) - old_state_group = next(iter(old_state_groups)) + if len(new_state_groups) == 1 and len(old_state_groups) == 1: + # If we're going from one state group to another, lets check if + # we have a delta for that transition. If we do then we can just + # return that. - delta_ids = state_group_deltas.get( - (old_state_group, new_state_group,), None - ) - if delta_ids is not None: - # We have a delta from the existing to new current state, - # so lets just return that. If we happen to already have - # the current state in memory then lets also return that, - # but it doesn't matter if we don't. - new_state = state_groups_map.get(new_state_group) - defer.returnValue((new_state, delta_ids)) + new_state_group = next(iter(new_state_groups)) + old_state_group = next(iter(old_state_groups)) + + delta_ids = state_group_deltas.get( + (old_state_group, new_state_group,), None + ) + if delta_ids is not None: + # We have a delta from the existing to new current state, + # so lets just return that. If we happen to already have + # the current state in memory then lets also return that, + # but it doesn't matter if we don't. + new_state = state_groups_map.get(new_state_group) + defer.returnValue((new_state, delta_ids)) # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. @@ -657,6 +659,8 @@ class EventsStore(EventsWorkerStore): state_groups_map.update(group_to_state) if len(new_state_groups) == 1: + # If there is only one state group, then we know what the current + # state is. defer.returnValue((state_groups_map[new_state_groups.pop()], None)) # Ok, we need to defer to the state handler to resolve our state sets. 
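
The reason a known delta can be returned as-is (and recorded as ([], delta_ids), with nothing to delete) is that a delta between state groups only ever adds or replaces (type, state_key) entries, never removes them. A minimal sketch of that overlay, again with invented event ids rather than real data:

def apply_state_delta(stored_state, delta_ids):
    # Both maps are (event type, state_key) -> event_id. Overlaying the
    # delta on the stored current state gives the new current state;
    # no keys need deleting, which is why to_delete can be empty.
    new_state = dict(stored_state)
    new_state.update(delta_ids)
    return new_state

stored_state = {
    ("m.room.member", "@alice:example.com"): "$alice_join",
    ("m.room.topic", ""): "$old_topic",
}
delta_ids = {("m.room.topic", ""): "$new_topic"}

assert apply_state_delta(stored_state, delta_ids) == {
    ("m.room.member", "@alice:example.com"): "$alice_join",
    ("m.room.topic", ""): "$new_topic",
}
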
From 8f65ab98d2ab35bbb29c23cc7978d54b12fd3db0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 15:08:01 +0100 Subject: [PATCH 28/29] Remove unnecessary iteritems --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 76cfbc90fe..9c94d3454f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -696,7 +696,7 @@ class EventsStore(EventsWorkerStore): existing_state = yield self.get_current_state_ids(room_id) to_delete = [ - key for key, ev_id in iteritems(existing_state) + key for key in existing_state if key not in current_state ] From 709c309b0e1756afd8b4514b95b0e901c567a0f1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 15:12:50 +0100 Subject: [PATCH 29/29] Expand on docstring comment about return value --- synapse/storage/events.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9c94d3454f..906a405031 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -557,6 +557,11 @@ class EventsStore(EventsWorkerStore): Returns a tuple of two state maps, the first being the full new current state and the second being the delta to the existing current state. If both are None then there has been no change. + + If there has been a change then we only return the delta if its + already been calculated. Conversely if we do know the delta then + the new current state is only returned if we've already calculated + it. """ if not new_latest_event_ids:
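
For comparison, when only the full new state is known, the delta against the stored state still has to be computed. A small worked example of that to_delete / to_insert split, using illustrative values rather than real room data:

existing_state = {
    ("m.room.name", ""): "$name_v1",
    ("m.room.member", "@bob:example.com"): "$bob_join",
}
current_state = {
    ("m.room.name", ""): "$name_v2",    # replaced event
    ("m.room.topic", ""): "$topic_v1",  # newly added key
}

# Keys that disappear from the current state entirely:
to_delete = [key for key in existing_state if key not in current_state]

# Keys whose event_id is new or has changed:
to_insert = {
    key: ev_id
    for key, ev_id in current_state.items()
    if ev_id != existing_state.get(key)
}

assert to_delete == [("m.room.member", "@bob:example.com")]
assert to_insert == {
    ("m.room.name", ""): "$name_v2",
    ("m.room.topic", ""): "$topic_v1",
}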