From aee9130a83eec7a295071a7e3ea87ce380c4521c Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Wed, 6 May 2020 15:54:58 +0100 Subject: [PATCH 1/5] Stop Auth methods from polling the config on every req. (#7420) --- changelog.d/7420.misc | 1 + synapse/api/auth.py | 83 ++++------------------------ synapse/api/auth_blocking.py | 104 +++++++++++++++++++++++++++++++++++ tests/api/test_auth.py | 36 ++++++------ tests/handlers/test_auth.py | 23 +++++--- tests/handlers/test_sync.py | 13 +++-- tests/test_mau.py | 14 ++++- 7 files changed, 168 insertions(+), 106 deletions(-) create mode 100644 changelog.d/7420.misc create mode 100644 synapse/api/auth_blocking.py diff --git a/changelog.d/7420.misc b/changelog.d/7420.misc new file mode 100644 index 0000000000..e834a9163e --- /dev/null +++ b/changelog.d/7420.misc @@ -0,0 +1 @@ +Prevent methods in `synapse.handlers.auth` from polling the homeserver config every request. \ No newline at end of file diff --git a/synapse/api/auth.py b/synapse/api/auth.py index c5d1eb952b..1ad5ff9410 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -26,16 +26,15 @@ from twisted.internet import defer import synapse.logging.opentracing as opentracing import synapse.types from synapse import event_auth -from synapse.api.constants import EventTypes, LimitBlockingTypes, Membership, UserTypes +from synapse.api.auth_blocking import AuthBlocking +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import ( AuthError, Codes, InvalidClientTokenError, MissingClientTokenError, - ResourceLimitError, ) from synapse.api.room_versions import KNOWN_ROOM_VERSIONS -from synapse.config.server import is_threepid_reserved from synapse.events import EventBase from synapse.types import StateMap, UserID from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache @@ -77,7 +76,11 @@ class Auth(object): self.token_cache = LruCache(CACHE_SIZE_FACTOR * 10000) register_cache("cache", "token_cache", self.token_cache) + self._auth_blocking = AuthBlocking(self.hs) + self._account_validity = hs.config.account_validity + self._track_appservice_user_ips = hs.config.track_appservice_user_ips + self._macaroon_secret_key = hs.config.macaroon_secret_key @defer.inlineCallbacks def check_from_context(self, room_version: str, event, context, do_sig_check=True): @@ -191,7 +194,7 @@ class Auth(object): opentracing.set_tag("authenticated_entity", user_id) opentracing.set_tag("appservice_id", app_service.id) - if ip_addr and self.hs.config.track_appservice_user_ips: + if ip_addr and self._track_appservice_user_ips: yield self.store.insert_client_ip( user_id=user_id, access_token=access_token, @@ -454,7 +457,7 @@ class Auth(object): # access_tokens include a nonce for uniqueness: any value is acceptable v.satisfy_general(lambda c: c.startswith("nonce = ")) - v.verify(macaroon, self.hs.config.macaroon_secret_key) + v.verify(macaroon, self._macaroon_secret_key) def _verify_expiry(self, caveat): prefix = "time < " @@ -663,71 +666,5 @@ class Auth(object): % (user_id, room_id), ) - @defer.inlineCallbacks - def check_auth_blocking(self, user_id=None, threepid=None, user_type=None): - """Checks if the user should be rejected for some external reason, - such as monthly active user limiting or global disable flag - - Args: - user_id(str|None): If present, checks for presence against existing - MAU cohort - - threepid(dict|None): If present, checks for presence against configured - reserved threepid. Used in cases where the user is trying register - with a MAU blocked server, normally they would be rejected but their - threepid is on the reserved list. user_id and - threepid should never be set at the same time. - - user_type(str|None): If present, is used to decide whether to check against - certain blocking reasons like MAU. - """ - - # Never fail an auth check for the server notices users or support user - # This can be a problem where event creation is prohibited due to blocking - if user_id is not None: - if user_id == self.hs.config.server_notices_mxid: - return - if (yield self.store.is_support_user(user_id)): - return - - if self.hs.config.hs_disabled: - raise ResourceLimitError( - 403, - self.hs.config.hs_disabled_message, - errcode=Codes.RESOURCE_LIMIT_EXCEEDED, - admin_contact=self.hs.config.admin_contact, - limit_type=LimitBlockingTypes.HS_DISABLED, - ) - if self.hs.config.limit_usage_by_mau is True: - assert not (user_id and threepid) - - # If the user is already part of the MAU cohort or a trial user - if user_id: - timestamp = yield self.store.user_last_seen_monthly_active(user_id) - if timestamp: - return - - is_trial = yield self.store.is_trial_user(user_id) - if is_trial: - return - elif threepid: - # If the user does not exist yet, but is signing up with a - # reserved threepid then pass auth check - if is_threepid_reserved( - self.hs.config.mau_limits_reserved_threepids, threepid - ): - return - elif user_type == UserTypes.SUPPORT: - # If the user does not exist yet and is of type "support", - # allow registration. Support users are excluded from MAU checks. - return - # Else if there is no room in the MAU bucket, bail - current_mau = yield self.store.get_monthly_active_count() - if current_mau >= self.hs.config.max_mau_value: - raise ResourceLimitError( - 403, - "Monthly Active User Limit Exceeded", - admin_contact=self.hs.config.admin_contact, - errcode=Codes.RESOURCE_LIMIT_EXCEEDED, - limit_type=LimitBlockingTypes.MONTHLY_ACTIVE_USER, - ) + def check_auth_blocking(self, *args, **kwargs): + return self._auth_blocking.check_auth_blocking(*args, **kwargs) diff --git a/synapse/api/auth_blocking.py b/synapse/api/auth_blocking.py new file mode 100644 index 0000000000..5c499b6b4e --- /dev/null +++ b/synapse/api/auth_blocking.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.constants import LimitBlockingTypes, UserTypes +from synapse.api.errors import Codes, ResourceLimitError +from synapse.config.server import is_threepid_reserved + +logger = logging.getLogger(__name__) + + +class AuthBlocking(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + self._server_notices_mxid = hs.config.server_notices_mxid + self._hs_disabled = hs.config.hs_disabled + self._hs_disabled_message = hs.config.hs_disabled_message + self._admin_contact = hs.config.admin_contact + self._max_mau_value = hs.config.max_mau_value + self._limit_usage_by_mau = hs.config.limit_usage_by_mau + self._mau_limits_reserved_threepids = hs.config.mau_limits_reserved_threepids + + @defer.inlineCallbacks + def check_auth_blocking(self, user_id=None, threepid=None, user_type=None): + """Checks if the user should be rejected for some external reason, + such as monthly active user limiting or global disable flag + + Args: + user_id(str|None): If present, checks for presence against existing + MAU cohort + + threepid(dict|None): If present, checks for presence against configured + reserved threepid. Used in cases where the user is trying register + with a MAU blocked server, normally they would be rejected but their + threepid is on the reserved list. user_id and + threepid should never be set at the same time. + + user_type(str|None): If present, is used to decide whether to check against + certain blocking reasons like MAU. + """ + + # Never fail an auth check for the server notices users or support user + # This can be a problem where event creation is prohibited due to blocking + if user_id is not None: + if user_id == self._server_notices_mxid: + return + if (yield self.store.is_support_user(user_id)): + return + + if self._hs_disabled: + raise ResourceLimitError( + 403, + self._hs_disabled_message, + errcode=Codes.RESOURCE_LIMIT_EXCEEDED, + admin_contact=self._admin_contact, + limit_type=LimitBlockingTypes.HS_DISABLED, + ) + if self._limit_usage_by_mau is True: + assert not (user_id and threepid) + + # If the user is already part of the MAU cohort or a trial user + if user_id: + timestamp = yield self.store.user_last_seen_monthly_active(user_id) + if timestamp: + return + + is_trial = yield self.store.is_trial_user(user_id) + if is_trial: + return + elif threepid: + # If the user does not exist yet, but is signing up with a + # reserved threepid then pass auth check + if is_threepid_reserved(self._mau_limits_reserved_threepids, threepid): + return + elif user_type == UserTypes.SUPPORT: + # If the user does not exist yet and is of type "support", + # allow registration. Support users are excluded from MAU checks. + return + # Else if there is no room in the MAU bucket, bail + current_mau = yield self.store.get_monthly_active_count() + if current_mau >= self._max_mau_value: + raise ResourceLimitError( + 403, + "Monthly Active User Limit Exceeded", + admin_contact=self._admin_contact, + errcode=Codes.RESOURCE_LIMIT_EXCEEDED, + limit_type=LimitBlockingTypes.MONTHLY_ACTIVE_USER, + ) diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py index cc0b10e7f6..0bfb86bf1f 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py @@ -52,6 +52,10 @@ class AuthTestCase(unittest.TestCase): self.hs.handlers = TestHandlers(self.hs) self.auth = Auth(self.hs) + # AuthBlocking reads from the hs' config on initialization. We need to + # modify its config instead of the hs' + self.auth_blocking = self.auth._auth_blocking + self.test_user = "@foo:bar" self.test_token = b"_test_token_" @@ -321,15 +325,15 @@ class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def test_blocking_mau(self): - self.hs.config.limit_usage_by_mau = False - self.hs.config.max_mau_value = 50 + self.auth_blocking._limit_usage_by_mau = False + self.auth_blocking._max_mau_value = 50 lots_of_users = 100 small_number_of_users = 1 # Ensure no error thrown yield defer.ensureDeferred(self.auth.check_auth_blocking()) - self.hs.config.limit_usage_by_mau = True + self.auth_blocking._limit_usage_by_mau = True self.store.get_monthly_active_count = Mock( return_value=defer.succeed(lots_of_users) @@ -349,8 +353,8 @@ class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def test_blocking_mau__depending_on_user_type(self): - self.hs.config.max_mau_value = 50 - self.hs.config.limit_usage_by_mau = True + self.auth_blocking._max_mau_value = 50 + self.auth_blocking._limit_usage_by_mau = True self.store.get_monthly_active_count = Mock(return_value=defer.succeed(100)) # Support users allowed @@ -370,12 +374,12 @@ class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def test_reserved_threepid(self): - self.hs.config.limit_usage_by_mau = True - self.hs.config.max_mau_value = 1 + self.auth_blocking._limit_usage_by_mau = True + self.auth_blocking._max_mau_value = 1 self.store.get_monthly_active_count = lambda: defer.succeed(2) threepid = {"medium": "email", "address": "reserved@server.com"} unknown_threepid = {"medium": "email", "address": "unreserved@server.com"} - self.hs.config.mau_limits_reserved_threepids = [threepid] + self.auth_blocking._mau_limits_reserved_threepids = [threepid] with self.assertRaises(ResourceLimitError): yield defer.ensureDeferred(self.auth.check_auth_blocking()) @@ -389,8 +393,8 @@ class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def test_hs_disabled(self): - self.hs.config.hs_disabled = True - self.hs.config.hs_disabled_message = "Reason for being disabled" + self.auth_blocking._hs_disabled = True + self.auth_blocking._hs_disabled_message = "Reason for being disabled" with self.assertRaises(ResourceLimitError) as e: yield defer.ensureDeferred(self.auth.check_auth_blocking()) self.assertEquals(e.exception.admin_contact, self.hs.config.admin_contact) @@ -404,10 +408,10 @@ class AuthTestCase(unittest.TestCase): """ # this should be the default, but we had a bug where the test was doing the wrong # thing, so let's make it explicit - self.hs.config.server_notices_mxid = None + self.auth_blocking._server_notices_mxid = None - self.hs.config.hs_disabled = True - self.hs.config.hs_disabled_message = "Reason for being disabled" + self.auth_blocking._hs_disabled = True + self.auth_blocking._hs_disabled_message = "Reason for being disabled" with self.assertRaises(ResourceLimitError) as e: yield defer.ensureDeferred(self.auth.check_auth_blocking()) self.assertEquals(e.exception.admin_contact, self.hs.config.admin_contact) @@ -416,8 +420,8 @@ class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def test_server_notices_mxid_special_cased(self): - self.hs.config.hs_disabled = True + self.auth_blocking._hs_disabled = True user = "@user:server" - self.hs.config.server_notices_mxid = user - self.hs.config.hs_disabled_message = "Reason for being disabled" + self.auth_blocking._server_notices_mxid = user + self.auth_blocking._hs_disabled_message = "Reason for being disabled" yield defer.ensureDeferred(self.auth.check_auth_blocking(user)) diff --git a/tests/handlers/test_auth.py b/tests/handlers/test_auth.py index 52c4ac8b11..c01b04e1dc 100644 --- a/tests/handlers/test_auth.py +++ b/tests/handlers/test_auth.py @@ -39,8 +39,13 @@ class AuthTestCase(unittest.TestCase): self.hs.handlers = AuthHandlers(self.hs) self.auth_handler = self.hs.handlers.auth_handler self.macaroon_generator = self.hs.get_macaroon_generator() + # MAU tests - self.hs.config.max_mau_value = 50 + # AuthBlocking reads from the hs' config on initialization. We need to + # modify its config instead of the hs' + self.auth_blocking = self.hs.get_auth()._auth_blocking + self.auth_blocking._max_mau_value = 50 + self.small_number_of_users = 1 self.large_number_of_users = 100 @@ -119,7 +124,7 @@ class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def test_mau_limits_disabled(self): - self.hs.config.limit_usage_by_mau = False + self.auth_blocking._limit_usage_by_mau = False # Ensure does not throw exception yield defer.ensureDeferred( self.auth_handler.get_access_token_for_user_id( @@ -135,7 +140,7 @@ class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def test_mau_limits_exceeded_large(self): - self.hs.config.limit_usage_by_mau = True + self.auth_blocking._limit_usage_by_mau = True self.hs.get_datastore().get_monthly_active_count = Mock( return_value=defer.succeed(self.large_number_of_users) ) @@ -159,11 +164,11 @@ class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def test_mau_limits_parity(self): - self.hs.config.limit_usage_by_mau = True + self.auth_blocking._limit_usage_by_mau = True # If not in monthly active cohort self.hs.get_datastore().get_monthly_active_count = Mock( - return_value=defer.succeed(self.hs.config.max_mau_value) + return_value=defer.succeed(self.auth_blocking._max_mau_value) ) with self.assertRaises(ResourceLimitError): yield defer.ensureDeferred( @@ -173,7 +178,7 @@ class AuthTestCase(unittest.TestCase): ) self.hs.get_datastore().get_monthly_active_count = Mock( - return_value=defer.succeed(self.hs.config.max_mau_value) + return_value=defer.succeed(self.auth_blocking._max_mau_value) ) with self.assertRaises(ResourceLimitError): yield defer.ensureDeferred( @@ -186,7 +191,7 @@ class AuthTestCase(unittest.TestCase): return_value=defer.succeed(self.hs.get_clock().time_msec()) ) self.hs.get_datastore().get_monthly_active_count = Mock( - return_value=defer.succeed(self.hs.config.max_mau_value) + return_value=defer.succeed(self.auth_blocking._max_mau_value) ) yield defer.ensureDeferred( self.auth_handler.get_access_token_for_user_id( @@ -197,7 +202,7 @@ class AuthTestCase(unittest.TestCase): return_value=defer.succeed(self.hs.get_clock().time_msec()) ) self.hs.get_datastore().get_monthly_active_count = Mock( - return_value=defer.succeed(self.hs.config.max_mau_value) + return_value=defer.succeed(self.auth_blocking._max_mau_value) ) yield defer.ensureDeferred( self.auth_handler.validate_short_term_login_token_and_get_user_id( @@ -207,7 +212,7 @@ class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def test_mau_limits_not_exceeded(self): - self.hs.config.limit_usage_by_mau = True + self.auth_blocking._limit_usage_by_mau = True self.hs.get_datastore().get_monthly_active_count = Mock( return_value=defer.succeed(self.small_number_of_users) diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 4cbe9784ed..e178d7765b 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -30,28 +30,31 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): self.sync_handler = self.hs.get_sync_handler() self.store = self.hs.get_datastore() - def test_wait_for_sync_for_user_auth_blocking(self): + # AuthBlocking reads from the hs' config on initialization. We need to + # modify its config instead of the hs' + self.auth_blocking = self.hs.get_auth()._auth_blocking + def test_wait_for_sync_for_user_auth_blocking(self): user_id1 = "@user1:test" user_id2 = "@user2:test" sync_config = self._generate_sync_config(user_id1) self.reactor.advance(100) # So we get not 0 time - self.hs.config.limit_usage_by_mau = True - self.hs.config.max_mau_value = 1 + self.auth_blocking._limit_usage_by_mau = True + self.auth_blocking._max_mau_value = 1 # Check that the happy case does not throw errors self.get_success(self.store.upsert_monthly_active_user(user_id1)) self.get_success(self.sync_handler.wait_for_sync_for_user(sync_config)) # Test that global lock works - self.hs.config.hs_disabled = True + self.auth_blocking._hs_disabled = True e = self.get_failure( self.sync_handler.wait_for_sync_for_user(sync_config), ResourceLimitError ) self.assertEquals(e.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) - self.hs.config.hs_disabled = False + self.auth_blocking._hs_disabled = False sync_config = self._generate_sync_config(user_id2) diff --git a/tests/test_mau.py b/tests/test_mau.py index 1fbe0d51ff..eb159e3ba5 100644 --- a/tests/test_mau.py +++ b/tests/test_mau.py @@ -19,6 +19,7 @@ import json from mock import Mock +from synapse.api.auth_blocking import AuthBlocking from synapse.api.constants import LoginType from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.rest.client.v2_alpha import register, sync @@ -45,11 +46,17 @@ class TestMauLimit(unittest.HomeserverTestCase): self.hs.config.limit_usage_by_mau = True self.hs.config.hs_disabled = False self.hs.config.max_mau_value = 2 - self.hs.config.mau_trial_days = 0 self.hs.config.server_notices_mxid = "@server:red" self.hs.config.server_notices_mxid_display_name = None self.hs.config.server_notices_mxid_avatar_url = None self.hs.config.server_notices_room_name = "Test Server Notice Room" + self.hs.config.mau_trial_days = 0 + + # AuthBlocking reads config options during hs creation. Recreate the + # hs' copy of AuthBlocking after we've updated config values above + self.auth_blocking = AuthBlocking(self.hs) + self.hs.get_auth()._auth_blocking = self.auth_blocking + return self.hs def test_simple_deny_mau(self): @@ -121,6 +128,7 @@ class TestMauLimit(unittest.HomeserverTestCase): self.assertEqual(e.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) def test_trial_users_cant_come_back(self): + self.auth_blocking._mau_trial_days = 1 self.hs.config.mau_trial_days = 1 # We should be able to register more than the limit initially @@ -169,8 +177,8 @@ class TestMauLimit(unittest.HomeserverTestCase): self.assertEqual(e.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) def test_tracked_but_not_limited(self): - self.hs.config.max_mau_value = 1 # should not matter - self.hs.config.limit_usage_by_mau = False + self.auth_blocking._max_mau_value = 1 # should not matter + self.auth_blocking._limit_usage_by_mau = False self.hs.config.mau_stats_only = True # Simply being able to create 2 users indicates that the From e053c86a96609382dec3a7312d9ca1012a412844 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 6 May 2020 17:36:46 +0100 Subject: [PATCH 2/5] Make redis go faster with hiredis (#7439) For the record, the reason we need this is as follows: each RDATA command comes down the redis pipe as a subscription message. txredisapi as written needs at least three reactor ticks to read each subscription message from the tcp buffer. Hence, once the process gets loaded, it starts getting behind, and eventually redis knifes the connection. it then takes ages for the master to work its way through the backlog, before it reconnects again, during which any commands from any workers are dropped. --- changelog.d/7439.feature | 1 + synapse/python_dependencies.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 changelog.d/7439.feature diff --git a/changelog.d/7439.feature b/changelog.d/7439.feature new file mode 100644 index 0000000000..ce6140fdd1 --- /dev/null +++ b/changelog.d/7439.feature @@ -0,0 +1 @@ +Add support for running replication over Redis when using workers. diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 733c51b758..39c99a2802 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -98,7 +98,9 @@ CONDITIONAL_REQUIREMENTS = { "sentry": ["sentry-sdk>=0.7.2"], "opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"], "jwt": ["pyjwt>=1.6.4"], - "redis": ["txredisapi>=1.4.7"], + # hiredis is not a *strict* dependency, but it makes things much faster. + # (if it is not installed, we fall back to slow code.) + "redis": ["txredisapi>=1.4.7", "hiredis"], } ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str] From 4162c39dcf9aa9a2d393e5ab3257596c3aa717c1 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Wed, 6 May 2020 20:21:38 +0100 Subject: [PATCH 3/5] Port group attestation renewal slow down from matrix-org-hotfixes (#7442) --- changelog.d/7442.misc | 1 + synapse/groups/attestations.py | 6 ++---- 2 files changed, 3 insertions(+), 4 deletions(-) create mode 100644 changelog.d/7442.misc diff --git a/changelog.d/7442.misc b/changelog.d/7442.misc new file mode 100644 index 0000000000..a8fd5ad803 --- /dev/null +++ b/changelog.d/7442.misc @@ -0,0 +1 @@ +Run group attestation renewal in series rather than parallel for performance. \ No newline at end of file diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 1eec3874b6..27b0c02655 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -46,7 +46,6 @@ from twisted.internet import defer from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import get_domain_from_id -from synapse.util.async_helpers import yieldable_gather_results logger = logging.getLogger(__name__) @@ -208,6 +207,5 @@ class GroupAttestionRenewer(object): "Error renewing attestation of %r in %r", user_id, group_id ) - await yieldable_gather_results( - _renew_attestation, ((row["group_id"], row["user_id"]) for row in rows) - ) + for row in rows: + await _renew_attestation((row["group_id"], row["user_id"])) From d7c2df2fa3691069cc4fdeabd5028e246882d70c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 6 May 2020 16:43:39 -0400 Subject: [PATCH 4/5] Improve per-block CPU and DB usage metrics (#7426) --- changelog.d/7426.misc | 1 + synapse/logging/context.py | 38 ++++++++++++++++++++++++++------------ 2 files changed, 27 insertions(+), 12 deletions(-) create mode 100644 changelog.d/7426.misc diff --git a/changelog.d/7426.misc b/changelog.d/7426.misc new file mode 100644 index 0000000000..731f4dcb52 --- /dev/null +++ b/changelog.d/7426.misc @@ -0,0 +1 @@ +Clean up some LoggingContext code. diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 856534e91a..8b9c4e38bd 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -431,15 +431,7 @@ class LoggingContext(object): return utime_delta, stime_delta = self._get_cputime(rusage) - self._resource_usage.ru_utime += utime_delta - self._resource_usage.ru_stime += stime_delta - - # if we have a parent, pass our CPU usage stats on - if self.parent_context: - self.parent_context._resource_usage += self._resource_usage - - # reset them in case we get entered again - self._resource_usage.reset() + self.add_cputime(utime_delta, stime_delta) finally: self.usage_start = None @@ -497,30 +489,52 @@ class LoggingContext(object): return utime_delta, stime_delta + def add_cputime(self, utime_delta: float, stime_delta: float) -> None: + """Update the CPU time usage of this context (and any parents, recursively). + + Args: + utime_delta: additional user time, in seconds, spent in this context. + stime_delta: additional system time, in seconds, spent in this context. + """ + self._resource_usage.ru_utime += utime_delta + self._resource_usage.ru_stime += stime_delta + if self.parent_context: + self.parent_context.add_cputime(utime_delta, stime_delta) + def add_database_transaction(self, duration_sec: float) -> None: + """Record the use of a database transaction and the length of time it took. + + Args: + duration_sec: The number of seconds the database transaction took. + """ if duration_sec < 0: raise ValueError("DB txn time can only be non-negative") self._resource_usage.db_txn_count += 1 self._resource_usage.db_txn_duration_sec += duration_sec + if self.parent_context: + self.parent_context.add_database_transaction(duration_sec) def add_database_scheduled(self, sched_sec: float) -> None: """Record a use of the database pool Args: - sched_sec (float): number of seconds it took us to get a - connection + sched_sec: number of seconds it took us to get a connection """ if sched_sec < 0: raise ValueError("DB scheduling time can only be non-negative") self._resource_usage.db_sched_duration_sec += sched_sec + if self.parent_context: + self.parent_context.add_database_scheduled(sched_sec) def record_event_fetch(self, event_count: int) -> None: """Record a number of events being fetched from the db Args: - event_count (int): number of events being fetched + event_count: number of events being fetched """ self._resource_usage.evt_db_fetch_count += event_count + if self.parent_context: + self.parent_context.record_event_fetch(event_count) class LoggingContextFilter(logging.Filter): From d9b8d274949df7356e880a67d3aac1b25613ab1f Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 7 May 2020 11:35:23 +0200 Subject: [PATCH 5/5] Add a configuration setting for the dummy event threshold (#7422) Add dummy_events_threshold which allows configuring the number of forward extremities a room needs for Synapse to send forward extremities in it. --- changelog.d/7422.feature | 1 + docs/sample_config.yaml | 12 ++++++++++++ synapse/config/server.py | 15 +++++++++++++++ synapse/handlers/message.py | 4 +++- 4 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 changelog.d/7422.feature diff --git a/changelog.d/7422.feature b/changelog.d/7422.feature new file mode 100644 index 0000000000..d6d5bb2169 --- /dev/null +++ b/changelog.d/7422.feature @@ -0,0 +1 @@ +Add a configuration setting to tweak the threshold for dummy events. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index fc970986c6..98ead7dc0e 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -253,6 +253,18 @@ listeners: # bind_addresses: ['::1', '127.0.0.1'] # type: manhole +# Forward extremities can build up in a room due to networking delays between +# homeservers. Once this happens in a large room, calculation of the state of +# that room can become quite expensive. To mitigate this, once the number of +# forward extremities reaches a given threshold, Synapse will send an +# org.matrix.dummy_event event, which will reduce the forward extremities +# in the room. +# +# This setting defines the threshold (i.e. number of forward extremities in the +# room) at which dummy events are sent. The default value is 10. +# +#dummy_events_threshold: 5 + ## Homeserver blocking ## diff --git a/synapse/config/server.py b/synapse/config/server.py index c6d58effd4..6d88231843 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -505,6 +505,9 @@ class ServerConfig(Config): "cleanup_extremities_with_dummy_events", True ) + # The number of forward extremities in a room needed to send a dummy event. + self.dummy_events_threshold = config.get("dummy_events_threshold", 10) + self.enable_ephemeral_messages = config.get("enable_ephemeral_messages", False) # Inhibits the /requestToken endpoints from returning an error that might leak @@ -823,6 +826,18 @@ class ServerConfig(Config): # bind_addresses: ['::1', '127.0.0.1'] # type: manhole + # Forward extremities can build up in a room due to networking delays between + # homeservers. Once this happens in a large room, calculation of the state of + # that room can become quite expensive. To mitigate this, once the number of + # forward extremities reaches a given threshold, Synapse will send an + # org.matrix.dummy_event event, which will reduce the forward extremities + # in the room. + # + # This setting defines the threshold (i.e. number of forward extremities in the + # room) at which dummy events are sent. The default value is 10. + # + #dummy_events_threshold: 5 + ## Homeserver blocking ## diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a324f09340..a622a600b4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -419,6 +419,8 @@ class EventCreationHandler(object): self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages + self._dummy_events_threshold = hs.config.dummy_events_threshold + @defer.inlineCallbacks def create_event( self, @@ -1085,7 +1087,7 @@ class EventCreationHandler(object): """ self._expire_rooms_to_exclude_from_dummy_event_insertion() room_ids = await self.store.get_rooms_with_many_extremities( - min_count=10, + min_count=self._dummy_events_threshold, limit=5, room_id_filter=self._rooms_to_exclude_from_dummy_event_insertion.keys(), )