Merge remote-tracking branch 'origin/develop' into dbkr/email_unsubscribe
						commit
						812b5de0fe
					
				|  | @ -126,6 +126,24 @@ class Auth(object): | |||
|                 return allowed | ||||
| 
 | ||||
|             self.check_event_sender_in_room(event, auth_events) | ||||
| 
 | ||||
|             # Special case to allow m.room.third_party_invite events wherever | ||||
|             # a user is allowed to issue invites.  Fixes | ||||
|             # https://github.com/vector-im/vector-web/issues/1208 hopefully | ||||
|             if event.type == EventTypes.ThirdPartyInvite: | ||||
|                 user_level = self._get_user_power_level(event.user_id, auth_events) | ||||
|                 invite_level = self._get_named_level(auth_events, "invite", 0) | ||||
| 
 | ||||
|                 if user_level < invite_level: | ||||
|                     raise AuthError( | ||||
|                         403, ( | ||||
|                             "You cannot issue a third party invite for %s." % | ||||
|                             (event.content.display_name,) | ||||
|                         ) | ||||
|                     ) | ||||
|                 else: | ||||
|                     return True | ||||
| 
 | ||||
|             self._can_send_event(event, auth_events) | ||||
| 
 | ||||
|             if event.type == EventTypes.PowerLevels: | ||||
|  |  | |||
|  | @ -68,6 +68,10 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000 | |||
| # How often to resend presence to remote servers | ||||
| FEDERATION_PING_INTERVAL = 25 * 60 * 1000 | ||||
| 
 | ||||
| # How long we will wait before assuming that the syncs from an external process | ||||
| # are dead. | ||||
| EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000 | ||||
| 
 | ||||
| assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER | ||||
| 
 | ||||
| 
 | ||||
|  | @ -158,10 +162,21 @@ class PresenceHandler(object): | |||
|         self.serial_to_user = {} | ||||
|         self._next_serial = 1 | ||||
| 
 | ||||
|         # Keeps track of the number of *ongoing* syncs. While this is non zero | ||||
|         # a user will never go offline. | ||||
|         # Keeps track of the number of *ongoing* syncs on this process. While | ||||
|         # this is non zero a user will never go offline. | ||||
|         self.user_to_num_current_syncs = {} | ||||
| 
 | ||||
|         # Keeps track of the number of *ongoing* syncs on other processes. | ||||
|         # While any sync is ongoing on another process the user will never | ||||
|         # go offline. | ||||
|         # Each process has a unique identifier and an update frequency. If | ||||
|         # no update is received from that process within the update period then | ||||
|         # we assume that all the sync requests on that process have stopped. | ||||
|         # Stored as a dict from process_id to set of user_id, and a dict of | ||||
|         # process_id to millisecond timestamp last updated. | ||||
|         self.external_process_to_current_syncs = {} | ||||
|         self.external_process_last_updated_ms = {} | ||||
| 
 | ||||
|         # Start a LoopingCall in 30s that fires every 5s. | ||||
|         # The initial delay is to allow disconnected clients a chance to | ||||
|         # reconnect before we treat them as offline. | ||||
|  | @ -272,13 +287,26 @@ class PresenceHandler(object): | |||
|             # Fetch the list of users that *may* have timed out. Things may have | ||||
|             # changed since the timeout was set, so we won't necessarily have to | ||||
|             # take any action. | ||||
|             users_to_check = self.wheel_timer.fetch(now) | ||||
|             users_to_check = set(self.wheel_timer.fetch(now)) | ||||
| 
 | ||||
|             # Check whether the lists of syncing processes from an external | ||||
|             # process have expired. | ||||
|             expired_process_ids = [ | ||||
|                 process_id for process_id, last_update | ||||
|                 in self.external_process_last_update.items() | ||||
|                 if now - last_update > EXTERNAL_PROCESS_EXPIRY | ||||
|             ] | ||||
|             for process_id in expired_process_ids: | ||||
|                 users_to_check.update( | ||||
|                     self.external_process_to_current_syncs.pop(process_id, ()) | ||||
|                 ) | ||||
|                 self.external_process_last_update.pop(process_id) | ||||
| 
 | ||||
|             states = [ | ||||
|                 self.user_to_current_state.get( | ||||
|                     user_id, UserPresenceState.default(user_id) | ||||
|                 ) | ||||
|                 for user_id in set(users_to_check) | ||||
|                 for user_id in users_to_check | ||||
|             ] | ||||
| 
 | ||||
|             timers_fired_counter.inc_by(len(states)) | ||||
|  | @ -286,7 +314,7 @@ class PresenceHandler(object): | |||
|             changes = handle_timeouts( | ||||
|                 states, | ||||
|                 is_mine_fn=self.is_mine_id, | ||||
|                 user_to_num_current_syncs=self.user_to_num_current_syncs, | ||||
|                 syncing_users=self.get_syncing_users(), | ||||
|                 now=now, | ||||
|             ) | ||||
| 
 | ||||
|  | @ -363,6 +391,73 @@ class PresenceHandler(object): | |||
| 
 | ||||
|         defer.returnValue(_user_syncing()) | ||||
| 
 | ||||
|     def get_currently_syncing_users(self): | ||||
|         """Get the set of user ids that are currently syncing on this HS. | ||||
|         Returns: | ||||
|             set(str): A set of user_id strings. | ||||
|         """ | ||||
|         syncing_user_ids = { | ||||
|             user_id for user_id, count in self.user_to_num_current_syncs.items() | ||||
|             if count | ||||
|         } | ||||
|         syncing_user_ids.update(self.external_process_to_current_syncs.values()) | ||||
|         return syncing_user_ids | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def update_external_syncs(self, process_id, syncing_user_ids): | ||||
|         """Update the syncing users for an external process | ||||
| 
 | ||||
|         Args: | ||||
|             process_id(str): An identifier for the process the users are | ||||
|                 syncing against. This allows synapse to process updates | ||||
|                 as user start and stop syncing against a given process. | ||||
|             syncing_user_ids(set(str)): The set of user_ids that are | ||||
|                 currently syncing on that server. | ||||
|         """ | ||||
| 
 | ||||
|         # Grab the previous list of user_ids that were syncing on that process | ||||
|         prev_syncing_user_ids = ( | ||||
|             self.external_process_to_current_syncs.get(process_id, set()) | ||||
|         ) | ||||
|         # Grab the current presence state for both the users that are syncing | ||||
|         # now and the users that were syncing before this update. | ||||
|         prev_states = yield self.current_state_for_users( | ||||
|             syncing_user_ids | prev_syncing_user_ids | ||||
|         ) | ||||
|         updates = [] | ||||
|         time_now_ms = self.clock.time_msec() | ||||
| 
 | ||||
|         # For each new user that is syncing check if we need to mark them as | ||||
|         # being online. | ||||
|         for new_user_id in syncing_user_ids - prev_syncing_user_ids: | ||||
|             prev_state = prev_states[new_user_id] | ||||
|             if prev_state.state == PresenceState.OFFLINE: | ||||
|                 updates.append(prev_state.copy_and_replace( | ||||
|                     state=PresenceState.ONLINE, | ||||
|                     last_active_ts=time_now_ms, | ||||
|                     last_user_sync_ts=time_now_ms, | ||||
|                 )) | ||||
|             else: | ||||
|                 updates.append(prev_state.copy_and_replace( | ||||
|                     last_user_sync_ts=time_now_ms, | ||||
|                 )) | ||||
| 
 | ||||
|         # For each user that is still syncing or stopped syncing update the | ||||
|         # last sync time so that we will correctly apply the grace period when | ||||
|         # they stop syncing. | ||||
|         for old_user_id in prev_syncing_user_ids: | ||||
|             prev_state = prev_states[old_user_id] | ||||
|             updates.append(prev_state.copy_and_replace( | ||||
|                 last_user_sync_ts=time_now_ms, | ||||
|             )) | ||||
| 
 | ||||
|         yield self._update_states(updates) | ||||
| 
 | ||||
|         # Update the last updated time for the process. We expire the entries | ||||
|         # if we don't receive an update in the given timeframe. | ||||
|         self.external_process_last_updated_ms[process_id] = self.clock.time_msec() | ||||
|         self.external_process_to_current_syncs[process_id] = syncing_user_ids | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def current_state_for_user(self, user_id): | ||||
|         """Get the current presence state for a user. | ||||
|  | @ -935,15 +1030,14 @@ class PresenceEventSource(object): | |||
|         return self.get_new_events(user, from_key=None, include_offline=False) | ||||
| 
 | ||||
| 
 | ||||
| def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): | ||||
| def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): | ||||
|     """Checks the presence of users that have timed out and updates as | ||||
|     appropriate. | ||||
| 
 | ||||
|     Args: | ||||
|         user_states(list): List of UserPresenceState's to check. | ||||
|         is_mine_fn (fn): Function that returns if a user_id is ours | ||||
|         user_to_num_current_syncs (dict): Mapping of user_id to number of currently | ||||
|             active syncs. | ||||
|         syncing_user_ids (set): Set of user_ids with active syncs. | ||||
|         now (int): Current time in ms. | ||||
| 
 | ||||
|     Returns: | ||||
|  | @ -954,21 +1048,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): | |||
|     for state in user_states: | ||||
|         is_mine = is_mine_fn(state.user_id) | ||||
| 
 | ||||
|         new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now) | ||||
|         new_state = handle_timeout(state, is_mine, syncing_user_ids, now) | ||||
|         if new_state: | ||||
|             changes[state.user_id] = new_state | ||||
| 
 | ||||
|     return changes.values() | ||||
| 
 | ||||
| 
 | ||||
| def handle_timeout(state, is_mine, user_to_num_current_syncs, now): | ||||
| def handle_timeout(state, is_mine, syncing_user_ids, now): | ||||
|     """Checks the presence of the user to see if any of the timers have elapsed | ||||
| 
 | ||||
|     Args: | ||||
|         state (UserPresenceState) | ||||
|         is_mine (bool): Whether the user is ours | ||||
|         user_to_num_current_syncs (dict): Mapping of user_id to number of currently | ||||
|             active syncs. | ||||
|         syncing_user_ids (set): Set of user_ids with active syncs. | ||||
|         now (int): Current time in ms. | ||||
| 
 | ||||
|     Returns: | ||||
|  | @ -1002,7 +1095,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now): | |||
| 
 | ||||
|         # If there are have been no sync for a while (and none ongoing), | ||||
|         # set presence to offline | ||||
|         if not user_to_num_current_syncs.get(user_id, 0): | ||||
|         if user_id not in syncing_user_ids: | ||||
|             if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT: | ||||
|                 state = state.copy_and_replace( | ||||
|                     state=PresenceState.OFFLINE, | ||||
|  |  | |||
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							|  | @ -44,7 +44,8 @@ THROTTLE_RESET_AFTER_MS = (12 * 60 * 60 * 1000) | |||
| 
 | ||||
| # does each email include all unread notifs, or just the ones which have happened | ||||
| # since the last mail? | ||||
| INCLUDE_ALL_UNREAD_NOTIFS = True | ||||
| # XXX: this is currently broken as it includes ones from parted rooms(!) | ||||
| INCLUDE_ALL_UNREAD_NOTIFS = False | ||||
| 
 | ||||
| 
 | ||||
| class EmailPusher(object): | ||||
|  | @ -72,7 +73,12 @@ class EmailPusher(object): | |||
|         self.processing = False | ||||
| 
 | ||||
|         if self.hs.config.email_enable_notifs: | ||||
|             self.mailer = Mailer(self.hs) | ||||
|             if 'data' in pusherdict and 'brand' in pusherdict['data']: | ||||
|                 app_name = pusherdict['data']['brand'] | ||||
|             else: | ||||
|                 app_name = self.hs.config.email_app_name | ||||
| 
 | ||||
|             self.mailer = Mailer(self.hs, app_name) | ||||
|         else: | ||||
|             self.mailer = None | ||||
| 
 | ||||
|  |  | |||
|  | @ -78,13 +78,13 @@ ALLOWED_ATTRS = { | |||
| 
 | ||||
| 
 | ||||
| class Mailer(object): | ||||
|     def __init__(self, hs): | ||||
|     def __init__(self, hs, app_name): | ||||
|         self.hs = hs | ||||
|         self.store = self.hs.get_datastore() | ||||
|         self.handlers = self.hs.get_handlers() | ||||
|         self.state_handler = self.hs.get_state_handler() | ||||
|         loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir) | ||||
|         self.app_name = self.hs.config.email_app_name | ||||
|         self.app_name = app_name | ||||
|         env = jinja2.Environment(loader=loader) | ||||
|         env.filters["format_ts"] = format_ts_filter | ||||
|         env.filters["mxc_to_http"] = self.mxc_to_http_filter | ||||
|  |  | |||
|  | @ -0,0 +1,59 @@ | |||
| # Copyright 2016 OpenMarket Ltd | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| from synapse.http.server import respond_with_json_bytes, request_handler | ||||
| from synapse.http.servlet import parse_json_object_from_request | ||||
| 
 | ||||
| from twisted.web.resource import Resource | ||||
| from twisted.web.server import NOT_DONE_YET | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
| 
 | ||||
| class PresenceResource(Resource): | ||||
|     """ | ||||
|     HTTP endpoint for marking users as syncing. | ||||
| 
 | ||||
|     POST /_synapse/replication/presence HTTP/1.1 | ||||
|     Content-Type: application/json | ||||
| 
 | ||||
|     { | ||||
|         "process_id": "<process_id>", | ||||
|         "syncing_users": ["<user_id>"] | ||||
|     } | ||||
|     """ | ||||
| 
 | ||||
|     def __init__(self, hs): | ||||
|         Resource.__init__(self)  # Resource is old-style, so no super() | ||||
| 
 | ||||
|         self.version_string = hs.version_string | ||||
|         self.clock = hs.get_clock() | ||||
|         self.presence_handler = hs.get_presence_handler() | ||||
| 
 | ||||
|     def render_POST(self, request): | ||||
|         self._async_render_POST(request) | ||||
|         return NOT_DONE_YET | ||||
| 
 | ||||
|     @request_handler() | ||||
|     @defer.inlineCallbacks | ||||
|     def _async_render_POST(self, request): | ||||
|         content = parse_json_object_from_request(request) | ||||
| 
 | ||||
|         process_id = content["process_id"] | ||||
|         syncing_user_ids = content["syncing_users"] | ||||
| 
 | ||||
|         yield self.presence_handler.update_external_syncs( | ||||
|             process_id, set(syncing_user_ids) | ||||
|         ) | ||||
| 
 | ||||
|         respond_with_json_bytes(request, 200, "{}") | ||||
|  | @ -16,6 +16,7 @@ | |||
| from synapse.http.servlet import parse_integer, parse_string | ||||
| from synapse.http.server import request_handler, finish_request | ||||
| from synapse.replication.pusher_resource import PusherResource | ||||
| from synapse.replication.presence_resource import PresenceResource | ||||
| 
 | ||||
| from twisted.web.resource import Resource | ||||
| from twisted.web.server import NOT_DONE_YET | ||||
|  | @ -115,6 +116,7 @@ class ReplicationResource(Resource): | |||
|         self.clock = hs.get_clock() | ||||
| 
 | ||||
|         self.putChild("remove_pushers", PusherResource(hs)) | ||||
|         self.putChild("syncing_users", PresenceResource(hs)) | ||||
| 
 | ||||
|     def render_GET(self, request): | ||||
|         self._async_render_GET(request) | ||||
|  |  | |||
|  | @ -152,6 +152,7 @@ class SQLBaseStore(object): | |||
| 
 | ||||
|     def __init__(self, hs): | ||||
|         self.hs = hs | ||||
|         self._clock = hs.get_clock() | ||||
|         self._db_pool = hs.get_db_pool() | ||||
| 
 | ||||
|         self._previous_txn_total_time = 0 | ||||
|  |  | |||
|  | @ -264,7 +264,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): | |||
|         ) | ||||
| 
 | ||||
|         new_state = handle_timeout( | ||||
|             state, is_mine=True, user_to_num_current_syncs={}, now=now | ||||
|             state, is_mine=True, syncing_user_ids=set(), now=now | ||||
|         ) | ||||
| 
 | ||||
|         self.assertIsNotNone(new_state) | ||||
|  | @ -282,7 +282,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): | |||
|         ) | ||||
| 
 | ||||
|         new_state = handle_timeout( | ||||
|             state, is_mine=True, user_to_num_current_syncs={}, now=now | ||||
|             state, is_mine=True, syncing_user_ids=set(), now=now | ||||
|         ) | ||||
| 
 | ||||
|         self.assertIsNotNone(new_state) | ||||
|  | @ -300,9 +300,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): | |||
|         ) | ||||
| 
 | ||||
|         new_state = handle_timeout( | ||||
|             state, is_mine=True, user_to_num_current_syncs={ | ||||
|                 user_id: 1, | ||||
|             }, now=now | ||||
|             state, is_mine=True, syncing_user_ids=set([user_id]), now=now | ||||
|         ) | ||||
| 
 | ||||
|         self.assertIsNotNone(new_state) | ||||
|  | @ -321,7 +319,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): | |||
|         ) | ||||
| 
 | ||||
|         new_state = handle_timeout( | ||||
|             state, is_mine=True, user_to_num_current_syncs={}, now=now | ||||
|             state, is_mine=True, syncing_user_ids=set(), now=now | ||||
|         ) | ||||
| 
 | ||||
|         self.assertIsNotNone(new_state) | ||||
|  | @ -340,7 +338,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): | |||
|         ) | ||||
| 
 | ||||
|         new_state = handle_timeout( | ||||
|             state, is_mine=True, user_to_num_current_syncs={}, now=now | ||||
|             state, is_mine=True, syncing_user_ids=set(), now=now | ||||
|         ) | ||||
| 
 | ||||
|         self.assertIsNone(new_state) | ||||
|  | @ -358,7 +356,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): | |||
|         ) | ||||
| 
 | ||||
|         new_state = handle_timeout( | ||||
|             state, is_mine=False, user_to_num_current_syncs={}, now=now | ||||
|             state, is_mine=False, syncing_user_ids=set(), now=now | ||||
|         ) | ||||
| 
 | ||||
|         self.assertIsNotNone(new_state) | ||||
|  | @ -377,7 +375,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): | |||
|         ) | ||||
| 
 | ||||
|         new_state = handle_timeout( | ||||
|             state, is_mine=True, user_to_num_current_syncs={}, now=now | ||||
|             state, is_mine=True, syncing_user_ids=set(), now=now | ||||
|         ) | ||||
| 
 | ||||
|         self.assertIsNotNone(new_state) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 David Baker
						David Baker