Merge pull request #8463 from matrix-org/rav/clean_up_event_handling
Reduce inconsistencies between codepaths for membership and non-membership events.pull/8493/head
						commit
						43c622885c
					
				|  | @ -0,0 +1 @@ | |||
| Reduce inconsistencies between codepaths for membership and non-membership events. | ||||
|  | @ -635,61 +635,6 @@ class EventCreationHandler: | |||
|         msg = self._block_events_without_consent_error % {"consent_uri": consent_uri} | ||||
|         raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri) | ||||
| 
 | ||||
|     async def send_nonmember_event( | ||||
|         self, | ||||
|         requester: Requester, | ||||
|         event: EventBase, | ||||
|         context: EventContext, | ||||
|         ratelimit: bool = True, | ||||
|         ignore_shadow_ban: bool = False, | ||||
|     ) -> int: | ||||
|         """ | ||||
|         Persists and notifies local clients and federation of an event. | ||||
| 
 | ||||
|         Args: | ||||
|             requester: The requester sending the event. | ||||
|             event: The event to send. | ||||
|             context: The context of the event. | ||||
|             ratelimit: Whether to rate limit this send. | ||||
|             ignore_shadow_ban: True if shadow-banned users should be allowed to | ||||
|                 send this event. | ||||
| 
 | ||||
|         Return: | ||||
|             The stream_id of the persisted event. | ||||
| 
 | ||||
|         Raises: | ||||
|             ShadowBanError if the requester has been shadow-banned. | ||||
|         """ | ||||
|         if event.type == EventTypes.Member: | ||||
|             raise SynapseError( | ||||
|                 500, "Tried to send member event through non-member codepath" | ||||
|             ) | ||||
| 
 | ||||
|         if not ignore_shadow_ban and requester.shadow_banned: | ||||
|             # We randomly sleep a bit just to annoy the requester. | ||||
|             await self.clock.sleep(random.randint(1, 10)) | ||||
|             raise ShadowBanError() | ||||
| 
 | ||||
|         user = UserID.from_string(event.sender) | ||||
| 
 | ||||
|         assert self.hs.is_mine(user), "User must be our own: %s" % (user,) | ||||
| 
 | ||||
|         if event.is_state(): | ||||
|             prev_event = await self.deduplicate_state_event(event, context) | ||||
|             if prev_event is not None: | ||||
|                 logger.info( | ||||
|                     "Not bothering to persist state event %s duplicated by %s", | ||||
|                     event.event_id, | ||||
|                     prev_event.event_id, | ||||
|                 ) | ||||
|                 # we know it was persisted, so must have a stream ordering | ||||
|                 assert prev_event.internal_metadata.stream_ordering | ||||
|                 return prev_event.internal_metadata.stream_ordering | ||||
| 
 | ||||
|         return await self.handle_new_client_event( | ||||
|             requester=requester, event=event, context=context, ratelimit=ratelimit | ||||
|         ) | ||||
| 
 | ||||
|     async def deduplicate_state_event( | ||||
|         self, event: EventBase, context: EventContext | ||||
|     ) -> Optional[EventBase]: | ||||
|  | @ -730,7 +675,7 @@ class EventCreationHandler: | |||
|         """ | ||||
|         Creates an event, then sends it. | ||||
| 
 | ||||
|         See self.create_event and self.send_nonmember_event. | ||||
|         See self.create_event and self.handle_new_client_event. | ||||
| 
 | ||||
|         Args: | ||||
|             requester: The requester sending the event. | ||||
|  | @ -740,9 +685,19 @@ class EventCreationHandler: | |||
|             ignore_shadow_ban: True if shadow-banned users should be allowed to | ||||
|                 send this event. | ||||
| 
 | ||||
|         Returns: | ||||
|             The event, and its stream ordering (if state event deduplication happened, | ||||
|             the previous, duplicate event). | ||||
| 
 | ||||
|         Raises: | ||||
|             ShadowBanError if the requester has been shadow-banned. | ||||
|         """ | ||||
| 
 | ||||
|         if event_dict["type"] == EventTypes.Member: | ||||
|             raise SynapseError( | ||||
|                 500, "Tried to send member event through non-member codepath" | ||||
|             ) | ||||
| 
 | ||||
|         if not ignore_shadow_ban and requester.shadow_banned: | ||||
|             # We randomly sleep a bit just to annoy the requester. | ||||
|             await self.clock.sleep(random.randint(1, 10)) | ||||
|  | @ -758,20 +713,27 @@ class EventCreationHandler: | |||
|                 requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id | ||||
|             ) | ||||
| 
 | ||||
|             assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( | ||||
|                 event.sender, | ||||
|             ) | ||||
| 
 | ||||
|             spam_error = self.spam_checker.check_event_for_spam(event) | ||||
|             if spam_error: | ||||
|                 if not isinstance(spam_error, str): | ||||
|                     spam_error = "Spam is not permitted here" | ||||
|                 raise SynapseError(403, spam_error, Codes.FORBIDDEN) | ||||
| 
 | ||||
|             stream_id = await self.send_nonmember_event( | ||||
|                 requester, | ||||
|                 event, | ||||
|                 context, | ||||
|             ev = await self.handle_new_client_event( | ||||
|                 requester=requester, | ||||
|                 event=event, | ||||
|                 context=context, | ||||
|                 ratelimit=ratelimit, | ||||
|                 ignore_shadow_ban=ignore_shadow_ban, | ||||
|             ) | ||||
|         return event, stream_id | ||||
| 
 | ||||
|         # we know it was persisted, so must have a stream ordering | ||||
|         assert ev.internal_metadata.stream_ordering | ||||
|         return ev, ev.internal_metadata.stream_ordering | ||||
| 
 | ||||
|     @measure_func("create_new_client_event") | ||||
|     async def create_new_client_event( | ||||
|  | @ -845,8 +807,11 @@ class EventCreationHandler: | |||
|         context: EventContext, | ||||
|         ratelimit: bool = True, | ||||
|         extra_users: List[UserID] = [], | ||||
|     ) -> int: | ||||
|         """Processes a new event. This includes checking auth, persisting it, | ||||
|         ignore_shadow_ban: bool = False, | ||||
|     ) -> EventBase: | ||||
|         """Processes a new event. | ||||
| 
 | ||||
|         This includes deduplicating, checking auth, persisting, | ||||
|         notifying users, sending to remote servers, etc. | ||||
| 
 | ||||
|         If called from a worker will hit out to the master process for final | ||||
|  | @ -859,10 +824,39 @@ class EventCreationHandler: | |||
|             ratelimit | ||||
|             extra_users: Any extra users to notify about event | ||||
| 
 | ||||
|             ignore_shadow_ban: True if shadow-banned users should be allowed to | ||||
|                 send this event. | ||||
| 
 | ||||
|         Return: | ||||
|             The stream_id of the persisted event. | ||||
|             If the event was deduplicated, the previous, duplicate, event. Otherwise, | ||||
|             `event`. | ||||
| 
 | ||||
|         Raises: | ||||
|             ShadowBanError if the requester has been shadow-banned. | ||||
|         """ | ||||
| 
 | ||||
|         # we don't apply shadow-banning to membership events here. Invites are blocked | ||||
|         # higher up the stack, and we allow shadow-banned users to send join and leave | ||||
|         # events as normal. | ||||
|         if ( | ||||
|             event.type != EventTypes.Member | ||||
|             and not ignore_shadow_ban | ||||
|             and requester.shadow_banned | ||||
|         ): | ||||
|             # We randomly sleep a bit just to annoy the requester. | ||||
|             await self.clock.sleep(random.randint(1, 10)) | ||||
|             raise ShadowBanError() | ||||
| 
 | ||||
|         if event.is_state(): | ||||
|             prev_event = await self.deduplicate_state_event(event, context) | ||||
|             if prev_event is not None: | ||||
|                 logger.info( | ||||
|                     "Not bothering to persist state event %s duplicated by %s", | ||||
|                     event.event_id, | ||||
|                     prev_event.event_id, | ||||
|                 ) | ||||
|                 return prev_event | ||||
| 
 | ||||
|         if event.is_state() and (event.type, event.state_key) == ( | ||||
|             EventTypes.Create, | ||||
|             "", | ||||
|  | @ -917,13 +911,13 @@ class EventCreationHandler: | |||
|                 ) | ||||
|                 stream_id = result["stream_id"] | ||||
|                 event.internal_metadata.stream_ordering = stream_id | ||||
|                 return stream_id | ||||
|                 return event | ||||
| 
 | ||||
|             stream_id = await self.persist_and_notify_client_event( | ||||
|                 requester, event, context, ratelimit=ratelimit, extra_users=extra_users | ||||
|             ) | ||||
| 
 | ||||
|             return stream_id | ||||
|             return event | ||||
|         except Exception: | ||||
|             # Ensure that we actually remove the entries in the push actions | ||||
|             # staging area, if we calculated them. | ||||
|  | @ -1234,8 +1228,12 @@ class EventCreationHandler: | |||
| 
 | ||||
|                 # Since this is a dummy-event it is OK if it is sent by a | ||||
|                 # shadow-banned user. | ||||
|                 await self.send_nonmember_event( | ||||
|                     requester, event, context, ratelimit=False, ignore_shadow_ban=True, | ||||
|                 await self.handle_new_client_event( | ||||
|                     requester=requester, | ||||
|                     event=event, | ||||
|                     context=context, | ||||
|                     ratelimit=False, | ||||
|                     ignore_shadow_ban=True, | ||||
|                 ) | ||||
|                 return True | ||||
|             except ConsentNotGivenError: | ||||
|  |  | |||
|  | @ -185,6 +185,7 @@ class RoomCreationHandler(BaseHandler): | |||
|             ShadowBanError if the requester is shadow-banned. | ||||
|         """ | ||||
|         user_id = requester.user.to_string() | ||||
|         assert self.hs.is_mine_id(user_id), "User must be our own: %s" % (user_id,) | ||||
| 
 | ||||
|         # start by allocating a new room id | ||||
|         r = await self.store.get_room(old_room_id) | ||||
|  | @ -229,8 +230,8 @@ class RoomCreationHandler(BaseHandler): | |||
|         ) | ||||
| 
 | ||||
|         # now send the tombstone | ||||
|         await self.event_creation_handler.send_nonmember_event( | ||||
|             requester, tombstone_event, tombstone_context | ||||
|         await self.event_creation_handler.handle_new_client_event( | ||||
|             requester=requester, event=tombstone_event, context=tombstone_context, | ||||
|         ) | ||||
| 
 | ||||
|         old_room_state = await tombstone_context.get_current_state_ids() | ||||
|  |  | |||
|  | @ -188,16 +188,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): | |||
|             require_consent=require_consent, | ||||
|         ) | ||||
| 
 | ||||
|         # Check if this event matches the previous membership event for the user. | ||||
|         duplicate = await self.event_creation_handler.deduplicate_state_event( | ||||
|             event, context | ||||
|         ) | ||||
|         if duplicate is not None: | ||||
|             # Discard the new event since this membership change is a no-op. | ||||
|             # we know it was persisted, so must have a stream ordering. | ||||
|             assert duplicate.internal_metadata.stream_ordering | ||||
|             return duplicate.event_id, duplicate.internal_metadata.stream_ordering | ||||
| 
 | ||||
|         prev_state_ids = await context.get_prev_state_ids() | ||||
| 
 | ||||
|         prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) | ||||
|  | @ -222,7 +212,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): | |||
|                         retry_after_ms=int(1000 * (time_allowed - time_now_s)) | ||||
|                     ) | ||||
| 
 | ||||
|         stream_id = await self.event_creation_handler.handle_new_client_event( | ||||
|         result_event = await self.event_creation_handler.handle_new_client_event( | ||||
|             requester, event, context, extra_users=[target], ratelimit=ratelimit, | ||||
|         ) | ||||
| 
 | ||||
|  | @ -232,7 +222,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): | |||
|                 if prev_member_event.membership == Membership.JOIN: | ||||
|                     await self._user_left_room(target, room_id) | ||||
| 
 | ||||
|         return event.event_id, stream_id | ||||
|         # we know it was persisted, so should have a stream ordering | ||||
|         assert result_event.internal_metadata.stream_ordering | ||||
|         return result_event.event_id, result_event.internal_metadata.stream_ordering | ||||
| 
 | ||||
|     async def copy_room_tags_and_direct_to_room( | ||||
|         self, old_room_id, new_room_id, user_id | ||||
|  | @ -673,12 +665,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): | |||
|         else: | ||||
|             requester = types.create_requester(target_user) | ||||
| 
 | ||||
|         prev_event = await self.event_creation_handler.deduplicate_state_event( | ||||
|             event, context | ||||
|         ) | ||||
|         if prev_event is not None: | ||||
|             return | ||||
| 
 | ||||
|         prev_state_ids = await context.get_prev_state_ids() | ||||
|         if event.membership == Membership.JOIN: | ||||
|             if requester.is_guest: | ||||
|  | @ -1186,10 +1172,13 @@ class RoomMemberMasterHandler(RoomMemberHandler): | |||
| 
 | ||||
|         context = await self.state_handler.compute_event_context(event) | ||||
|         context.app_service = requester.app_service | ||||
|         stream_id = await self.event_creation_handler.handle_new_client_event( | ||||
|         result_event = await self.event_creation_handler.handle_new_client_event( | ||||
|             requester, event, context, extra_users=[UserID.from_string(target_user)], | ||||
|         ) | ||||
|         return event.event_id, stream_id | ||||
|         # we know it was persisted, so must have a stream ordering | ||||
|         assert result_event.internal_metadata.stream_ordering | ||||
| 
 | ||||
|         return result_event.event_id, result_event.internal_metadata.stream_ordering | ||||
| 
 | ||||
|     async def _user_left_room(self, target: UserID, room_id: str) -> None: | ||||
|         """Implements RoomMemberHandler._user_left_room | ||||
|  |  | |||
|  | @ -413,7 +413,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase): | |||
|             ) | ||||
|         ) | ||||
|         self.get_success( | ||||
|             event_creation_handler.send_nonmember_event(requester, event, context) | ||||
|             event_creation_handler.handle_new_client_event(requester, event, context) | ||||
|         ) | ||||
| 
 | ||||
|         # Register a second user, which won't be be in the room (or even have an invite) | ||||
|  |  | |||
|  | @ -608,7 +608,9 @@ class HomeserverTestCase(TestCase): | |||
|         if soft_failed: | ||||
|             event.internal_metadata.soft_failed = True | ||||
| 
 | ||||
|         self.get_success(event_creator.send_nonmember_event(requester, event, context)) | ||||
|         self.get_success( | ||||
|             event_creator.handle_new_client_event(requester, event, context) | ||||
|         ) | ||||
| 
 | ||||
|         return event.event_id | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Richard van der Hoff
						Richard van der Hoff