Merge pull request #5388 from matrix-org/erikj/fix_email_push
Fix email notifications for unnamed rooms with multiple peoplepull/5479/head
						commit
						dd927b29e1
					
				|  | @ -0,0 +1 @@ | |||
| Fix email notifications for unnamed rooms with multiple people. | ||||
|  | @ -114,6 +114,21 @@ class EmailPusher(object): | |||
| 
 | ||||
|         run_as_background_process("emailpush.process", self._process) | ||||
| 
 | ||||
|     def _pause_processing(self): | ||||
|         """Used by tests to temporarily pause processing of events. | ||||
| 
 | ||||
|         Asserts that its not currently processing. | ||||
|         """ | ||||
|         assert not self._is_processing | ||||
|         self._is_processing = True | ||||
| 
 | ||||
|     def _resume_processing(self): | ||||
|         """Used by tests to resume processing of events after pausing. | ||||
|         """ | ||||
|         assert self._is_processing | ||||
|         self._is_processing = False | ||||
|         self._start_processing() | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _process(self): | ||||
|         # we should never get here if we are already processing | ||||
|  | @ -215,6 +230,10 @@ class EmailPusher(object): | |||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def save_last_stream_ordering_and_success(self, last_stream_ordering): | ||||
|         if last_stream_ordering is None: | ||||
|             # This happens if we haven't yet processed anything | ||||
|             return | ||||
| 
 | ||||
|         self.last_stream_ordering = last_stream_ordering | ||||
|         yield self.store.update_pusher_last_stream_ordering_and_success( | ||||
|             self.app_id, self.email, self.user_id, | ||||
|  |  | |||
|  | @ -162,6 +162,17 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True | |||
| 
 | ||||
| 
 | ||||
| def descriptor_from_member_events(member_events): | ||||
|     """Get a description of the room based on the member events. | ||||
| 
 | ||||
|     Args: | ||||
|         member_events (Iterable[FrozenEvent]) | ||||
| 
 | ||||
|     Returns: | ||||
|         str | ||||
|     """ | ||||
| 
 | ||||
|     member_events = list(member_events) | ||||
| 
 | ||||
|     if len(member_events) == 0: | ||||
|         return "nobody" | ||||
|     elif len(member_events) == 1: | ||||
|  |  | |||
|  | @ -60,6 +60,11 @@ class PusherPool: | |||
|     def add_pusher(self, user_id, access_token, kind, app_id, | ||||
|                    app_display_name, device_display_name, pushkey, lang, data, | ||||
|                    profile_tag=""): | ||||
|         """Creates a new pusher and adds it to the pool | ||||
| 
 | ||||
|         Returns: | ||||
|             Deferred[EmailPusher|HttpPusher] | ||||
|         """ | ||||
|         time_now_msec = self.clock.time_msec() | ||||
| 
 | ||||
|         # we try to create the pusher just to validate the config: it | ||||
|  | @ -103,7 +108,9 @@ class PusherPool: | |||
|             last_stream_ordering=last_stream_ordering, | ||||
|             profile_tag=profile_tag, | ||||
|         ) | ||||
|         yield self.start_pusher_by_id(app_id, pushkey, user_id) | ||||
|         pusher = yield self.start_pusher_by_id(app_id, pushkey, user_id) | ||||
| 
 | ||||
|         defer.returnValue(pusher) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey, | ||||
|  | @ -184,7 +191,11 @@ class PusherPool: | |||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def start_pusher_by_id(self, app_id, pushkey, user_id): | ||||
|         """Look up the details for the given pusher, and start it""" | ||||
|         """Look up the details for the given pusher, and start it | ||||
| 
 | ||||
|         Returns: | ||||
|             Deferred[EmailPusher|HttpPusher|None]: The pusher started, if any | ||||
|         """ | ||||
|         if not self._should_start_pushers: | ||||
|             return | ||||
| 
 | ||||
|  | @ -192,13 +203,16 @@ class PusherPool: | |||
|             app_id, pushkey | ||||
|         ) | ||||
| 
 | ||||
|         p = None | ||||
|         pusher_dict = None | ||||
|         for r in resultlist: | ||||
|             if r['user_name'] == user_id: | ||||
|                 p = r | ||||
|                 pusher_dict = r | ||||
| 
 | ||||
|         if p: | ||||
|             yield self._start_pusher(p) | ||||
|         pusher = None | ||||
|         if pusher_dict: | ||||
|             pusher = yield self._start_pusher(pusher_dict) | ||||
| 
 | ||||
|         defer.returnValue(pusher) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _start_pushers(self): | ||||
|  | @ -224,7 +238,7 @@ class PusherPool: | |||
|             pusherdict (dict): | ||||
| 
 | ||||
|         Returns: | ||||
|             None | ||||
|             Deferred[EmailPusher|HttpPusher] | ||||
|         """ | ||||
|         try: | ||||
|             p = self.pusher_factory.create_pusher(pusherdict) | ||||
|  | @ -270,6 +284,8 @@ class PusherPool: | |||
| 
 | ||||
|         p.on_started(have_notifs) | ||||
| 
 | ||||
|         defer.returnValue(p) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def remove_pusher(self, app_id, pushkey, user_id): | ||||
|         appid_pushkey = "%s:%s" % (app_id, pushkey) | ||||
|  |  | |||
|  | @ -15,6 +15,7 @@ | |||
| 
 | ||||
| import os | ||||
| 
 | ||||
| import attr | ||||
| import pkg_resources | ||||
| 
 | ||||
| from twisted.internet.defer import Deferred | ||||
|  | @ -25,6 +26,13 @@ from synapse.rest.client.v1 import login, room | |||
| from tests.unittest import HomeserverTestCase | ||||
| 
 | ||||
| 
 | ||||
| @attr.s | ||||
| class _User(object): | ||||
|     "Helper wrapper for user ID and access token" | ||||
|     id = attr.ib() | ||||
|     token = attr.ib() | ||||
| 
 | ||||
| 
 | ||||
| class EmailPusherTests(HomeserverTestCase): | ||||
| 
 | ||||
|     servlets = [ | ||||
|  | @ -71,25 +79,32 @@ class EmailPusherTests(HomeserverTestCase): | |||
| 
 | ||||
|         return hs | ||||
| 
 | ||||
|     def test_sends_email(self): | ||||
| 
 | ||||
|     def prepare(self, reactor, clock, hs): | ||||
|         # Register the user who gets notified | ||||
|         user_id = self.register_user("user", "pass") | ||||
|         access_token = self.login("user", "pass") | ||||
|         self.user_id = self.register_user("user", "pass") | ||||
|         self.access_token = self.login("user", "pass") | ||||
| 
 | ||||
|         # Register the user who sends the message | ||||
|         other_user_id = self.register_user("otheruser", "pass") | ||||
|         other_access_token = self.login("otheruser", "pass") | ||||
|         # Register other users | ||||
|         self.others = [ | ||||
|             _User( | ||||
|                 id=self.register_user("otheruser1", "pass"), | ||||
|                 token=self.login("otheruser1", "pass"), | ||||
|             ), | ||||
|             _User( | ||||
|                 id=self.register_user("otheruser2", "pass"), | ||||
|                 token=self.login("otheruser2", "pass"), | ||||
|             ), | ||||
|         ] | ||||
| 
 | ||||
|         # Register the pusher | ||||
|         user_tuple = self.get_success( | ||||
|             self.hs.get_datastore().get_user_by_access_token(access_token) | ||||
|             self.hs.get_datastore().get_user_by_access_token(self.access_token) | ||||
|         ) | ||||
|         token_id = user_tuple["token_id"] | ||||
| 
 | ||||
|         self.get_success( | ||||
|         self.pusher = self.get_success( | ||||
|             self.hs.get_pusherpool().add_pusher( | ||||
|                 user_id=user_id, | ||||
|                 user_id=self.user_id, | ||||
|                 access_token=token_id, | ||||
|                 kind="email", | ||||
|                 app_id="m.email", | ||||
|  | @ -101,22 +116,54 @@ class EmailPusherTests(HomeserverTestCase): | |||
|             ) | ||||
|         ) | ||||
| 
 | ||||
|         # Create a room | ||||
|         room = self.helper.create_room_as(user_id, tok=access_token) | ||||
| 
 | ||||
|         # Invite the other person | ||||
|         self.helper.invite(room=room, src=user_id, tok=access_token, targ=other_user_id) | ||||
| 
 | ||||
|         # The other user joins | ||||
|         self.helper.join(room=room, user=other_user_id, tok=other_access_token) | ||||
|     def test_simple_sends_email(self): | ||||
|         # Create a simple room with two users | ||||
|         room = self.helper.create_room_as(self.user_id, tok=self.access_token) | ||||
|         self.helper.invite( | ||||
|             room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id, | ||||
|         ) | ||||
|         self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token) | ||||
| 
 | ||||
|         # The other user sends some messages | ||||
|         self.helper.send(room, body="Hi!", tok=other_access_token) | ||||
|         self.helper.send(room, body="There!", tok=other_access_token) | ||||
|         self.helper.send(room, body="Hi!", tok=self.others[0].token) | ||||
|         self.helper.send(room, body="There!", tok=self.others[0].token) | ||||
| 
 | ||||
|         # We should get emailed about that message | ||||
|         self._check_for_mail() | ||||
| 
 | ||||
|     def test_multiple_members_email(self): | ||||
|         # We want to test multiple notifications, so we pause processing of push | ||||
|         # while we send messages. | ||||
|         self.pusher._pause_processing() | ||||
| 
 | ||||
|         # Create a simple room with multiple other users | ||||
|         room = self.helper.create_room_as(self.user_id, tok=self.access_token) | ||||
| 
 | ||||
|         for other in self.others: | ||||
|             self.helper.invite( | ||||
|                 room=room, src=self.user_id, tok=self.access_token, targ=other.id, | ||||
|             ) | ||||
|             self.helper.join(room=room, user=other.id, tok=other.token) | ||||
| 
 | ||||
|         # The other users send some messages | ||||
|         self.helper.send(room, body="Hi!", tok=self.others[0].token) | ||||
|         self.helper.send(room, body="There!", tok=self.others[1].token) | ||||
|         self.helper.send(room, body="There!", tok=self.others[1].token) | ||||
| 
 | ||||
|         # Nothing should have happened yet, as we're paused. | ||||
|         assert not self.email_attempts | ||||
| 
 | ||||
|         self.pusher._resume_processing() | ||||
| 
 | ||||
|         # We should get emailed about those messages | ||||
|         self._check_for_mail() | ||||
| 
 | ||||
|     def _check_for_mail(self): | ||||
|         "Check that the user receives an email notification" | ||||
| 
 | ||||
|         # Get the stream ordering before it gets sent | ||||
|         pushers = self.get_success( | ||||
|             self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) | ||||
|             self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) | ||||
|         ) | ||||
|         self.assertEqual(len(pushers), 1) | ||||
|         last_stream_ordering = pushers[0]["last_stream_ordering"] | ||||
|  | @ -126,7 +173,7 @@ class EmailPusherTests(HomeserverTestCase): | |||
| 
 | ||||
|         # It hasn't succeeded yet, so the stream ordering shouldn't have moved | ||||
|         pushers = self.get_success( | ||||
|             self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) | ||||
|             self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) | ||||
|         ) | ||||
|         self.assertEqual(len(pushers), 1) | ||||
|         self.assertEqual(last_stream_ordering, pushers[0]["last_stream_ordering"]) | ||||
|  | @ -143,7 +190,7 @@ class EmailPusherTests(HomeserverTestCase): | |||
| 
 | ||||
|         # The stream ordering has increased | ||||
|         pushers = self.get_success( | ||||
|             self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) | ||||
|             self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) | ||||
|         ) | ||||
|         self.assertEqual(len(pushers), 1) | ||||
|         self.assertTrue(pushers[0]["last_stream_ordering"] > last_stream_ordering) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Erik Johnston
						Erik Johnston