From e9f3de0baba9be63b77fdaff996274e0abed8ec4 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 15 May 2020 09:32:13 -0400 Subject: [PATCH] Update the room member handler to use async/await. (#7507) --- changelog.d/7507.misc | 1 + synapse/handlers/room_member.py | 111 +++++++++++-------------- synapse/handlers/room_member_worker.py | 21 ++--- 3 files changed, 59 insertions(+), 74 deletions(-) create mode 100644 changelog.d/7507.misc diff --git a/changelog.d/7507.misc b/changelog.d/7507.misc new file mode 100644 index 0000000000..afc7a730b3 --- /dev/null +++ b/changelog.d/7507.misc @@ -0,0 +1 @@ +Convert the room member handler to async/await. diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ccc9659454..4ddeba4c97 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -20,8 +20,6 @@ import logging from six.moves import http_client -from twisted.internet import defer - from synapse import types from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError @@ -76,7 +74,7 @@ class RoomMemberHandler(object): self.base_handler = BaseHandler(hs) @abc.abstractmethod - def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + async def _remote_join(self, requester, remote_room_hosts, room_id, user, content): """Try and join a room that this server is not in Args: @@ -94,7 +92,7 @@ class RoomMemberHandler(object): raise NotImplementedError() @abc.abstractmethod - def _remote_reject_invite( + async def _remote_reject_invite( self, requester, remote_room_hosts, room_id, target, content ): """Attempt to reject an invite for a room this server is not in. If we @@ -115,7 +113,7 @@ class RoomMemberHandler(object): raise NotImplementedError() @abc.abstractmethod - def _user_joined_room(self, target, room_id): + async def _user_joined_room(self, target, room_id): """Notifies distributor on master process that the user has joined the room. @@ -124,12 +122,12 @@ class RoomMemberHandler(object): room_id (str) Returns: - Deferred|None + None """ raise NotImplementedError() @abc.abstractmethod - def _user_left_room(self, target, room_id): + async def _user_left_room(self, target, room_id): """Notifies distributor on master process that the user has left the room. @@ -138,7 +136,7 @@ class RoomMemberHandler(object): room_id (str) Returns: - Deferred|None + None """ raise NotImplementedError() @@ -214,8 +212,9 @@ class RoomMemberHandler(object): return event - @defer.inlineCallbacks - def copy_room_tags_and_direct_to_room(self, old_room_id, new_room_id, user_id): + async def copy_room_tags_and_direct_to_room( + self, old_room_id, new_room_id, user_id + ): """Copies the tags and direct room state from one room to another. Args: @@ -227,7 +226,7 @@ class RoomMemberHandler(object): Deferred[None] """ # Retrieve user account data for predecessor room - user_account_data, _ = yield self.store.get_account_data_for_user(user_id) + user_account_data, _ = await self.store.get_account_data_for_user(user_id) # Copy direct message state if applicable direct_rooms = user_account_data.get("m.direct", {}) @@ -240,17 +239,17 @@ class RoomMemberHandler(object): direct_rooms[key].append(new_room_id) # Save back to user's m.direct account data - yield self.store.add_account_data_for_user( + await self.store.add_account_data_for_user( user_id, "m.direct", direct_rooms ) break # Copy room tags if applicable - room_tags = yield self.store.get_tags_for_room(user_id, old_room_id) + room_tags = await self.store.get_tags_for_room(user_id, old_room_id) # Copy each room tag to the new room for tag, tag_content in room_tags.items(): - yield self.store.add_tag_to_room(user_id, new_room_id, tag, tag_content) + await self.store.add_tag_to_room(user_id, new_room_id, tag, tag_content) async def update_membership( self, @@ -487,8 +486,7 @@ class RoomMemberHandler(object): ) return res - @defer.inlineCallbacks - def transfer_room_state_on_room_upgrade(self, old_room_id, room_id): + async def transfer_room_state_on_room_upgrade(self, old_room_id, room_id): """Upon our server becoming aware of an upgraded room, either by upgrading a room ourselves or joining one, we can transfer over information from the previous room. @@ -506,30 +504,29 @@ class RoomMemberHandler(object): logger.info("Transferring room state from %s to %s", old_room_id, room_id) # Find all local users that were in the old room and copy over each user's state - users = yield self.store.get_users_in_room(old_room_id) - yield self.copy_user_state_on_room_upgrade(old_room_id, room_id, users) + users = await self.store.get_users_in_room(old_room_id) + await self.copy_user_state_on_room_upgrade(old_room_id, room_id, users) # Add new room to the room directory if the old room was there # Remove old room from the room directory - old_room = yield self.store.get_room(old_room_id) + old_room = await self.store.get_room(old_room_id) if old_room and old_room["is_public"]: - yield self.store.set_room_is_public(old_room_id, False) - yield self.store.set_room_is_public(room_id, True) + await self.store.set_room_is_public(old_room_id, False) + await self.store.set_room_is_public(room_id, True) # Transfer alias mappings in the room directory - yield self.store.update_aliases_for_room(old_room_id, room_id) + await self.store.update_aliases_for_room(old_room_id, room_id) # Check if any groups we own contain the predecessor room - local_group_ids = yield self.store.get_local_groups_for_room(old_room_id) + local_group_ids = await self.store.get_local_groups_for_room(old_room_id) for group_id in local_group_ids: # Add new the new room to those groups - yield self.store.add_room_to_group(group_id, room_id, old_room["is_public"]) + await self.store.add_room_to_group(group_id, room_id, old_room["is_public"]) # Remove the old room from those groups - yield self.store.remove_room_from_group(group_id, old_room_id) + await self.store.remove_room_from_group(group_id, old_room_id) - @defer.inlineCallbacks - def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids): + async def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids): """Copy user-specific information when they join a new room when that new room is the result of a room upgrade @@ -552,11 +549,11 @@ class RoomMemberHandler(object): for user_id in user_ids: try: # It is an upgraded room. Copy over old tags - yield self.copy_room_tags_and_direct_to_room( + await self.copy_room_tags_and_direct_to_room( old_room_id, new_room_id, user_id ) # Copy over push rules - yield self.store.copy_push_rules_from_room_to_room_for_user( + await self.store.copy_push_rules_from_room_to_room_for_user( old_room_id, new_room_id, user_id ) except Exception: @@ -639,8 +636,7 @@ class RoomMemberHandler(object): if prev_member_event.membership == Membership.JOIN: await self._user_left_room(target_user, room_id) - @defer.inlineCallbacks - def _can_guest_join(self, current_state_ids): + async def _can_guest_join(self, current_state_ids): """ Returns whether a guest can join a room based on its current state. """ @@ -648,7 +644,7 @@ class RoomMemberHandler(object): if not guest_access_id: return False - guest_access = yield self.store.get_event(guest_access_id) + guest_access = await self.store.get_event(guest_access_id) return ( guest_access @@ -657,8 +653,7 @@ class RoomMemberHandler(object): and guest_access.content["guest_access"] == "can_join" ) - @defer.inlineCallbacks - def lookup_room_alias(self, room_alias): + async def lookup_room_alias(self, room_alias): """ Get the room ID associated with a room alias. @@ -672,7 +667,7 @@ class RoomMemberHandler(object): SynapseError if room alias could not be found. """ directory_handler = self.directory_handler - mapping = yield directory_handler.get_association(room_alias) + mapping = await directory_handler.get_association(room_alias) if not mapping: raise SynapseError(404, "No such room alias") @@ -687,9 +682,8 @@ class RoomMemberHandler(object): return RoomID.from_string(room_id), servers - @defer.inlineCallbacks - def _get_inviter(self, user_id, room_id): - invite = yield self.store.get_invite_for_local_user_in_room( + async def _get_inviter(self, user_id, room_id): + invite = await self.store.get_invite_for_local_user_in_room( user_id=user_id, room_id=room_id ) if invite: @@ -836,8 +830,7 @@ class RoomMemberHandler(object): txn_id=txn_id, ) - @defer.inlineCallbacks - def _is_host_in_room(self, current_state_ids): + async def _is_host_in_room(self, current_state_ids): # Have we just created the room, and is this about to be the very # first member event? create_event_id = current_state_ids.get(("m.room.create", "")) @@ -850,7 +843,7 @@ class RoomMemberHandler(object): continue event_id = current_state_ids[(etype, state_key)] - event = yield self.store.get_event(event_id, allow_none=True) + event = await self.store.get_event(event_id, allow_none=True) if not event: continue @@ -859,11 +852,10 @@ class RoomMemberHandler(object): return False - @defer.inlineCallbacks - def _is_server_notice_room(self, room_id): + async def _is_server_notice_room(self, room_id): if self._server_notices_mxid is None: return False - user_ids = yield self.store.get_users_in_room(room_id) + user_ids = await self.store.get_users_in_room(room_id) return self._server_notices_mxid in user_ids @@ -895,8 +887,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): return complexity["v1"] > max_complexity return None - @defer.inlineCallbacks - def _is_local_room_too_complex(self, room_id): + async def _is_local_room_too_complex(self, room_id): """ Check if the complexity of a local room is too great. @@ -906,7 +897,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): Returns: bool """ max_complexity = self.hs.config.limit_remote_rooms.complexity - complexity = yield self.store.get_room_complexity(room_id) + complexity = await self.store.get_room_complexity(room_id) return complexity["v1"] > max_complexity @@ -969,18 +960,15 @@ class RoomMemberMasterHandler(RoomMemberHandler): errcode=Codes.RESOURCE_LIMIT_EXCEEDED, ) - @defer.inlineCallbacks - def _remote_reject_invite( + async def _remote_reject_invite( self, requester, remote_room_hosts, room_id, target, content ): """Implements RoomMemberHandler._remote_reject_invite """ fed_handler = self.federation_handler try: - ret = yield defer.ensureDeferred( - fed_handler.do_remotely_reject_invite( - remote_room_hosts, room_id, target.to_string(), content=content, - ) + ret = await fed_handler.do_remotely_reject_invite( + remote_room_hosts, room_id, target.to_string(), content=content, ) return ret except Exception as e: @@ -992,24 +980,23 @@ class RoomMemberMasterHandler(RoomMemberHandler): # logger.warning("Failed to reject invite: %s", e) - yield self.store.locally_reject_invite(target.to_string(), room_id) + await self.store.locally_reject_invite(target.to_string(), room_id) return {} - def _user_joined_room(self, target, room_id): + async def _user_joined_room(self, target, room_id): """Implements RoomMemberHandler._user_joined_room """ - return defer.succeed(user_joined_room(self.distributor, target, room_id)) + return user_joined_room(self.distributor, target, room_id) - def _user_left_room(self, target, room_id): + async def _user_left_room(self, target, room_id): """Implements RoomMemberHandler._user_left_room """ - return defer.succeed(user_left_room(self.distributor, target, room_id)) + return user_left_room(self.distributor, target, room_id) - @defer.inlineCallbacks - def forget(self, user, room_id): + async def forget(self, user, room_id): user_id = user.to_string() - member = yield self.state_handler.get_current_state( + member = await self.state_handler.get_current_state( room_id=room_id, event_type=EventTypes.Member, state_key=user_id ) membership = member.membership if member else None @@ -1021,4 +1008,4 @@ class RoomMemberMasterHandler(RoomMemberHandler): raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) if membership: - yield self.store.forget(user_id, room_id) + await self.store.forget(user_id, room_id) diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 69be86893b..0fc54349ab 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -15,8 +15,6 @@ import logging -from twisted.internet import defer - from synapse.api.errors import SynapseError from synapse.handlers.room_member import RoomMemberHandler from synapse.replication.http.membership import ( @@ -36,14 +34,13 @@ class RoomMemberWorkerHandler(RoomMemberHandler): self._remote_reject_client = ReplRejectInvite.make_client(hs) self._notify_change_client = ReplJoinedLeft.make_client(hs) - @defer.inlineCallbacks - def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + async def _remote_join(self, requester, remote_room_hosts, room_id, user, content): """Implements RoomMemberHandler._remote_join """ if len(remote_room_hosts) == 0: raise SynapseError(404, "No known servers") - ret = yield self._remote_join_client( + ret = await self._remote_join_client( requester=requester, remote_room_hosts=remote_room_hosts, room_id=room_id, @@ -51,16 +48,16 @@ class RoomMemberWorkerHandler(RoomMemberHandler): content=content, ) - yield self._user_joined_room(user, room_id) + await self._user_joined_room(user, room_id) return ret - def _remote_reject_invite( + async def _remote_reject_invite( self, requester, remote_room_hosts, room_id, target, content ): """Implements RoomMemberHandler._remote_reject_invite """ - return self._remote_reject_client( + return await self._remote_reject_client( requester=requester, remote_room_hosts=remote_room_hosts, room_id=room_id, @@ -68,16 +65,16 @@ class RoomMemberWorkerHandler(RoomMemberHandler): content=content, ) - def _user_joined_room(self, target, room_id): + async def _user_joined_room(self, target, room_id): """Implements RoomMemberHandler._user_joined_room """ - return self._notify_change_client( + return await self._notify_change_client( user_id=target.to_string(), room_id=room_id, change="joined" ) - def _user_left_room(self, target, room_id): + async def _user_left_room(self, target, room_id): """Implements RoomMemberHandler._user_left_room """ - return self._notify_change_client( + return await self._notify_change_client( user_id=target.to_string(), room_id=room_id, change="left" )