Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

matrix-org-hotfixes
Erik Johnston 2023-08-23 09:41:34 +01:00
commit 144cf227ca
56 changed files with 2026 additions and 440 deletions

8
Cargo.lock generated
View File

@ -332,18 +332,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
version = "1.0.183"
version = "1.0.184"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
checksum = "2c911f4b04d7385c9035407a4eff5903bf4fe270fa046fda448b69e797f4fff0"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.183"
version = "1.0.184"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
checksum = "c1df27f5b29406ada06609b2e2f77fb34f6dbb104a457a671cc31dbed237e09e"
dependencies = [
"proc-macro2",
"quote",

View File

@ -0,0 +1 @@
Implements a task scheduler for resumable potentially long running tasks.

View File

@ -0,0 +1 @@
Allow specifying `client_secret_path` as alternative to `client_secret` for OIDC providers. This avoids leaking the client secret in the homeserver config. Contributed by @Ma27.

View File

@ -0,0 +1 @@
Add an `admins` query parameter to the [List Accounts](https://matrix-org.github.io/synapse/v1.91/admin_api/user_admin_api.html#list-accounts) [admin API](https://matrix-org.github.io/synapse/v1.91/usage/administration/admin_api/index.html), to include only admins or to exclude admins in user queries.

1
changelog.d/16116.bugfix Normal file
View File

@ -0,0 +1 @@
Fix performance of state resolutions for large, old rooms that did not have the full auth chain persisted.

1
changelog.d/16125.misc Normal file
View File

@ -0,0 +1 @@
Add an admin endpoint to allow authorizing server to signal token revocations.

1
changelog.d/16127.bugfix Normal file
View File

@ -0,0 +1 @@
User constent and 3-PID changes capability cannot be enabled when using experimental [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) support.

1
changelog.d/16131.misc Normal file
View File

@ -0,0 +1 @@
Add response time metrics for introspection requests for delegated auth.

1
changelog.d/16132.misc Normal file
View File

@ -0,0 +1 @@
MSC3861: allow impersonation by an admin user using `_oidc_admin_impersonate_user_id` query parameter.

1
changelog.d/16134.bugfix Normal file
View File

@ -0,0 +1 @@
User constent and 3-PID changes capability cannot be enabled when using experimental [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) support.

1
changelog.d/16148.bugfix Normal file
View File

@ -0,0 +1 @@
Fix performance degredation when there are a lot of in-flight replication requests.

1
changelog.d/16149.misc Normal file
View File

@ -0,0 +1 @@
Increase performance of read/write locks.

1
changelog.d/16150.misc Normal file
View File

@ -0,0 +1 @@
Clean-up calling `setup_background_tasks` in unit tests.

1
changelog.d/16152.misc Normal file
View File

@ -0,0 +1 @@
Raised the poetry-core version cap to 1.7.0.

1
changelog.d/16156.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug introduced in 1.87 where synapse would send an excessive amount of federation requests to servers which have been offline for a long time. Contributed by Nico.

1
changelog.d/16157.misc Normal file
View File

@ -0,0 +1 @@
Fix assertion in user directory unit tests.

1
changelog.d/16158.misc Normal file
View File

@ -0,0 +1 @@
Improve presence tests.

1
changelog.d/16159.misc Normal file
View File

@ -0,0 +1 @@
Reduce scope of locks when paginating to alleviate DB contention.

View File

@ -219,6 +219,8 @@ The following parameters should be set in the URL:
**or** displaynames that contain this value.
- `guests` - string representing a bool - Is optional and if `false` will **exclude** guest users.
Defaults to `true` to include guest users.
- `admins` - Optional flag to filter admins. If `true`, only admins are queried. If `false`, admins are excluded from
the query. When the flag is absent (the default), **both** admins and non-admins are included in the search results.
- `deactivated` - string representing a bool - Is optional and if `true` will **include** deactivated users.
Defaults to `false` to exclude deactivated users.
- `limit` - string representing a positive integer - Is optional but is used for pagination,

View File

@ -3204,6 +3204,14 @@ Options for each entry include:
* `client_secret`: oauth2 client secret to use. May be omitted if
`client_secret_jwt_key` is given, or if `client_auth_method` is 'none'.
Must be omitted if `client_secret_path` is specified.
* `client_secret_path`: path to the oauth2 client secret to use. With that
it's not necessary to leak secrets into the config file itself.
Mutually exclusive with `client_secret`. Can be omitted if
`client_secret_jwt_key` is specified.
*Added in Synapse 1.91.0.*
* `client_secret_jwt_key`: Alternative to client_secret: details of a key used
to create a JSON Web Token to be used as an OAuth2 client secret. If

182
poetry.lock generated
View File

@ -397,13 +397,13 @@ files = [
[[package]]
name = "click"
version = "8.1.6"
version = "8.1.7"
description = "Composable command line interface toolkit"
optional = false
python-versions = ">=3.7"
files = [
{file = "click-8.1.6-py3-none-any.whl", hash = "sha256:fa244bb30b3b5ee2cae3da8f55c9e5e0c0e86093306301fb418eb9dc40fbded5"},
{file = "click-8.1.6.tar.gz", hash = "sha256:48ee849951919527a045bfe3bf7baa8a959c423134e1a5b98c05c20ba75a1cbd"},
{file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"},
{file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"},
]
[package.dependencies]
@ -726,89 +726,89 @@ files = [
[[package]]
name = "ijson"
version = "3.2.1"
version = "3.2.3"
description = "Iterative JSON parser with standard Python iterator interfaces"
optional = false
python-versions = "*"
files = [
{file = "ijson-3.2.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:6f827f6961f093e1055a2be0c3137f0e7d667979da455ac9648f72d4a2bb8970"},
{file = "ijson-3.2.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b6e51f4497065cd0d09f5e906cd538a8d22609eab716e3c883769acf147ab1b6"},
{file = "ijson-3.2.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:f022686c40bff3e340627a5a0c9212718d529e787ada3b76ba546d47a9ecdbbd"},
{file = "ijson-3.2.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e4105c15a13fa1dc24ebd3bf2e679fa14dcbfcc48bc39138a0fa3f4ddf6cc09b"},
{file = "ijson-3.2.1-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:404423e666f185dfb753ddc92705c84dffdc4cc872aaf825bbe0607893cb5b02"},
{file = "ijson-3.2.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:39e71f32830827cf21d0233a814092e5a23668e18f52eca5cac4f670d9df1240"},
{file = "ijson-3.2.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:43af7ed5292caa1452747e2b62485b6c0ece4bcbc5bf6f2758abd547e4124a14"},
{file = "ijson-3.2.1-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:e805aa6897a11b0f73f1f6bca078981df8960aeeccf527a214f240409c742bab"},
{file = "ijson-3.2.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:5b2df0bd84889e9017a670328fe3e82ec509fd6744c7ac2c99c7ee2300d76afa"},
{file = "ijson-3.2.1-cp310-cp310-win32.whl", hash = "sha256:675259c7ea7f51ffaf8cb9e79bf875e28bb09622892943f4f415588fd7ab7bec"},
{file = "ijson-3.2.1-cp310-cp310-win_amd64.whl", hash = "sha256:90d4b2eb771a3585c8186820fe50e3282ef62477b865e765a50a8295674abeac"},
{file = "ijson-3.2.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:fc581a61e210bf6013c1fa6536566e51127be1cfbd69539b63d8b813206d2fe0"},
{file = "ijson-3.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:75cdf7ad4c00a8f5ac94ff27e3b7c1bf5ac463f125bca2be1744c5bc9600db5c"},
{file = "ijson-3.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:85a2bf4636ace4d92e7c5d857a1c5694f42407c868953cf2927f18127bcd0d58"},
{file = "ijson-3.2.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9fe0cb66e7dd4aa11da5fff60bdf5ee04819a5e6a57acf7ca12c65f7fc009afc"},
{file = "ijson-3.2.1-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c6f7957ad38cb714378944032f2c2ee9c6531b5b0b38c5ccd08cedbb0ceddd02"},
{file = "ijson-3.2.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:13283d264cca8a63e5bad91e82eec39711e95893e7e8d4a419799a8c5f85203a"},
{file = "ijson-3.2.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:12c24cde850fe79bc806be0e9fc38b47dd5ac0a223070ccb12e9b695425e2936"},
{file = "ijson-3.2.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:2ce8eed838e5a0791cb5948117b5453f2b3b3c28d93d06ee2bbf2c198c47881c"},
{file = "ijson-3.2.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:b81c2589f191b0dc741f532be00b4bea617297dd9698431c8053e2d28272d4db"},
{file = "ijson-3.2.1-cp311-cp311-win32.whl", hash = "sha256:ba2beac56ac96f728d0f2430e4c667c66819a423d321bb9db9ebdebd803e1b5b"},
{file = "ijson-3.2.1-cp311-cp311-win_amd64.whl", hash = "sha256:c71614ed4bbc6a32ff1e42d7ce92a176fb67d658913343792d2c4567aa130817"},
{file = "ijson-3.2.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:683fc8b0ea085e75ea34044fdc70649b37367d494f132a2bd1e59d7135054d89"},
{file = "ijson-3.2.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:deeaecec2f4e20e8bec20b0a5cdc34daebe7903f2e700f7dcaef68b5925d35ea"},
{file = "ijson-3.2.1-cp36-cp36m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:11923ac3188877f19dbb7051f7345202701cc39bf8e5ac44f8ae536c9eca8c82"},
{file = "ijson-3.2.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:400deefcdae21e90fc39c1dcfc6ba2df24537e8c65bd57b763ed5256b73ba64d"},
{file = "ijson-3.2.1-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:56bc4bad53770710a3a91944fe640fdeb269987a14352b74ebbad2aa55801c00"},
{file = "ijson-3.2.1-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:f5a179523e085126844c6161aabcd193dbb5747bd01fadb68e92abf048f32ec9"},
{file = "ijson-3.2.1-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:ee24655986e4415fbb7a0cf51445fff3072ceac0e219f4bbbd5c53535a3c5680"},
{file = "ijson-3.2.1-cp36-cp36m-win32.whl", hash = "sha256:4a5c672b0540005c1bb0bba97aa559a87a2e4ee409fc68e2f5ba5b30f009ac99"},
{file = "ijson-3.2.1-cp36-cp36m-win_amd64.whl", hash = "sha256:cfaf1d89b0e122e69c87a15db6d6f44feb9db96d2af7fe88cdc464177a257b5d"},
{file = "ijson-3.2.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:1cbd052eb67c1b3611f25974ba967886e89391faaf55afec93808c19f06ca612"},
{file = "ijson-3.2.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f13ffc491886e5d7bde7d68712d168bce0141b2a918db1164bc8599c0123e293"},
{file = "ijson-3.2.1-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bc4c4fc6bafc777f8422fe36edb1cbd72a13cb29695893a064c9c95776a4bdf9"},
{file = "ijson-3.2.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a42fcb2bf9748c26f004690b2feb6e13e4875bb7c9d83535f887c21e0a982a7c"},
{file = "ijson-3.2.1-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:0c92f7bc2f3a947c2ba7f7aa48382c36079f8259c930e81d9164341f9b853c45"},
{file = "ijson-3.2.1-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:fd497042db562379339660e787bc8679ed3abaa740768d39bc3746e769e7c7a5"},
{file = "ijson-3.2.1-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:7d61c7cd8ddd75dcef818ff5a111a31b902a6a0e410ee0c2b2ecaa6dac92658a"},
{file = "ijson-3.2.1-cp37-cp37m-win32.whl", hash = "sha256:36caf624d263fc40e7e805d759d09ea368d8cf497aecb3241ac2f0a286ad8eca"},
{file = "ijson-3.2.1-cp37-cp37m-win_amd64.whl", hash = "sha256:32f9ed25ff80942e433119600bca13b86a8f9b8b0966edbc1d91a48ccbdd4d54"},
{file = "ijson-3.2.1-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:e89bbd747140eac3a3c9e7e5835b90d85c4a02763fc5134861bfc1ea03b66ae7"},
{file = "ijson-3.2.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:d69b4b1d509de36ec42a0e4af30ede39fb754e4039b2928ef7282ebc2125ffdd"},
{file = "ijson-3.2.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:e7feb0771f50deabe6ce85b210fa9e005843d3d3c60fb3315d69e1f9d0d75e0c"},
{file = "ijson-3.2.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5fd8148a363888054ff06eaaa1103f2f98720ab39666084a214e4fedfc13cf64"},
{file = "ijson-3.2.1-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:598638dcc5141e9ae269903901877103f5362e0db4443e34721df8f8d34577b4"},
{file = "ijson-3.2.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e979190b7d0fabca20d6b7224ab1c1aa461ad1ab72ba94f1bb1e5894cd59f342"},
{file = "ijson-3.2.1-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:bc810eb80b4f486c7957201ba2a53f53ddc9b3233af67e4359e29371bf04883b"},
{file = "ijson-3.2.1-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:26e758584611dfe826dd18ffd94dc0d8a062ce56e41674ad3bfa371c7b78c4b5"},
{file = "ijson-3.2.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:24e9ae5b35b85ea094b6c36495bc856089254aed6a48bada8d7eec5a04f74439"},
{file = "ijson-3.2.1-cp38-cp38-win32.whl", hash = "sha256:4b5dc7b5b4b8cb3087d188f37911cd67e26672d33d3571e73440de3f0a86f7e6"},
{file = "ijson-3.2.1-cp38-cp38-win_amd64.whl", hash = "sha256:1af94ff40609270bbb3eac47e072582bb578f5023fac8408cccd80fe5892d221"},
{file = "ijson-3.2.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:2dda67affceebc52c8bc5fe72c3a4a1e338e4d4b0497dbac5089c2d3862df214"},
{file = "ijson-3.2.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:bd780303ddfedc8d57cdb9f2d53a8cea2f2f4a6fb857bf8fe5a0c3ab1d4ca901"},
{file = "ijson-3.2.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4fbab6af1bab88a8e46beda08cf44610eed0adb8d157a1a60b4bb6c3a121c6de"},
{file = "ijson-3.2.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c97a07988a1e0ce2bc8e8a62eb5f25195a3bd58a939ac353cbc6018a548cc08d"},
{file = "ijson-3.2.1-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a65671a6826ae723837143914c823ad7bcc0d1a3e38d87c71df897a2556fb48f"},
{file = "ijson-3.2.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a1806372008bbed9ee92db5747e38c047fa1c4ee89cb2dd5daaa57feb46ce50a"},
{file = "ijson-3.2.1-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:91e5a8e96f78a59e2520078c227a4fec5bf91c13adeded9e33fb13981cb823c3"},
{file = "ijson-3.2.1-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:1f820fce8ef093718f2319ff6f1322390664659b783775919dadccb1b470153d"},
{file = "ijson-3.2.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:bca3e8c91a1076a20620dbaa6a2848772b0e8a4055e86d42d3fa39221b53ed1a"},
{file = "ijson-3.2.1-cp39-cp39-win32.whl", hash = "sha256:de87f137b7438d43840f4339a37d4e6a58c987f4bb2a70609969f854f8ae20f3"},
{file = "ijson-3.2.1-cp39-cp39-win_amd64.whl", hash = "sha256:0caebb8350b47266a58b766ec08e1de441d6d160702c428b5cf7504d93c832c4"},
{file = "ijson-3.2.1-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:37389785c1abd27fcc24800fcfa9a6b1022743413e4056507fd32356b623ff33"},
{file = "ijson-3.2.1-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4b364b82231d51cbeae52468c3b27e8a042e544ab764c8f3975e912cf010603f"},
{file = "ijson-3.2.1-pp37-pypy37_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0a5999d0ec28a8ec47cf20c736fd4f895dc077bf6441bf237b00b074315a295d"},
{file = "ijson-3.2.1-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8bd481857a39305517fb6f1313d558c2dc4e78c9e9384cc5bc1c3e28f1afbedf"},
{file = "ijson-3.2.1-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:545f62f12f89350d4d73f2a779cb269198ae578fac080085a1927148b803e602"},
{file = "ijson-3.2.1-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:4d5622505d01c2f3d7b9638c1eb8c747eb550936b505225893704289ff28576f"},
{file = "ijson-3.2.1-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:20293bb36423b129fad3753858ccf7b2ccb5b2c0d3759efe810d0b9d79633a7e"},
{file = "ijson-3.2.1-pp38-pypy38_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7cd8a4921b852fd2cb5b0c985540c97ff6893139a57fe7121d510ec5d1c0ca44"},
{file = "ijson-3.2.1-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc902ff1ae1efed7d526294d7a9dd3df66d29b2cdc05fb5479838fef1327a534"},
{file = "ijson-3.2.1-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:2925a7978d8170146a9cb49a15a982b71fbbf21980bf2e16cd90c528545b7c02"},
{file = "ijson-3.2.1-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:c21c6509f6944939399f3630c5dc424d30d71d375f6cd58f9af56158fdf7251c"},
{file = "ijson-3.2.1-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f5729fc7648bc972d70922d7dad15459cca3a9e5ed0328eb9ae3ffa004066194"},
{file = "ijson-3.2.1-pp39-pypy39_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:805a2d5ed5a15d60327bc9347f2d4125ab621fb18071db98b1c598f1ee99e8f1"},
{file = "ijson-3.2.1-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3d0220a4b6c63f44589e429157174e3f4b8d1e534d5fb82bdb43a7f8dd77ae4b"},
{file = "ijson-3.2.1-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:271d9b7c083f65c58ff0afd9dbb5d2f3d445f734632aebfef4a261b0a337abdb"},
{file = "ijson-3.2.1.tar.gz", hash = "sha256:8574bf19f31fab870488769ad919a80f130825236ac8bde9a733f69c2961d7a7"},
{file = "ijson-3.2.3-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:0a4ae076bf97b0430e4e16c9cb635a6b773904aec45ed8dcbc9b17211b8569ba"},
{file = "ijson-3.2.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:cfced0a6ec85916eb8c8e22415b7267ae118eaff2a860c42d2cc1261711d0d31"},
{file = "ijson-3.2.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:0b9d1141cfd1e6d6643aa0b4876730d0d28371815ce846d2e4e84a2d4f471cf3"},
{file = "ijson-3.2.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9e0a27db6454edd6013d40a956d008361aac5bff375a9c04ab11fc8c214250b5"},
{file = "ijson-3.2.3-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3c0d526ccb335c3c13063c273637d8611f32970603dfb182177b232d01f14c23"},
{file = "ijson-3.2.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:545a30b3659df2a3481593d30d60491d1594bc8005f99600e1bba647bb44cbb5"},
{file = "ijson-3.2.3-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:9680e37a10fedb3eab24a4a7e749d8a73f26f1a4c901430e7aa81b5da15f7307"},
{file = "ijson-3.2.3-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:2a80c0bb1053055d1599e44dc1396f713e8b3407000e6390add72d49633ff3bb"},
{file = "ijson-3.2.3-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:f05ed49f434ce396ddcf99e9fd98245328e99f991283850c309f5e3182211a79"},
{file = "ijson-3.2.3-cp310-cp310-win32.whl", hash = "sha256:b4eb2304573c9fdf448d3fa4a4fdcb727b93002b5c5c56c14a5ffbbc39f64ae4"},
{file = "ijson-3.2.3-cp310-cp310-win_amd64.whl", hash = "sha256:923131f5153c70936e8bd2dd9dcfcff43c67a3d1c789e9c96724747423c173eb"},
{file = "ijson-3.2.3-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:904f77dd3d87736ff668884fe5197a184748eb0c3e302ded61706501d0327465"},
{file = "ijson-3.2.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:0974444c1f416e19de1e9f567a4560890095e71e81623c509feff642114c1e53"},
{file = "ijson-3.2.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:c1a4b8eb69b6d7b4e94170aa991efad75ba156b05f0de2a6cd84f991def12ff9"},
{file = "ijson-3.2.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d052417fd7ce2221114f8d3b58f05a83c1a2b6b99cafe0b86ac9ed5e2fc889df"},
{file = "ijson-3.2.3-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7b8064a85ec1b0beda7dd028e887f7112670d574db606f68006c72dd0bb0e0e2"},
{file = "ijson-3.2.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eaac293853f1342a8d2a45ac1f723c860f700860e7743fb97f7b76356df883a8"},
{file = "ijson-3.2.3-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:6c32c18a934c1dc8917455b0ce478fd7a26c50c364bd52c5a4fb0fc6bb516af7"},
{file = "ijson-3.2.3-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:713a919e0220ac44dab12b5fed74f9130f3480e55e90f9d80f58de129ea24f83"},
{file = "ijson-3.2.3-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:4a3a6a2fbbe7550ffe52d151cf76065e6b89cfb3e9d0463e49a7e322a25d0426"},
{file = "ijson-3.2.3-cp311-cp311-win32.whl", hash = "sha256:6a4db2f7fb9acfb855c9ae1aae602e4648dd1f88804a0d5cfb78c3639bcf156c"},
{file = "ijson-3.2.3-cp311-cp311-win_amd64.whl", hash = "sha256:ccd6be56335cbb845f3d3021b1766299c056c70c4c9165fb2fbe2d62258bae3f"},
{file = "ijson-3.2.3-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:eeb286639649fb6bed37997a5e30eefcacddac79476d24128348ec890b2a0ccb"},
{file = "ijson-3.2.3-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:396338a655fb9af4ac59dd09c189885b51fa0eefc84d35408662031023c110d1"},
{file = "ijson-3.2.3-cp36-cp36m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0e0243d166d11a2a47c17c7e885debf3b19ed136be2af1f5d1c34212850236ac"},
{file = "ijson-3.2.3-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:85afdb3f3a5d0011584d4fa8e6dccc5936be51c27e84cd2882fe904ca3bd04c5"},
{file = "ijson-3.2.3-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:4fc35d569eff3afa76bfecf533f818ecb9390105be257f3f83c03204661ace70"},
{file = "ijson-3.2.3-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:455d7d3b7a6aacfb8ab1ebcaf697eedf5be66e044eac32508fccdc633d995f0e"},
{file = "ijson-3.2.3-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:c63f3d57dbbac56cead05b12b81e8e1e259f14ce7f233a8cbe7fa0996733b628"},
{file = "ijson-3.2.3-cp36-cp36m-win32.whl", hash = "sha256:a4d7fe3629de3ecb088bff6dfe25f77be3e8261ed53d5e244717e266f8544305"},
{file = "ijson-3.2.3-cp36-cp36m-win_amd64.whl", hash = "sha256:96190d59f015b5a2af388a98446e411f58ecc6a93934e036daa75f75d02386a0"},
{file = "ijson-3.2.3-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:35194e0b8a2bda12b4096e2e792efa5d4801a0abb950c48ade351d479cd22ba5"},
{file = "ijson-3.2.3-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d1053fb5f0b010ee76ca515e6af36b50d26c1728ad46be12f1f147a835341083"},
{file = "ijson-3.2.3-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:211124cff9d9d139dd0dfced356f1472860352c055d2481459038b8205d7d742"},
{file = "ijson-3.2.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:92dc4d48e9f6a271292d6079e9fcdce33c83d1acf11e6e12696fb05c5889fe74"},
{file = "ijson-3.2.3-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:3dcc33ee56f92a77f48776014ddb47af67c33dda361e84371153c4f1ed4434e1"},
{file = "ijson-3.2.3-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:98c6799925a5d1988da4cd68879b8eeab52c6e029acc45e03abb7921a4715c4b"},
{file = "ijson-3.2.3-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:4252e48c95cd8ceefc2caade310559ab61c37d82dfa045928ed05328eb5b5f65"},
{file = "ijson-3.2.3-cp37-cp37m-win32.whl", hash = "sha256:644f4f03349ff2731fd515afd1c91b9e439e90c9f8c28292251834154edbffca"},
{file = "ijson-3.2.3-cp37-cp37m-win_amd64.whl", hash = "sha256:ba33c764afa9ecef62801ba7ac0319268a7526f50f7601370d9f8f04e77fc02b"},
{file = "ijson-3.2.3-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:4b2ec8c2a3f1742cbd5f36b65e192028e541b5fd8c7fd97c1fc0ca6c427c704a"},
{file = "ijson-3.2.3-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:7dc357da4b4ebd8903e77dbcc3ce0555ee29ebe0747c3c7f56adda423df8ec89"},
{file = "ijson-3.2.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:bcc51c84bb220ac330122468fe526a7777faa6464e3b04c15b476761beea424f"},
{file = "ijson-3.2.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f8d54b624629f9903005c58d9321a036c72f5c212701bbb93d1a520ecd15e370"},
{file = "ijson-3.2.3-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d6ea7c7e3ec44742e867c72fd750c6a1e35b112f88a917615332c4476e718d40"},
{file = "ijson-3.2.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:916acdc5e504f8b66c3e287ada5d4b39a3275fc1f2013c4b05d1ab9933671a6c"},
{file = "ijson-3.2.3-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:81815b4184b85ce124bfc4c446d5f5e5e643fc119771c5916f035220ada29974"},
{file = "ijson-3.2.3-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:b49fd5fe1cd9c1c8caf6c59f82b08117dd6bea2ec45b641594e25948f48f4169"},
{file = "ijson-3.2.3-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:86b3c91fdcb8ffb30556c9669930f02b7642de58ca2987845b04f0d7fe46d9a8"},
{file = "ijson-3.2.3-cp38-cp38-win32.whl", hash = "sha256:a729b0c8fb935481afe3cf7e0dadd0da3a69cc7f145dbab8502e2f1e01d85a7c"},
{file = "ijson-3.2.3-cp38-cp38-win_amd64.whl", hash = "sha256:d34e049992d8a46922f96483e96b32ac4c9cffd01a5c33a928e70a283710cd58"},
{file = "ijson-3.2.3-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:9c2a12dcdb6fa28f333bf10b3a0f80ec70bc45280d8435be7e19696fab2bc706"},
{file = "ijson-3.2.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:1844c5b57da21466f255a0aeddf89049e730d7f3dfc4d750f0e65c36e6a61a7c"},
{file = "ijson-3.2.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:2ec3e5ff2515f1c40ef6a94983158e172f004cd643b9e4b5302017139b6c96e4"},
{file = "ijson-3.2.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:46bafb1b9959872a1f946f8dd9c6f1a30a970fc05b7bfae8579da3f1f988e598"},
{file = "ijson-3.2.3-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ab4db9fee0138b60e31b3c02fff8a4c28d7b152040553b6a91b60354aebd4b02"},
{file = "ijson-3.2.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f4bc87e69d1997c6a55fff5ee2af878720801ff6ab1fb3b7f94adda050651e37"},
{file = "ijson-3.2.3-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:e9fd906f0c38e9f0bfd5365e1bed98d649f506721f76bb1a9baa5d7374f26f19"},
{file = "ijson-3.2.3-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:e84d27d1acb60d9102728d06b9650e5b7e5cb0631bd6e3dfadba8fb6a80d6c2f"},
{file = "ijson-3.2.3-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2cc04fc0a22bb945cd179f614845c8b5106c0b3939ee0d84ce67c7a61ac1a936"},
{file = "ijson-3.2.3-cp39-cp39-win32.whl", hash = "sha256:e641814793a037175f7ec1b717ebb68f26d89d82cfd66f36e588f32d7e488d5f"},
{file = "ijson-3.2.3-cp39-cp39-win_amd64.whl", hash = "sha256:6bd3e7e91d031f1e8cea7ce53f704ab74e61e505e8072467e092172422728b22"},
{file = "ijson-3.2.3-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:06f9707da06a19b01013f8c65bf67db523662a9b4a4ff027e946e66c261f17f0"},
{file = "ijson-3.2.3-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:be8495f7c13fa1f622a2c6b64e79ac63965b89caf664cc4e701c335c652d15f2"},
{file = "ijson-3.2.3-pp37-pypy37_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7596b42f38c3dcf9d434dddd50f46aeb28e96f891444c2b4b1266304a19a2c09"},
{file = "ijson-3.2.3-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fbac4e9609a1086bbad075beb2ceec486a3b138604e12d2059a33ce2cba93051"},
{file = "ijson-3.2.3-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:db2d6341f9cb538253e7fe23311d59252f124f47165221d3c06a7ed667ecd595"},
{file = "ijson-3.2.3-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:fa8b98be298efbb2588f883f9953113d8a0023ab39abe77fe734b71b46b1220a"},
{file = "ijson-3.2.3-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:674e585361c702fad050ab4c153fd168dc30f5980ef42b64400bc84d194e662d"},
{file = "ijson-3.2.3-pp38-pypy38_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fd12e42b9cb9c0166559a3ffa276b4f9fc9d5b4c304e5a13668642d34b48b634"},
{file = "ijson-3.2.3-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d31e0d771d82def80cd4663a66de277c3b44ba82cd48f630526b52f74663c639"},
{file = "ijson-3.2.3-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:7ce4c70c23521179d6da842bb9bc2e36bb9fad1e0187e35423ff0f282890c9ca"},
{file = "ijson-3.2.3-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:39f551a6fbeed4433c85269c7c8778e2aaea2501d7ebcb65b38f556030642c17"},
{file = "ijson-3.2.3-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3b14d322fec0de7af16f3ef920bf282f0dd747200b69e0b9628117f381b7775b"},
{file = "ijson-3.2.3-pp39-pypy39_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7851a341429b12d4527ca507097c959659baf5106c7074d15c17c387719ffbcd"},
{file = "ijson-3.2.3-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:db3bf1b42191b5cc9b6441552fdcb3b583594cb6b19e90d1578b7cbcf80d0fae"},
{file = "ijson-3.2.3-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:6f662dc44362a53af3084d3765bb01cd7b4734d1f484a6095cad4cb0cbfe5374"},
{file = "ijson-3.2.3.tar.gz", hash = "sha256:10294e9bf89cb713da05bc4790bdff616610432db561964827074898e174f917"},
]
[[package]]
@ -1881,13 +1881,13 @@ email = ["email-validator (>=1.0.3)"]
[[package]]
name = "pygithub"
version = "1.59.0"
version = "1.59.1"
description = "Use the full Github API v3"
optional = false
python-versions = ">=3.7"
files = [
{file = "PyGithub-1.59.0-py3-none-any.whl", hash = "sha256:126bdbae72087d8d038b113aab6b059b4553cb59348e3024bb1a1cae406ace9e"},
{file = "PyGithub-1.59.0.tar.gz", hash = "sha256:6e05ff49bac3caa7d1d6177a10c6e55a3e20c85b92424cc198571fd0cf786690"},
{file = "PyGithub-1.59.1-py3-none-any.whl", hash = "sha256:3d87a822e6c868142f0c2c4bf16cce4696b5a7a4d142a7bd160e1bdf75bc54a9"},
{file = "PyGithub-1.59.1.tar.gz", hash = "sha256:c44e3a121c15bf9d3a5cc98d94c9a047a5132a9b01d22264627f58ade9ddc217"},
]
[package.dependencies]
@ -2385,13 +2385,13 @@ doc = ["Sphinx", "sphinx-rtd-theme"]
[[package]]
name = "sentry-sdk"
version = "1.28.1"
version = "1.29.2"
description = "Python client for Sentry (https://sentry.io)"
optional = true
python-versions = "*"
files = [
{file = "sentry-sdk-1.28.1.tar.gz", hash = "sha256:dcd88c68aa64dae715311b5ede6502fd684f70d00a7cd4858118f0ba3153a3ae"},
{file = "sentry_sdk-1.28.1-py2.py3-none-any.whl", hash = "sha256:6bdb25bd9092478d3a817cb0d01fa99e296aea34d404eac3ca0037faa5c2aa0a"},
{file = "sentry-sdk-1.29.2.tar.gz", hash = "sha256:a99ee105384788c3f228726a88baf515fe7b5f1d2d0f215a03d194369f158df7"},
{file = "sentry_sdk-1.29.2-py2.py3-none-any.whl", hash = "sha256:3e17215d8006612e2df02b0e73115eb8376c37e3f586d8436fa41644e605074d"},
]
[package.dependencies]
@ -3013,13 +3013,13 @@ files = [
[[package]]
name = "types-pyopenssl"
version = "23.2.0.1"
version = "23.2.0.2"
description = "Typing stubs for pyOpenSSL"
optional = false
python-versions = "*"
files = [
{file = "types-pyOpenSSL-23.2.0.1.tar.gz", hash = "sha256:beeb5d22704c625a1e4b6dc756355c5b4af0b980138b702a9d9f932acf020903"},
{file = "types_pyOpenSSL-23.2.0.1-py3-none-any.whl", hash = "sha256:0568553f104466f1b8e0db3360fbe6770137d02e21a1a45c209bf2b1b03d90d4"},
{file = "types-pyOpenSSL-23.2.0.2.tar.gz", hash = "sha256:6a010dac9ecd42b582d7dd2cc3e9e40486b79b3b64bb2fffba1474ff96af906d"},
{file = "types_pyOpenSSL-23.2.0.2-py3-none-any.whl", hash = "sha256:19536aa3debfbe25a918cf0d898e9f5fbbe6f3594a429da7914bf331deb1b342"},
]
[package.dependencies]

View File

@ -367,7 +367,7 @@ furo = ">=2022.12.7,<2024.0.0"
# system changes.
# We are happy to raise these upper bounds upon request,
# provided we check that it's safe to do so (i.e. that CI passes).
requires = ["poetry-core>=1.1.0,<=1.6.0", "setuptools_rust>=1.3,<=1.6.0"]
requires = ["poetry-core>=1.1.0,<=1.7.0", "setuptools_rust>=1.3,<=1.6.0"]
build-backend = "poetry.core.masonry.api"

View File

@ -20,6 +20,7 @@ from authlib.oauth2.auth import encode_client_secret_basic, encode_client_secret
from authlib.oauth2.rfc7523 import ClientSecretJWT, PrivateKeyJWT, private_key_jwt_sign
from authlib.oauth2.rfc7662 import IntrospectionToken
from authlib.oidc.discovery import OpenIDProviderMetadata, get_well_known_url
from prometheus_client import Histogram
from twisted.web.client import readBody
from twisted.web.http_headers import Headers
@ -46,6 +47,13 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
introspection_response_timer = Histogram(
"synapse_api_auth_delegated_introspection_response",
"Time taken to get a response for an introspection request",
["code"],
)
# Scope as defined by MSC2967
# https://github.com/matrix-org/matrix-spec-proposals/pull/2967
SCOPE_MATRIX_API = "urn:matrix:org.matrix.msc2967.client:api:*"
@ -190,14 +198,26 @@ class MSC3861DelegatedAuth(BaseAuth):
# Do the actual request
# We're not using the SimpleHttpClient util methods as we don't want to
# check the HTTP status code, and we do the body encoding ourselves.
response = await self._http_client.request(
method="POST",
uri=uri,
data=body.encode("utf-8"),
headers=headers,
)
resp_body = await make_deferred_yieldable(readBody(response))
start_time = self._clock.time()
try:
response = await self._http_client.request(
method="POST",
uri=uri,
data=body.encode("utf-8"),
headers=headers,
)
resp_body = await make_deferred_yieldable(readBody(response))
except Exception:
end_time = self._clock.time()
introspection_response_timer.labels("ERR").observe(end_time - start_time)
raise
end_time = self._clock.time()
introspection_response_timer.labels(response.code).observe(
end_time - start_time
)
if response.code < 200 or response.code >= 300:
raise HttpResponseException(
@ -226,7 +246,7 @@ class MSC3861DelegatedAuth(BaseAuth):
return introspection_token
async def is_server_admin(self, requester: Requester) -> bool:
return "urn:synapse:admin:*" in requester.scope
return SCOPE_SYNAPSE_ADMIN in requester.scope
async def get_user_by_req(
self,
@ -243,6 +263,25 @@ class MSC3861DelegatedAuth(BaseAuth):
# so that we don't provision the user if they don't have enough permission:
requester = await self.get_user_by_access_token(access_token, allow_expired)
# Allow impersonation by an admin user using `_oidc_admin_impersonate_user_id` query parameter
if request.args is not None:
user_id_params = request.args.get(b"_oidc_admin_impersonate_user_id")
if user_id_params:
if await self.is_server_admin(requester):
user_id_str = user_id_params[0].decode("ascii")
impersonated_user_id = UserID.from_string(user_id_str)
logging.info(f"Admin impersonation of user {user_id_str}")
requester = create_requester(
user_id=impersonated_user_id,
scope=[SCOPE_MATRIX_API],
authenticated_entity=requester.user.to_string(),
)
else:
raise AuthError(
401,
"Impersonation not possible by a non admin user",
)
# Deny the request if the user account is locked.
if not allow_locked and await self.store.get_user_locked_status(
requester.user.to_string()
@ -270,14 +309,14 @@ class MSC3861DelegatedAuth(BaseAuth):
# XXX: This is a temporary solution so that the admin API can be called by
# the OIDC provider. This will be removed once we have OIDC client
# credentials grant support in matrix-authentication-service.
logging.info("Admin toked used")
logging.info("Admin token used")
# XXX: that user doesn't exist and won't be provisioned.
# This is mostly fine for admin calls, but we should also think about doing
# requesters without a user_id.
admin_user = UserID("__oidc_admin", self._hostname)
return create_requester(
user_id=admin_user,
scope=["urn:synapse:admin:*"],
scope=[SCOPE_SYNAPSE_ADMIN],
)
try:
@ -399,3 +438,16 @@ class MSC3861DelegatedAuth(BaseAuth):
scope=scope,
is_guest=(has_guest_scope and not has_user_scope),
)
def invalidate_cached_tokens(self, keys: List[str]) -> None:
"""
Invalidate the entry(s) in the introspection token cache corresponding to the given key
"""
for key in keys:
self._token_cache.invalidate(key)
def invalidate_token_cache(self) -> None:
"""
Invalidate the entire token cache.
"""
self._token_cache.invalidate_all()

View File

@ -91,6 +91,7 @@ from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore
from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
@ -144,6 +145,7 @@ class GenericWorkerStore(
TransactionWorkerStore,
LockStore,
SessionStore,
TaskSchedulerWorkerStore,
):
# Properties that multiple storage classes define. Tell mypy what the
# expected type is.

View File

@ -173,6 +173,13 @@ class MSC3861:
("enable_registration",),
)
# We only need to test the user consent version, as if it must be set if the user_consent section was present in the config
if root.consent.user_consent_version is not None:
raise ConfigError(
"User consent cannot be enabled when OAuth delegation is enabled",
("user_consent",),
)
if (
root.oidc.oidc_enabled
or root.saml2.saml2_enabled
@ -216,6 +223,12 @@ class MSC3861:
("session_lifetime",),
)
if root.registration.enable_3pid_changes:
raise ConfigError(
"enable_3pid_changes cannot be enabled when OAuth delegation is enabled",
("enable_3pid_changes",),
)
@attr.s(auto_attribs=True, frozen=True, slots=True)
class MSC3866Config:

View File

@ -280,6 +280,20 @@ def _parse_oidc_config_dict(
for x in oidc_config.get("attribute_requirements", [])
]
# Read from either `client_secret_path` or `client_secret`. If both exist, error.
client_secret = oidc_config.get("client_secret")
client_secret_path = oidc_config.get("client_secret_path")
if client_secret_path is not None:
if client_secret is None:
client_secret = read_file(
client_secret_path, config_path + ("client_secret_path",)
).rstrip("\n")
else:
raise ConfigError(
"Cannot specify both client_secret and client_secret_path",
config_path + ("client_secret",),
)
return OidcProviderConfig(
idp_id=idp_id,
idp_name=oidc_config.get("idp_name", "OIDC"),
@ -288,7 +302,7 @@ def _parse_oidc_config_dict(
discover=oidc_config.get("discover", True),
issuer=oidc_config["issuer"],
client_id=oidc_config["client_id"],
client_secret=oidc_config.get("client_secret"),
client_secret=client_secret,
client_secret_jwt_key=client_secret_jwt_key,
client_auth_method=oidc_config.get("client_auth_method", "client_secret_basic"),
pkce_method=oidc_config.get("pkce_method", "auto"),

View File

@ -133,7 +133,16 @@ class RegistrationConfig(Config):
self.enable_set_displayname = config.get("enable_set_displayname", True)
self.enable_set_avatar_url = config.get("enable_set_avatar_url", True)
self.enable_3pid_changes = config.get("enable_3pid_changes", True)
# The default value of enable_3pid_changes is True, unless msc3861 is enabled.
msc3861_enabled = (
(config.get("experimental_features") or {})
.get("msc3861", {})
.get("enabled", False)
)
self.enable_3pid_changes = config.get(
"enable_3pid_changes", not msc3861_enabled
)
self.disable_msisdn_registration = config.get(
"disable_msisdn_registration", False

View File

@ -60,6 +60,7 @@ from synapse.events import EventBase
from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
from synapse.events.validator import EventValidator
from synapse.federation.federation_client import InvalidResponseError
from synapse.handlers.pagination import PURGE_PAGINATION_LOCK_NAME
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
@ -152,6 +153,7 @@ class FederationHandler:
self._device_handler = hs.get_device_handler()
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
self._notifier = hs.get_notifier()
self._worker_locks = hs.get_worker_locks_handler()
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
@ -200,7 +202,7 @@ class FederationHandler:
@trace
@tag_args
async def maybe_backfill(
self, room_id: str, current_depth: int, limit: int
self, room_id: str, current_depth: int, limit: int, record_time: bool = True
) -> bool:
"""Checks the database to see if we should backfill before paginating,
and if so do.
@ -213,21 +215,25 @@ class FederationHandler:
limit: The number of events that the pagination request will
return. This is used as part of the heuristic to decide if we
should back paginate.
record_time: Whether to record the time it takes to backfill.
Returns:
True if we actually tried to backfill something, otherwise False.
"""
# Starting the processing time here so we can include the room backfill
# linearizer lock queue in the timing
processing_start_time = self.clock.time_msec()
processing_start_time = self.clock.time_msec() if record_time else 0
async with self._room_backfill.queue(room_id):
return await self._maybe_backfill_inner(
room_id,
current_depth,
limit,
processing_start_time=processing_start_time,
)
async with self._worker_locks.acquire_read_write_lock(
PURGE_PAGINATION_LOCK_NAME, room_id, write=False
):
return await self._maybe_backfill_inner(
room_id,
current_depth,
limit,
processing_start_time=processing_start_time,
)
@trace
@tag_args
@ -305,12 +311,21 @@ class FederationHandler:
# of history that extends all the way back to where we are currently paginating
# and it's within the 100 events that are returned from `/backfill`.
if not sorted_backfill_points and current_depth != MAX_DEPTH:
# Check that we actually have later backfill points, if not just return.
have_later_backfill_points = await self.store.get_backfill_points_in_room(
room_id=room_id,
current_depth=MAX_DEPTH,
limit=1,
)
if not have_later_backfill_points:
return False
logger.debug(
"_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points."
)
run_as_background_process(
"_maybe_backfill_inner_anyway_with_max_depth",
self._maybe_backfill_inner,
self.maybe_backfill,
room_id=room_id,
# We use `MAX_DEPTH` so that we find all backfill points next
# time (all events are below the `MAX_DEPTH`)
@ -319,7 +334,7 @@ class FederationHandler:
# We don't want to start another timing observation from this
# nested recursive call. The top-most call can record the time
# overall otherwise the smaller one will throw off the results.
processing_start_time=None,
record_time=False,
)
# We return `False` because we're backfilling in the background and there is
# no new events immediately for the caller to know about yet.

View File

@ -487,155 +487,150 @@ class PaginationHandler:
room_token = from_token.room_key
async with self._worker_locks.acquire_read_write_lock(
PURGE_PAGINATION_LOCK_NAME, room_id, write=False
):
(membership, member_event_id) = (None, None)
if not use_admin_priviledge:
(
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id, requester, allow_departed_users=True
)
if pagin_config.direction == Direction.BACKWARDS:
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
curr_topo = room_token.topological
else:
curr_topo = await self.store.get_current_topological_token(
room_id, room_token.stream
)
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
if (
pagin_config.direction == Direction.BACKWARDS
and not use_admin_priviledge
and membership == Membership.LEAVE
):
# This is only None if the room is world_readable, in which case
# "Membership.JOIN" would have been returned and we should never hit
# this branch.
assert member_event_id
leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
assert leave_token.topological is not None
if leave_token.topological < curr_topo:
from_token = from_token.copy_and_replace(
StreamKeyType.ROOM, leave_token
)
to_room_key = None
if pagin_config.to_token:
to_room_key = pagin_config.to_token.room_key
# Initially fetch the events from the database. With any luck, we can return
# these without blocking on backfill (handled below).
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
(membership, member_event_id) = (None, None)
if not use_admin_priviledge:
(
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id, requester, allow_departed_users=True
)
if pagin_config.direction == Direction.BACKWARDS:
# We use a `Set` because there can be multiple events at a given depth
# and we only care about looking at the unique continum of depths to
# find gaps.
event_depths: Set[int] = {event.depth for event in events}
sorted_event_depths = sorted(event_depths)
# Inspect the depths of the returned events to see if there are any gaps
found_big_gap = False
number_of_gaps = 0
previous_event_depth = (
sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
if pagin_config.direction == Direction.BACKWARDS:
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
curr_topo = room_token.topological
else:
curr_topo = await self.store.get_current_topological_token(
room_id, room_token.stream
)
for event_depth in sorted_event_depths:
# We don't expect a negative depth but we'll just deal with it in
# any case by taking the absolute value to get the true gap between
# any two integers.
depth_gap = abs(event_depth - previous_event_depth)
# A `depth_gap` of 1 is a normal continuous chain to the next event
# (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
# also possible there is no event at a given depth but we can't ever
# know that for sure)
if depth_gap > 1:
number_of_gaps += 1
# We only tolerate a small number single-event long gaps in the
# returned events because those are most likely just events we've
# failed to pull in the past. Anything longer than that is probably
# a sign that we're missing a decent chunk of history and we should
# try to backfill it.
#
# XXX: It's possible we could tolerate longer gaps if we checked
# that a given events `prev_events` is one that has failed pull
# attempts and we could just treat it like a dead branch of history
# for now or at least something that we don't need the block the
# client on to try pulling.
#
# XXX: If we had something like MSC3871 to indicate gaps in the
# timeline to the client, we could also get away with any sized gap
# and just have the client refetch the holes as they see fit.
if depth_gap > 2:
found_big_gap = True
break
previous_event_depth = event_depth
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
if (
pagin_config.direction == Direction.BACKWARDS
and not use_admin_priviledge
and membership == Membership.LEAVE
):
# This is only None if the room is world_readable, in which case
# "Membership.JOIN" would have been returned and we should never hit
# this branch.
assert member_event_id
# Backfill in the foreground if we found a big gap, have too many holes,
# or we don't have enough events to fill the limit that the client asked
# for.
missing_too_many_events = (
number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
assert leave_token.topological is not None
if leave_token.topological < curr_topo:
from_token = from_token.copy_and_replace(
StreamKeyType.ROOM, leave_token
)
not_enough_events_to_fill_response = len(events) < pagin_config.limit
if (
found_big_gap
or missing_too_many_events
or not_enough_events_to_fill_response
):
did_backfill = (
await self.hs.get_federation_handler().maybe_backfill(
room_id,
curr_topo,
limit=pagin_config.limit,
)
)
# If we did backfill something, refetch the events from the database to
# catch anything new that might have been added since we last fetched.
if did_backfill:
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
else:
# Otherwise, we can backfill in the background for eventual
# consistency's sake but we don't need to block the client waiting
# for a costly federation call and processing.
run_as_background_process(
"maybe_backfill_in_the_background",
self.hs.get_federation_handler().maybe_backfill,
room_id,
curr_topo,
to_room_key = None
if pagin_config.to_token:
to_room_key = pagin_config.to_token.room_key
# Initially fetch the events from the database. With any luck, we can return
# these without blocking on backfill (handled below).
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
if pagin_config.direction == Direction.BACKWARDS:
# We use a `Set` because there can be multiple events at a given depth
# and we only care about looking at the unique continum of depths to
# find gaps.
event_depths: Set[int] = {event.depth for event in events}
sorted_event_depths = sorted(event_depths)
# Inspect the depths of the returned events to see if there are any gaps
found_big_gap = False
number_of_gaps = 0
previous_event_depth = (
sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
)
for event_depth in sorted_event_depths:
# We don't expect a negative depth but we'll just deal with it in
# any case by taking the absolute value to get the true gap between
# any two integers.
depth_gap = abs(event_depth - previous_event_depth)
# A `depth_gap` of 1 is a normal continuous chain to the next event
# (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
# also possible there is no event at a given depth but we can't ever
# know that for sure)
if depth_gap > 1:
number_of_gaps += 1
# We only tolerate a small number single-event long gaps in the
# returned events because those are most likely just events we've
# failed to pull in the past. Anything longer than that is probably
# a sign that we're missing a decent chunk of history and we should
# try to backfill it.
#
# XXX: It's possible we could tolerate longer gaps if we checked
# that a given events `prev_events` is one that has failed pull
# attempts and we could just treat it like a dead branch of history
# for now or at least something that we don't need the block the
# client on to try pulling.
#
# XXX: If we had something like MSC3871 to indicate gaps in the
# timeline to the client, we could also get away with any sized gap
# and just have the client refetch the holes as they see fit.
if depth_gap > 2:
found_big_gap = True
break
previous_event_depth = event_depth
# Backfill in the foreground if we found a big gap, have too many holes,
# or we don't have enough events to fill the limit that the client asked
# for.
missing_too_many_events = (
number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
)
not_enough_events_to_fill_response = len(events) < pagin_config.limit
if (
found_big_gap
or missing_too_many_events
or not_enough_events_to_fill_response
):
did_backfill = await self.hs.get_federation_handler().maybe_backfill(
room_id,
curr_topo,
limit=pagin_config.limit,
)
# If we did backfill something, refetch the events from the database to
# catch anything new that might have been added since we last fetched.
if did_backfill:
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
else:
# Otherwise, we can backfill in the background for eventual
# consistency's sake but we don't need to block the client waiting
# for a costly federation call and processing.
run_as_background_process(
"maybe_backfill_in_the_background",
self.hs.get_federation_handler().maybe_backfill,
room_id,
curr_topo,
limit=pagin_config.limit,
)
next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
# if no events are returned from pagination, that implies
# we have reached the end of the available events.

View File

@ -14,7 +14,9 @@
"""A replication client for use by synapse workers.
"""
import logging
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Dict, Iterable, Optional, Set, Tuple
from sortedcontainers import SortedList
from twisted.internet import defer
from twisted.internet.defer import Deferred
@ -26,6 +28,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import (
AccountDataStream,
CachesStream,
DeviceListsStream,
PushersStream,
PushRulesStream,
@ -73,6 +76,7 @@ class ReplicationDataHandler:
self._instance_name = hs.get_instance_name()
self._typing_handler = hs.get_typing_handler()
self._state_storage_controller = hs.get_storage_controllers().state
self.auth = hs.get_auth()
self._notify_pushers = hs.config.worker.start_pushers
self._pusher_pool = hs.get_pusherpool()
@ -84,7 +88,9 @@ class ReplicationDataHandler:
# Map from stream and instance to list of deferreds waiting for the stream to
# arrive at a particular position. The lists are sorted by stream position.
self._streams_to_waiters: Dict[Tuple[str, str], List[Tuple[int, Deferred]]] = {}
self._streams_to_waiters: Dict[
Tuple[str, str], SortedList[Tuple[int, Deferred]]
] = {}
async def on_rdata(
self, stream_name: str, instance_name: str, token: int, rows: list
@ -218,6 +224,16 @@ class ReplicationDataHandler:
self._state_storage_controller.notify_event_un_partial_stated(
row.event_id
)
# invalidate the introspection token cache
elif stream_name == CachesStream.NAME:
for row in rows:
if row.cache_func == "introspection_token_invalidation":
if row.keys[0] is None:
# invalidate the whole cache
# mypy ignore - the token cache is defined on MSC3861DelegatedAuth
self.auth.invalidate_token_cache() # type: ignore[attr-defined]
else:
self.auth.invalidate_cached_tokens(row.keys) # type: ignore[attr-defined]
await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
@ -226,7 +242,9 @@ class ReplicationDataHandler:
# Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is
# greater than the received row position.
waiting_list = self._streams_to_waiters.get((stream_name, instance_name), [])
waiting_list = self._streams_to_waiters.get((stream_name, instance_name))
if not waiting_list:
return
# Index of first item with a position after the current token, i.e we
# have called all deferreds before this index. If not overwritten by
@ -250,7 +268,7 @@ class ReplicationDataHandler:
# Drop all entries in the waiting list that were called in the above
# loop. (This maintains the order so no need to resort)
waiting_list[:] = waiting_list[index_of_first_deferred_not_called:]
del waiting_list[:index_of_first_deferred_not_called]
for deferred in deferreds_to_callback:
try:
@ -310,11 +328,10 @@ class ReplicationDataHandler:
)
waiting_list = self._streams_to_waiters.setdefault(
(stream_name, instance_name), []
(stream_name, instance_name), SortedList(key=lambda t: t[0])
)
waiting_list.append((position, deferred))
waiting_list.sort(key=lambda t: t[0])
waiting_list.add((position, deferred))
# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):

View File

@ -47,6 +47,7 @@ from synapse.rest.admin.federation import (
ListDestinationsRestServlet,
)
from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
from synapse.rest.admin.oidc import OIDCTokenRevocationRestServlet
from synapse.rest.admin.registration_tokens import (
ListRegistrationTokensRestServlet,
NewRegistrationTokenRestServlet,
@ -297,6 +298,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
BackgroundUpdateRestServlet(hs).register(http_server)
BackgroundUpdateStartJobRestServlet(hs).register(http_server)
ExperimentalFeaturesRestServlet(hs).register(http_server)
if hs.config.experimental.msc3861.enabled:
OIDCTokenRevocationRestServlet(hs).register(http_server)
def register_servlets_for_client_rest_resource(

View File

@ -0,0 +1,55 @@
# Copyright 2023 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from http import HTTPStatus
from typing import TYPE_CHECKING, Dict, Tuple
from synapse.http.servlet import RestServlet
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
if TYPE_CHECKING:
from synapse.server import HomeServer
class OIDCTokenRevocationRestServlet(RestServlet):
"""
Delete a given token introspection response - identified by the `jti` field - from the
introspection token cache when a token is revoked at the authorizing server
"""
PATTERNS = admin_patterns("/OIDC_token_revocation/(?P<token_id>[^/]*)")
def __init__(self, hs: "HomeServer"):
super().__init__()
auth = hs.get_auth()
# If this endpoint is loaded then we must have enabled delegated auth.
from synapse.api.auth.msc3861_delegated import MSC3861DelegatedAuth
assert isinstance(auth, MSC3861DelegatedAuth)
self.auth = auth
self.store = hs.get_datastores().main
async def on_DELETE(
self, request: SynapseRequest, token_id: str
) -> Tuple[HTTPStatus, Dict]:
await assert_requester_is_admin(self.auth, request)
self.auth._token_cache.invalidate(token_id)
# make sure we invalidate the cache on any workers
await self.store.stream_introspection_token_invalidation((token_id,))
return HTTPStatus.OK, {}

View File

@ -109,6 +109,8 @@ class UsersRestServletV2(RestServlet):
)
deactivated = parse_boolean(request, "deactivated", default=False)
admins = parse_boolean(request, "admins")
# If support for MSC3866 is not enabled, apply no filtering based on the
# `approved` column.
if self._msc3866_enabled:
@ -146,6 +148,7 @@ class UsersRestServletV2(RestServlet):
name,
guests,
deactivated,
admins,
order_by,
direction,
approved,

View File

@ -142,6 +142,7 @@ from synapse.util.distributor import Distributor
from synapse.util.macaroons import MacaroonGenerator
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import random_string
from synapse.util.task_scheduler import TaskScheduler
logger = logging.getLogger(__name__)
@ -360,6 +361,7 @@ class HomeServer(metaclass=abc.ABCMeta):
"""
for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
getattr(self, "get_" + i + "_handler")()
self.get_task_scheduler()
def get_reactor(self) -> ISynapseReactor:
"""
@ -912,6 +914,9 @@ class HomeServer(metaclass=abc.ABCMeta):
"""Usage metrics shared between phone home stats and the prometheus exporter."""
return CommonUsageMetricsManager(self)
@cache_in_self
def get_worker_locks_handler(self) -> WorkerLocksHandler:
return WorkerLocksHandler(self)
@cache_in_self
def get_task_scheduler(self) -> TaskScheduler:
return TaskScheduler(self)

View File

@ -70,6 +70,7 @@ from .state import StateStore
from .stats import StatsStore
from .stream import StreamWorkerStore
from .tags import TagsStore
from .task_scheduler import TaskSchedulerWorkerStore
from .transactions import TransactionWorkerStore
from .ui_auth import UIAuthStore
from .user_directory import UserDirectoryStore
@ -127,6 +128,7 @@ class DataStore(
CacheInvalidationWorkerStore,
LockStore,
SessionStore,
TaskSchedulerWorkerStore,
):
def __init__(
self,
@ -168,6 +170,7 @@ class DataStore(
name: Optional[str] = None,
guests: bool = True,
deactivated: bool = False,
admins: Optional[bool] = None,
order_by: str = UserSortOrder.NAME.value,
direction: Direction = Direction.FORWARDS,
approved: bool = True,
@ -184,6 +187,9 @@ class DataStore(
name: search for local part of user_id or display name
guests: whether to in include guest users
deactivated: whether to include deactivated users
admins: Optional flag to filter admins. If true, only admins are queried.
if false, admins are excluded from the query. When it is
none (the default), both admins and none-admins are queried.
order_by: the sort order of the returned list
direction: sort ascending or descending
approved: whether to include approved users
@ -220,6 +226,12 @@ class DataStore(
if not deactivated:
filters.append("deactivated = 0")
if admins is not None:
if admins:
filters.append("admin = 1")
else:
filters.append("admin = 0")
if not approved:
# We ignore NULL values for the approved flag because these should only
# be already existing users that we consider as already approved.

View File

@ -584,6 +584,19 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
else:
return 0
async def stream_introspection_token_invalidation(
self, key: Tuple[Optional[str]]
) -> None:
"""
Stream an invalidation request for the introspection token cache to workers
Args:
key: token_id of the introspection token to remove from the cache
"""
await self.send_invalidation_to_replication(
"introspection_token_invalidation", key
)
@wrap_as_background_process("clean_up_old_cache_invalidations")
async def _clean_up_cache_invalidation_wrapper(self) -> None:
"""

View File

@ -33,6 +33,7 @@ from typing_extensions import Literal
from synapse.api.constants import EduTypes
from synapse.api.errors import Codes, StoreError
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.opentracing import (
get_active_span_text_map,
set_tag,
@ -1663,6 +1664,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
self.device_id_exists_cache: LruCache[
Tuple[str, str], Literal[True]
] = LruCache(cache_name="device_id_exists", max_size=10000)
self.config: HomeServerConfig = hs.config
async def store_device(
self,
@ -1784,6 +1786,13 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
for device_id in device_ids:
self.device_id_exists_cache.invalidate((user_id, device_id))
# TODO: don't nuke the entire cache once there is a way to associate
# device_id -> introspection_token
if self.config.experimental.msc3861.enabled:
# mypy ignore - the token cache is defined on MSC3861DelegatedAuth
self.auth._token_cache.invalidate_all() # type: ignore[attr-defined]
await self.stream_introspection_token_invalidation((None,))
async def update_device(
self, user_id: str, device_id: str, new_display_name: Optional[str] = None
) -> None:

View File

@ -452,33 +452,56 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# sets.
seen_chains: Set[int] = set()
sql = """
SELECT event_id, chain_id, sequence_number
FROM event_auth_chains
WHERE %s
"""
for batch in batch_iter(initial_events, 1000):
clause, args = make_in_list_sql_clause(
txn.database_engine, "event_id", batch
)
txn.execute(sql % (clause,), args)
# Fetch the chain cover index for the initial set of events we're
# considering.
def fetch_chain_info(events_to_fetch: Collection[str]) -> None:
sql = """
SELECT event_id, chain_id, sequence_number
FROM event_auth_chains
WHERE %s
"""
for batch in batch_iter(events_to_fetch, 1000):
clause, args = make_in_list_sql_clause(
txn.database_engine, "event_id", batch
)
txn.execute(sql % (clause,), args)
for event_id, chain_id, sequence_number in txn:
chain_info[event_id] = (chain_id, sequence_number)
seen_chains.add(chain_id)
chain_to_event.setdefault(chain_id, {})[sequence_number] = event_id
for event_id, chain_id, sequence_number in txn:
chain_info[event_id] = (chain_id, sequence_number)
seen_chains.add(chain_id)
chain_to_event.setdefault(chain_id, {})[sequence_number] = event_id
fetch_chain_info(initial_events)
# Check that we actually have a chain ID for all the events.
events_missing_chain_info = initial_events.difference(chain_info)
# The result set to return, i.e. the auth chain difference.
result: Set[str] = set()
if events_missing_chain_info:
# This can happen due to e.g. downgrade/upgrade of the server. We
# raise an exception and fall back to the previous algorithm.
logger.info(
"Unexpectedly found that events don't have chain IDs in room %s: %s",
# For some reason we have events we haven't calculated the chain
# index for, so we need to handle those separately. This should only
# happen for older rooms where the server doesn't have all the auth
# events.
result = self._fixup_auth_chain_difference_sets(
txn,
room_id,
events_missing_chain_info,
state_sets=state_sets,
events_missing_chain_info=events_missing_chain_info,
events_that_have_chain_index=chain_info,
)
raise _NoChainCoverIndex(room_id)
# We now need to refetch any events that we have added to the state
# sets.
new_events_to_fetch = {
event_id
for state_set in state_sets
for event_id in state_set
if event_id not in initial_events
}
fetch_chain_info(new_events_to_fetch)
# Corresponds to `state_sets`, except as a map from chain ID to max
# sequence number reachable from the state set.
@ -487,8 +510,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
chains: Dict[int, int] = {}
set_to_chain.append(chains)
for event_id in state_set:
chain_id, seq_no = chain_info[event_id]
for state_id in state_set:
chain_id, seq_no = chain_info[state_id]
chains[chain_id] = max(seq_no, chains.get(chain_id, 0))
@ -532,7 +555,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# from *any* state set and the minimum sequence number reachable from
# *all* state sets. Events in that range are in the auth chain
# difference.
result = set()
# Mapping from chain ID to the range of sequence numbers that should be
# pulled from the database.
@ -588,6 +610,122 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
return result
def _fixup_auth_chain_difference_sets(
self,
txn: LoggingTransaction,
room_id: str,
state_sets: List[Set[str]],
events_missing_chain_info: Set[str],
events_that_have_chain_index: Collection[str],
) -> Set[str]:
"""Helper for `_get_auth_chain_difference_using_cover_index_txn` to
handle the case where we haven't calculated the chain cover index for
all events.
This modifies `state_sets` so that they only include events that have a
chain cover index, and returns a set of event IDs that are part of the
auth difference.
"""
# This works similarly to the handling of unpersisted events in
# `synapse.state.v2_get_auth_chain_difference`. We uses the observation
# that if you can split the set of events into two classes X and Y,
# where no events in Y have events in X in their auth chain, then we can
# calculate the auth difference by considering X and Y separately.
#
# We do this in three steps:
# 1. Compute the set of events without chain cover index belonging to
# the auth difference.
# 2. Replacing the un-indexed events in the state_sets with their auth
# events, recursively, until the state_sets contain only indexed
# events. We can then calculate the auth difference of those state
# sets using the chain cover index.
# 3. Add the results of 1 and 2 together.
# By construction we know that all events that we haven't persisted the
# chain cover index for are contained in
# `event_auth_chain_to_calculate`, so we pull out the events from those
# rather than doing recursive queries to walk the auth chain.
#
# We pull out those events with their auth events, which gives us enough
# information to construct the auth chain of an event up to auth events
# that have the chain cover index.
sql = """
SELECT tc.event_id, ea.auth_id, eac.chain_id IS NOT NULL
FROM event_auth_chain_to_calculate AS tc
LEFT JOIN event_auth AS ea USING (event_id)
LEFT JOIN event_auth_chains AS eac ON (ea.auth_id = eac.event_id)
WHERE tc.room_id = ?
"""
txn.execute(sql, (room_id,))
event_to_auth_ids: Dict[str, Set[str]] = {}
events_that_have_chain_index = set(events_that_have_chain_index)
for event_id, auth_id, auth_id_has_chain in txn:
s = event_to_auth_ids.setdefault(event_id, set())
if auth_id is not None:
s.add(auth_id)
if auth_id_has_chain:
events_that_have_chain_index.add(auth_id)
if events_missing_chain_info - event_to_auth_ids.keys():
# Uh oh, we somehow haven't correctly done the chain cover index,
# bail and fall back to the old method.
logger.info(
"Unexpectedly found that events don't have chain IDs in room %s: %s",
room_id,
events_missing_chain_info - event_to_auth_ids.keys(),
)
raise _NoChainCoverIndex(room_id)
# Create a map from event IDs we care about to their partial auth chain.
event_id_to_partial_auth_chain: Dict[str, Set[str]] = {}
for event_id, auth_ids in event_to_auth_ids.items():
if not any(event_id in state_set for state_set in state_sets):
continue
processing = set(auth_ids)
to_add = set()
while processing:
auth_id = processing.pop()
to_add.add(auth_id)
sub_auth_ids = event_to_auth_ids.get(auth_id)
if sub_auth_ids is None:
continue
processing.update(sub_auth_ids - to_add)
event_id_to_partial_auth_chain[event_id] = to_add
# Now we do two things:
# 1. Update the state sets to only include indexed events; and
# 2. Create a new list containing the auth chains of the un-indexed
# events
unindexed_state_sets: List[Set[str]] = []
for state_set in state_sets:
unindexed_state_set = set()
for event_id, auth_chain in event_id_to_partial_auth_chain.items():
if event_id not in state_set:
continue
unindexed_state_set.add(event_id)
state_set.discard(event_id)
state_set.difference_update(auth_chain)
for auth_id in auth_chain:
if auth_id in events_that_have_chain_index:
state_set.add(auth_id)
else:
unindexed_state_set.add(auth_id)
unindexed_state_sets.append(unindexed_state_set)
# Calculate and return the auth difference of the un-indexed events.
union = unindexed_state_sets[0].union(*unindexed_state_sets[1:])
intersection = unindexed_state_sets[0].intersection(*unindexed_state_sets[1:])
return union - intersection
def _get_auth_chain_difference_txn(
self, txn: LoggingTransaction, state_sets: List[Set[str]]
) -> Set[str]:

View File

@ -0,0 +1,202 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus
from synapse.util import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer
class TaskSchedulerWorkerStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
@staticmethod
def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask:
row["status"] = TaskStatus(row["status"])
if row["params"] is not None:
row["params"] = db_to_json(row["params"])
if row["result"] is not None:
row["result"] = db_to_json(row["result"])
return ScheduledTask(**row)
async def get_scheduled_tasks(
self,
*,
actions: Optional[List[str]] = None,
resource_id: Optional[str] = None,
statuses: Optional[List[TaskStatus]] = None,
max_timestamp: Optional[int] = None,
) -> List[ScheduledTask]:
"""Get a list of scheduled tasks from the DB.
Args:
actions: Limit the returned tasks to those specific action names
resource_id: Limit the returned tasks to the specific resource id, if specified
statuses: Limit the returned tasks to the specific statuses
max_timestamp: Limit the returned tasks to the ones that have
a timestamp inferior to the specified one
Returns: a list of `ScheduledTask`, ordered by increasing timestamps
"""
def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
clauses: List[str] = []
args: List[Any] = []
if resource_id:
clauses.append("resource_id = ?")
args.append(resource_id)
if actions is not None:
clause, temp_args = make_in_list_sql_clause(
txn.database_engine, "action", actions
)
clauses.append(clause)
args.extend(temp_args)
if statuses is not None:
clause, temp_args = make_in_list_sql_clause(
txn.database_engine, "status", statuses
)
clauses.append(clause)
args.extend(temp_args)
if max_timestamp is not None:
clauses.append("timestamp <= ?")
args.append(max_timestamp)
sql = "SELECT * FROM scheduled_tasks"
if clauses:
sql = sql + " WHERE " + " AND ".join(clauses)
sql = sql + "ORDER BY timestamp"
txn.execute(sql, args)
return self.db_pool.cursor_to_dict(txn)
rows = await self.db_pool.runInteraction(
"get_scheduled_tasks", get_scheduled_tasks_txn
)
return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows]
async def insert_scheduled_task(self, task: ScheduledTask) -> None:
"""Insert a specified `ScheduledTask` in the DB.
Args:
task: the `ScheduledTask` to insert
"""
await self.db_pool.simple_insert(
"scheduled_tasks",
{
"id": task.id,
"action": task.action,
"status": task.status,
"timestamp": task.timestamp,
"resource_id": task.resource_id,
"params": None
if task.params is None
else json_encoder.encode(task.params),
"result": None
if task.result is None
else json_encoder.encode(task.result),
"error": task.error,
},
desc="insert_scheduled_task",
)
async def update_scheduled_task(
self,
id: str,
timestamp: int,
*,
status: Optional[TaskStatus] = None,
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> bool:
"""Update a scheduled task in the DB with some new value(s).
Args:
id: id of the `ScheduledTask` to update
timestamp: new timestamp of the task
status: new status of the task
result: new result of the task
error: new error of the task
Returns: `False` if no matching row was found, `True` otherwise
"""
updatevalues: JsonDict = {"timestamp": timestamp}
if status is not None:
updatevalues["status"] = status
if result is not None:
updatevalues["result"] = json_encoder.encode(result)
if error is not None:
updatevalues["error"] = error
nb_rows = await self.db_pool.simple_update(
"scheduled_tasks",
{"id": id},
updatevalues,
desc="update_scheduled_task",
)
return nb_rows > 0
async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]:
"""Get a specific `ScheduledTask` from its id.
Args:
id: the id of the task to retrieve
Returns: the task if available, `None` otherwise
"""
row = await self.db_pool.simple_select_one(
table="scheduled_tasks",
keyvalues={"id": id},
retcols=(
"id",
"action",
"status",
"timestamp",
"resource_id",
"params",
"result",
"error",
),
allow_none=True,
desc="get_scheduled_task",
)
return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None
async def delete_scheduled_task(self, id: str) -> None:
"""Delete a specific task from its id.
Args:
id: the id of the task to delete
"""
await self.db_pool.simple_delete(
"scheduled_tasks",
keyvalues={"id": id},
desc="delete_scheduled_task",
)

View File

@ -242,6 +242,8 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
) -> None:
# Upsert retry time interval if retry_interval is zero (i.e. we're
# resetting it) or greater than the existing retry interval.
# We also upsert when the new retry interval is the same as the existing one,
# since it will be the case when `destination_max_retry_interval` is reached.
#
# WARNING: This is executed in autocommit, so we shouldn't add any more
# SQL calls in here (without being very careful).
@ -257,7 +259,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
WHERE
EXCLUDED.retry_interval = 0
OR destinations.retry_interval IS NULL
OR destinations.retry_interval < EXCLUDED.retry_interval
OR destinations.retry_interval <= EXCLUDED.retry_interval
"""
txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))

View File

@ -113,6 +113,7 @@ Changes in SCHEMA_VERSION = 79
Changes in SCHEMA_VERSION = 80
- The event_txn_id_device_id is always written to for new events.
- Add tables for the task scheduler.
"""

View File

@ -0,0 +1,30 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Mark the worker_read_write_locks* tables as UNLOGGED, to increase
-- performance. This means that we don't replicate the tables, and they get
-- truncated on a crash. This is acceptable as a) in those cases it's likely
-- that Synapse needs to be stopped/restarted anyway, and b) the locks are
-- considered best-effort anyway.
-- We need to remove and recreate the circular foreign key references, as
-- UNLOGGED tables can't reference normal tables.
ALTER TABLE worker_read_write_locks_mode DROP CONSTRAINT IF EXISTS worker_read_write_locks_mode_foreign;
ALTER TABLE worker_read_write_locks SET UNLOGGED;
ALTER TABLE worker_read_write_locks_mode SET UNLOGGED;
ALTER TABLE worker_read_write_locks_mode ADD CONSTRAINT worker_read_write_locks_mode_foreign
FOREIGN KEY (lock_name, lock_key, token) REFERENCES worker_read_write_locks(lock_name, lock_key, token) DEFERRABLE INITIALLY DEFERRED;

View File

@ -0,0 +1,28 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- cf ScheduledTask docstring for the meaning of the fields.
CREATE TABLE IF NOT EXISTS scheduled_tasks(
id TEXT PRIMARY KEY,
action TEXT NOT NULL,
status TEXT NOT NULL,
timestamp BIGINT NOT NULL,
resource_id TEXT,
params TEXT,
result TEXT,
error TEXT
);
CREATE INDEX IF NOT EXISTS scheduled_tasks_status ON scheduled_tasks(status);

View File

@ -15,6 +15,7 @@
import abc
import re
import string
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
@ -969,3 +970,41 @@ class UserProfile(TypedDict):
class RetentionPolicy:
min_lifetime: Optional[int] = None
max_lifetime: Optional[int] = None
class TaskStatus(str, Enum):
"""Status of a scheduled task"""
# Task is scheduled but not active
SCHEDULED = "scheduled"
# Task is active and probably running, and if not
# will be run on next scheduler loop run
ACTIVE = "active"
# Task has completed successfully
COMPLETE = "complete"
# Task is over and either returned a failed status, or had an exception
FAILED = "failed"
@attr.s(auto_attribs=True, frozen=True, slots=True)
class ScheduledTask:
"""Description of a scheduled task"""
# Id used to identify the task
id: str
# Name of the action to be run by this task
action: str
# Current status of this task
status: TaskStatus
# If the status is SCHEDULED then this represents when it should be launched,
# otherwise it represents the last time this task got a change of state.
# In milliseconds since epoch in system time timezone, usually UTC.
timestamp: int
# Optionally bind a task to some resource id for easy retrieval
resource_id: Optional[str]
# Optional parameters that will be passed to the function ran by the task
params: Optional[JsonMapping]
# Optional result that can be updated by the running task
result: Optional[JsonMapping]
# Optional error that should be assigned a value when the status is FAILED
error: Optional[str]

View File

@ -140,6 +140,20 @@ class ExpiringCache(Generic[KT, VT]):
return value.value
def invalidate(self, key: KT) -> None:
"""
Remove the given key from the cache.
"""
value = self._cache.pop(key, None)
if value:
if self.iterable:
self.metrics.inc_evictions(
EvictionReason.invalidation, len(value.value)
)
else:
self.metrics.inc_evictions(EvictionReason.invalidation)
def __contains__(self, key: KT) -> bool:
return key in self._cache
@ -193,6 +207,14 @@ class ExpiringCache(Generic[KT, VT]):
len(self),
)
def invalidate_all(self) -> None:
"""
Remove all items from the cache.
"""
keys = set(self._cache.keys())
for key in keys:
self._cache.pop(key)
def __len__(self) -> int:
if self.iterable:
return sum(len(entry.value) for entry in self._cache.values())

View File

@ -0,0 +1,364 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set, Tuple
from prometheus_client import Gauge
from twisted.python.failure import Failure
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonMapping, ScheduledTask, TaskStatus
from synapse.util.stringutils import random_string
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
running_tasks_gauge = Gauge(
"synapse_scheduler_running_tasks",
"The number of concurrent running tasks handled by the TaskScheduler",
)
class TaskScheduler:
"""
This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background`
to launch a background task, or Twisted `deferLater` if we want to do so later on.
The problem with that is that the tasks will just stop and never be resumed if synapse
is stopped for whatever reason.
How this works:
- A function mapped to a named action should first be registered with `register_action`.
This function will be called when trying to resuming tasks after a synapse shutdown,
so this registration should happen when synapse is initialised, NOT right before scheduling
a task.
- A task can then be launched using this named action with `schedule_task`. A `params` dict
can be passed, and it will be available to the registered function when launched. This task
can be launch either now-ish, or later on by giving a `timestamp` parameter.
The function may call `update_task` at any time to update the `result` of the task,
and this can be used to resume the task at a specific point and/or to convey a result to
the code launching the task.
You can also specify the `result` (and/or an `error`) when returning from the function.
The reconciliation loop runs every 5 mns, so this is not a precise scheduler. When wanting
to launch now, the launch will still not happen before the next loop run.
Tasks will be run on the worker specified with `run_background_tasks_on` config,
or the main one by default.
There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already
full. In this regard, please take great care that scheduled tasks can actually finished.
For now there is no mechanism to stop a running task if it is stuck.
"""
# Precision of the scheduler, evaluation of tasks to run will only happen
# every `SCHEDULE_INTERVAL_MS` ms
SCHEDULE_INTERVAL_MS = 1 * 60 * 1000 # 1mn
# Time before a complete or failed task is deleted from the DB
KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week
# Maximum number of tasks that can run at the same time
MAX_CONCURRENT_RUNNING_TASKS = 10
# Time from the last task update after which we will log a warning
LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs
def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastores().main
self._clock = hs.get_clock()
self._running_tasks: Set[str] = set()
# A map between action names and their registered function
self._actions: Dict[
str,
Callable[
[ScheduledTask, bool],
Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]],
],
] = {}
self._run_background_tasks = hs.config.worker.run_background_tasks
if self._run_background_tasks:
self._clock.looping_call(
run_as_background_process,
TaskScheduler.SCHEDULE_INTERVAL_MS,
"handle_scheduled_tasks",
self._handle_scheduled_tasks,
)
def register_action(
self,
function: Callable[
[ScheduledTask, bool],
Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]],
],
action_name: str,
) -> None:
"""Register a function to be executed when an action is scheduled with
the specified action name.
Actions need to be registered as early as possible so that a resumed action
can find its matching function. It's usually better to NOT do that right before
calling `schedule_task` but rather in an `__init__` method.
Args:
function: The function to be executed for this action. The parameters
passed to the function when launched are the `ScheduledTask` being run,
and a `first_launch` boolean to signal if it's a resumed task or the first
launch of it. The function should return a tuple of new `status`, `result`
and `error` as specified in `ScheduledTask`.
action_name: The name of the action to be associated with the function
"""
self._actions[action_name] = function
async def schedule_task(
self,
action: str,
*,
resource_id: Optional[str] = None,
timestamp: Optional[int] = None,
params: Optional[JsonMapping] = None,
) -> str:
"""Schedule a new potentially resumable task. A function matching the specified
`action` should have been previously registered with `register_action`.
Args:
action: the name of a previously registered action
resource_id: a task can be associated with a resource id to facilitate
getting all tasks associated with a specific resource
timestamp: if `None`, the task will be launched as soon as possible, otherwise it
will be launch as soon as possible after the `timestamp` value.
Note that this scheduler is not meant to be precise, and the scheduling
could be delayed if too many tasks are already running
params: a set of parameters that can be easily accessed from inside the
executed function
Returns:
The id of the scheduled task
"""
if action not in self._actions:
raise Exception(
f"No function associated with action {action} of the scheduled task"
)
if timestamp is None or timestamp < self._clock.time_msec():
timestamp = self._clock.time_msec()
task = ScheduledTask(
random_string(16),
action,
TaskStatus.SCHEDULED,
timestamp,
resource_id,
params,
result=None,
error=None,
)
await self._store.insert_scheduled_task(task)
return task.id
async def update_task(
self,
id: str,
*,
timestamp: Optional[int] = None,
status: Optional[TaskStatus] = None,
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> bool:
"""Update some task associated values. This is exposed publically so it can
be used inside task functions, mainly to update the result and be able to
resume a task at a specific step after a restart of synapse.
It can also be used to stage a task, by setting the `status` to `SCHEDULED` with
a new timestamp.
The `status` can only be set to `ACTIVE` or `SCHEDULED`, `COMPLETE` and `FAILED`
are terminal status and can only be set by returning it in the function.
Args:
id: the id of the task to update
timestamp: useful to schedule a new stage of the task at a later date
status: the new `TaskStatus` of the task
result: the new result of the task
error: the new error of the task
"""
if status == TaskStatus.COMPLETE or status == TaskStatus.FAILED:
raise Exception(
"update_task can't be called with a FAILED or COMPLETE status"
)
if timestamp is None:
timestamp = self._clock.time_msec()
return await self._store.update_scheduled_task(
id,
timestamp,
status=status,
result=result,
error=error,
)
async def get_task(self, id: str) -> Optional[ScheduledTask]:
"""Get a specific task description by id.
Args:
id: the id of the task to retrieve
Returns:
The task information or `None` if it doesn't exist or it has
already been removed because it's too old.
"""
return await self._store.get_scheduled_task(id)
async def get_tasks(
self,
*,
actions: Optional[List[str]] = None,
resource_id: Optional[str] = None,
statuses: Optional[List[TaskStatus]] = None,
max_timestamp: Optional[int] = None,
) -> List[ScheduledTask]:
"""Get a list of tasks. Returns all the tasks if no args is provided.
If an arg is `None` all tasks matching the other args will be selected.
If an arg is an empty list, the corresponding value of the task needs
to be `None` to be selected.
Args:
actions: Limit the returned tasks to those specific action names
resource_id: Limit the returned tasks to the specific resource id, if specified
statuses: Limit the returned tasks to the specific statuses
max_timestamp: Limit the returned tasks to the ones that have
a timestamp inferior to the specified one
Returns
A list of `ScheduledTask`, ordered by increasing timestamps
"""
return await self._store.get_scheduled_tasks(
actions=actions,
resource_id=resource_id,
statuses=statuses,
max_timestamp=max_timestamp,
)
async def delete_task(self, id: str) -> None:
"""Delete a task. Running tasks can't be deleted.
Can only be called from the worker handling the task scheduling.
Args:
id: id of the task to delete
"""
if self.task_is_running(id):
raise Exception(f"Task {id} is currently running and can't be deleted")
await self._store.delete_scheduled_task(id)
def task_is_running(self, id: str) -> bool:
"""Check if a task is currently running.
Can only be called from the worker handling the task scheduling.
Args:
id: id of the task to check
"""
assert self._run_background_tasks
return id in self._running_tasks
async def _handle_scheduled_tasks(self) -> None:
"""Main loop taking care of launching tasks and cleaning up old ones."""
await self._launch_scheduled_tasks()
await self._clean_scheduled_tasks()
async def _launch_scheduled_tasks(self) -> None:
"""Retrieve and launch scheduled tasks that should be running at that time."""
for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]):
if not self.task_is_running(task.id):
if (
len(self._running_tasks)
< TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS
):
await self._launch_task(task, first_launch=False)
else:
if (
self._clock.time_msec()
> task.timestamp + TaskScheduler.LAST_UPDATE_BEFORE_WARNING_MS
):
logger.warn(
f"Task {task.id} (action {task.action}) has seen no update for more than 24h and may be stuck"
)
for task in await self.get_tasks(
statuses=[TaskStatus.SCHEDULED], max_timestamp=self._clock.time_msec()
):
if (
not self.task_is_running(task.id)
and len(self._running_tasks)
< TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS
):
await self._launch_task(task, first_launch=True)
running_tasks_gauge.set(len(self._running_tasks))
async def _clean_scheduled_tasks(self) -> None:
"""Clean old complete or failed jobs to avoid clutter the DB."""
for task in await self._store.get_scheduled_tasks(
statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE]
):
# FAILED and COMPLETE tasks should never be running
assert not self.task_is_running(task.id)
if (
self._clock.time_msec()
> task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS
):
await self._store.delete_scheduled_task(task.id)
async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None:
"""Launch a scheduled task now.
Args:
task: the task to launch
first_launch: `True` if it's the first time is launched, `False` otherwise
"""
assert task.action in self._actions
function = self._actions[task.action]
async def wrapper() -> None:
try:
(status, result, error) = await function(task, first_launch)
except Exception:
f = Failure()
logger.error(
f"scheduled task {task.id} failed",
exc_info=(f.type, f.value, f.getTracebackObject()),
)
status = TaskStatus.FAILED
result = None
error = f.getErrorMessage()
await self._store.update_scheduled_task(
task.id,
self._clock.time_msec(),
status=status,
result=result,
error=error,
)
self._running_tasks.remove(task.id)
self._running_tasks.add(task.id)
await self.update_task(task.id, status=TaskStatus.ACTIVE)
description = f"{task.id}-{task.action}"
run_as_background_process(description, wrapper)

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from unittest.mock import Mock
from synapse.config import ConfigError
@ -167,6 +168,21 @@ class MSC3861OAuthDelegation(TestCase):
with self.assertRaises(ConfigError):
self.parse_config()
def test_user_consent_cannot_be_enabled(self) -> None:
tmpdir = self.mktemp()
os.mkdir(tmpdir)
self.config_dict["user_consent"] = {
"require_at_registration": True,
"version": "1",
"template_dir": tmpdir,
"server_notice_content": {
"msgtype": "m.text",
"body": "foo",
},
}
with self.assertRaises(ConfigError):
self.parse_config()
def test_password_config_cannot_be_enabled(self) -> None:
self.config_dict["password_config"] = {"enabled": True}
with self.assertRaises(ConfigError):
@ -255,3 +271,8 @@ class MSC3861OAuthDelegation(TestCase):
self.config_dict["session_lifetime"] = "24h"
with self.assertRaises(ConfigError):
self.parse_config()
def test_enable_3pid_changes_cannot_be_enabled(self) -> None:
self.config_dict["enable_3pid_changes"] = True
with self.assertRaises(ConfigError):
self.parse_config()

View File

@ -14,7 +14,7 @@
from http import HTTPStatus
from typing import Any, Dict, Union
from unittest.mock import ANY, Mock
from unittest.mock import ANY, AsyncMock, Mock
from urllib.parse import parse_qs
from signedjson.key import (
@ -340,6 +340,41 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
get_awaitable_result(self.auth.is_server_admin(requester)), False
)
def test_active_user_admin_impersonation(self) -> None:
"""The handler should return a requester with normal user rights
and an user ID matching the one specified in query param `user_id`"""
self.http_client.request = simple_async_mock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join([SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE]),
"username": USERNAME,
},
)
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
impersonated_user_id = f"@{USERNAME}:{SERVER_NAME}"
request.args[b"_oidc_admin_impersonate_user_id"] = [
impersonated_user_id.encode("ascii")
]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
requester = self.get_success(self.auth.get_user_by_req(request))
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.http_client.request.assert_called_once_with(
method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
)
self._assertParams()
self.assertEqual(requester.user.to_string(), impersonated_user_id)
self.assertEqual(requester.is_guest, False)
self.assertEqual(requester.device_id, None)
self.assertEqual(
get_awaitable_result(self.auth.is_server_admin(requester)), False
)
def test_active_user_with_device(self) -> None:
"""The handler should return a requester with normal user rights and a device ID."""
@ -553,6 +588,38 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
)
self.assertEqual(self.http_client.request.call_count, 2)
def test_revocation_endpoint(self) -> None:
# mock introspection response and then admin verification response
self.http_client.request = AsyncMock(
side_effect=[
FakeResponse.json(
code=200, payload={"active": True, "jti": "open_sesame"}
),
FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join([SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE]),
"username": USERNAME,
},
),
]
)
# cache a token to delete
introspection_token = self.get_success(
self.auth._introspect_token("open_sesame") # type: ignore[attr-defined]
)
self.assertEqual(self.auth._token_cache.get("open_sesame"), introspection_token) # type: ignore[attr-defined]
# delete the revoked token
introspection_token_id = "open_sesame"
url = f"/_synapse/admin/v1/OIDC_token_revocation/{introspection_token_id}"
channel = self.make_request("DELETE", url, access_token="mockAccessToken")
self.assertEqual(channel.code, 200)
self.assertEqual(self.auth._token_cache.get("open_sesame"), None) # type: ignore[attr-defined]
def make_device_keys(self, user_id: str, device_id: str) -> JsonDict:
# We only generate a master key to simplify the test.
master_signing_key = generate_signing_key(device_id)

View File

@ -514,6 +514,9 @@ class PresenceTimeoutTestCase(unittest.TestCase):
class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
user_id = "@test:server"
user_id_obj = UserID.from_string(user_id)
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
@ -523,12 +526,11 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
we time out their syncing users presence.
"""
process_id = "1"
user_id = "@test:server"
# Notify handler that a user is now syncing.
self.get_success(
self.presence_handler.update_external_syncs_row(
process_id, user_id, True, self.clock.time_msec()
process_id, self.user_id, True, self.clock.time_msec()
)
)
@ -536,48 +538,37 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
# stopped syncing that their presence state doesn't get timed out.
self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2)
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(state.state, PresenceState.ONLINE)
# Check that if the external process timeout fires, then the syncing
# user gets timed out
self.reactor.advance(EXTERNAL_PROCESS_EXPIRY)
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(state.state, PresenceState.OFFLINE)
def test_user_goes_offline_by_timeout_status_msg_remain(self) -> None:
"""Test that if a user doesn't update the records for a while
users presence goes `OFFLINE` because of timeout and `status_msg` remains.
"""
user_id = "@test:server"
status_msg = "I'm here!"
# Mark user as online
self._set_presencestate_with_status_msg(
user_id, PresenceState.ONLINE, status_msg
)
self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
# Check that if we wait a while without telling the handler the user has
# stopped syncing that their presence state doesn't get timed out.
self.reactor.advance(SYNC_ONLINE_TIMEOUT / 2)
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(state.state, PresenceState.ONLINE)
self.assertEqual(state.status_msg, status_msg)
# Check that if the timeout fires, then the syncing user gets timed out
self.reactor.advance(SYNC_ONLINE_TIMEOUT)
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# status_msg should remain even after going offline
self.assertEqual(state.state, PresenceState.OFFLINE)
self.assertEqual(state.status_msg, status_msg)
@ -586,24 +577,19 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
"""Test that if a user change presence manually to `OFFLINE`
and no status is set, that `status_msg` is `None`.
"""
user_id = "@test:server"
status_msg = "I'm here!"
# Mark user as online
self._set_presencestate_with_status_msg(
user_id, PresenceState.ONLINE, status_msg
)
self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
# Mark user as offline
self.get_success(
self.presence_handler.set_state(
UserID.from_string(user_id), {"presence": PresenceState.OFFLINE}
self.user_id_obj, {"presence": PresenceState.OFFLINE}
)
)
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(state.state, PresenceState.OFFLINE)
self.assertEqual(state.status_msg, None)
@ -611,41 +597,31 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
"""Test that if a user change presence manually to `OFFLINE`
and a status is set, that `status_msg` appears.
"""
user_id = "@test:server"
status_msg = "I'm here!"
# Mark user as online
self._set_presencestate_with_status_msg(
user_id, PresenceState.ONLINE, status_msg
)
self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
# Mark user as offline
self._set_presencestate_with_status_msg(
user_id, PresenceState.OFFLINE, "And now here."
)
self._set_presencestate_with_status_msg(PresenceState.OFFLINE, "And now here.")
def test_user_reset_online_with_no_status(self) -> None:
"""Test that if a user set again the presence manually
and no status is set, that `status_msg` is `None`.
"""
user_id = "@test:server"
status_msg = "I'm here!"
# Mark user as online
self._set_presencestate_with_status_msg(
user_id, PresenceState.ONLINE, status_msg
)
self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
# Mark user as online again
self.get_success(
self.presence_handler.set_state(
UserID.from_string(user_id), {"presence": PresenceState.ONLINE}
self.user_id_obj, {"presence": PresenceState.ONLINE}
)
)
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# status_msg should remain even after going offline
self.assertEqual(state.state, PresenceState.ONLINE)
self.assertEqual(state.status_msg, None)
@ -654,33 +630,27 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
"""Test that if a user set again the presence manually
and status is `None`, that `status_msg` is `None`.
"""
user_id = "@test:server"
status_msg = "I'm here!"
# Mark user as online
self._set_presencestate_with_status_msg(
user_id, PresenceState.ONLINE, status_msg
)
self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
# Mark user as online and `status_msg = None`
self._set_presencestate_with_status_msg(user_id, PresenceState.ONLINE, None)
self._set_presencestate_with_status_msg(PresenceState.ONLINE, None)
def test_set_presence_from_syncing_not_set(self) -> None:
"""Test that presence is not set by syncing if affect_presence is false"""
user_id = "@test:server"
status_msg = "I'm here!"
self._set_presencestate_with_status_msg(
user_id, PresenceState.UNAVAILABLE, status_msg
)
self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)
self.get_success(
self.presence_handler.user_syncing(user_id, False, PresenceState.ONLINE)
self.presence_handler.user_syncing(
self.user_id, False, PresenceState.ONLINE
)
)
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# we should still be unavailable
self.assertEqual(state.state, PresenceState.UNAVAILABLE)
# and status message should still be the same
@ -688,50 +658,34 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
def test_set_presence_from_syncing_is_set(self) -> None:
"""Test that presence is set by syncing if affect_presence is true"""
user_id = "@test:server"
status_msg = "I'm here!"
self._set_presencestate_with_status_msg(
user_id, PresenceState.UNAVAILABLE, status_msg
)
self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)
self.get_success(
self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE)
self.presence_handler.user_syncing(self.user_id, True, PresenceState.ONLINE)
)
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# we should now be online
self.assertEqual(state.state, PresenceState.ONLINE)
def test_set_presence_from_syncing_keeps_status(self) -> None:
"""Test that presence set by syncing retains status message"""
user_id = "@test:server"
status_msg = "I'm here!"
self._set_presencestate_with_status_msg(
user_id, PresenceState.UNAVAILABLE, status_msg
)
self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)
self.get_success(
self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE)
self.presence_handler.user_syncing(self.user_id, True, PresenceState.ONLINE)
)
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# our status message should be the same as it was before
self.assertEqual(state.status_msg, status_msg)
@parameterized.expand([(False,), (True,)])
@unittest.override_config(
{
"experimental_features": {
"msc3026_enabled": True,
},
}
)
@unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
def test_set_presence_from_syncing_keeps_busy(
self, test_with_workers: bool
) -> None:
@ -741,7 +695,6 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
test_with_workers: If True, check the presence state of the user by calling
/sync against a worker, rather than the main process.
"""
user_id = "@test:server"
status_msg = "I'm busy!"
# By default, we call /sync against the main process.
@ -755,44 +708,39 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
)
# Set presence to BUSY
self._set_presencestate_with_status_msg(user_id, PresenceState.BUSY, status_msg)
self._set_presencestate_with_status_msg(PresenceState.BUSY, status_msg)
# Perform a sync with a presence state other than busy. This should NOT change
# our presence status; we only change from busy if we explicitly set it via
# /presence/*.
self.get_success(
worker_to_sync_against.get_presence_handler().user_syncing(
user_id, True, PresenceState.ONLINE
self.user_id, True, PresenceState.ONLINE
)
)
# Check against the main process that the user's presence did not change.
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# we should still be busy
self.assertEqual(state.state, PresenceState.BUSY)
def _set_presencestate_with_status_msg(
self, user_id: str, state: str, status_msg: Optional[str]
self, state: str, status_msg: Optional[str]
) -> None:
"""Set a PresenceState and status_msg and check the result.
Args:
user_id: User for that the status is to be set.
state: The new PresenceState.
status_msg: Status message that is to be set.
"""
self.get_success(
self.presence_handler.set_state(
UserID.from_string(user_id),
self.user_id_obj,
{"presence": state, "status_msg": status_msg},
)
)
new_state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
new_state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(new_state.state, state)
self.assertEqual(new_state.status_msg, status_msg)
@ -952,9 +900,6 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
self.assertEqual(upto_token, now_token)
self.assertFalse(limited)
expected_rows = [
(2, ("dest3", "@user3:test")),
]
self.assertCountEqual(rows, [])
prev_token = self.queue.get_current_token(self.instance_name)

View File

@ -446,6 +446,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
self.assertIsNone(profile)
def test_handle_user_deactivated_support_user(self) -> None:
"""Ensure a support user doesn't get added to the user directory after deactivation."""
s_user_id = "@support:test"
self.get_success(
self.store.register_user(
@ -453,14 +454,16 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
)
)
mock_remove_from_user_dir = Mock(return_value=make_awaitable(None))
with patch.object(
self.store, "remove_from_user_dir", mock_remove_from_user_dir
):
self.get_success(self.handler.handle_local_user_deactivated(s_user_id))
# BUG: the correct spelling is assert_not_called, but that makes the test fail
# and it's not clear that this is actually the behaviour we want.
mock_remove_from_user_dir.not_called()
# The profile should not be in the directory.
profile = self.get_success(self.store._get_user_in_directory(s_user_id))
self.assertIsNone(profile)
# Remove the user from the directory.
self.get_success(self.handler.handle_local_user_deactivated(s_user_id))
# The profile should still not be in the user directory.
profile = self.get_success(self.store._get_user_in_directory(s_user_id))
self.assertIsNone(profile)
def test_handle_user_deactivated_regular_user(self) -> None:
r_user_id = "@regular:test"

View File

@ -0,0 +1,62 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict
import synapse.rest.admin._base
from tests.replication._base import BaseMultiWorkerStreamTestCase
class IntrospectionTokenCacheInvalidationTestCase(BaseMultiWorkerStreamTestCase):
servlets = [synapse.rest.admin.register_servlets]
def default_config(self) -> Dict[str, Any]:
config = super().default_config()
config["disable_registration"] = True
config["experimental_features"] = {
"msc3861": {
"enabled": True,
"issuer": "some_dude",
"client_id": "ID",
"client_auth_method": "client_secret_post",
"client_secret": "secret",
}
}
return config
def test_stream_introspection_token_invalidation(self) -> None:
worker_hs = self.make_worker_hs("synapse.app.generic_worker")
auth = worker_hs.get_auth()
store = self.hs.get_datastores().main
# add a token to the cache on the worker
auth._token_cache["open_sesame"] = "intro_token" # type: ignore[attr-defined]
# stream the invalidation from the master
self.get_success(
store.stream_introspection_token_invalidation(("open_sesame",))
)
# check that the cache on the worker was invalidated
self.assertEqual(auth._token_cache.get("open_sesame"), None) # type: ignore[attr-defined]
# test invalidating whole cache
for i in range(0, 5):
auth._token_cache[f"open_sesame_{i}"] = f"intro_token_{i}" # type: ignore[attr-defined]
self.assertEqual(len(auth._token_cache), 5) # type: ignore[attr-defined]
self.get_success(store.stream_introspection_token_invalidation((None,)))
self.assertEqual(len(auth._token_cache), 0) # type: ignore[attr-defined]

View File

@ -879,6 +879,44 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self._order_test([self.admin_user, user1, user2], "creation_ts", "f")
self._order_test([user2, user1, self.admin_user], "creation_ts", "b")
def test_filter_admins(self) -> None:
"""
Tests whether the various values of the query parameter `admins` lead to the
expected result set.
"""
# Register an additional non admin user
self.register_user("user", "pass", admin=False)
# Query all users
channel = self.make_request(
"GET",
f"{self.url}",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, channel.result)
self.assertEqual(2, channel.json_body["total"])
# Query only admin users
channel = self.make_request(
"GET",
f"{self.url}?admins=true",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, channel.result)
self.assertEqual(1, channel.json_body["total"])
self.assertEqual(1, channel.json_body["users"][0]["admin"])
# Query only non admin users
channel = self.make_request(
"GET",
f"{self.url}?admins=false",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, channel.result)
self.assertEqual(1, channel.json_body["total"])
self.assertFalse(channel.json_body["users"][0]["admin"])
@override_config(
{
"experimental_features": {

View File

@ -1000,8 +1000,6 @@ def setup_test_homeserver(
hs.tls_server_context_factory = Mock()
hs.setup()
if homeserver_to_use == TestHomeServer:
hs.setup_background_tasks()
if isinstance(db_engine, PostgresEngine):
database_pool = hs.get_datastores().databases[0]

View File

@ -13,7 +13,19 @@
# limitations under the License.
import datetime
from typing import Dict, List, Tuple, Union, cast
from typing import (
Collection,
Dict,
FrozenSet,
Iterable,
List,
Mapping,
Set,
Tuple,
TypeVar,
Union,
cast,
)
import attr
from parameterized import parameterized
@ -38,6 +50,138 @@ from synapse.util import Clock, json_encoder
import tests.unittest
import tests.utils
# The silly auth graph we use to test the auth difference algorithm,
# where the top are the most recent events.
#
# A B
# \ /
# D E
# \ |
# ` F C
# | /|
# G ´ |
# | \ |
# H I
# | |
# K J
AUTH_GRAPH: Dict[str, List[str]] = {
"a": ["e"],
"b": ["e"],
"c": ["g", "i"],
"d": ["f"],
"e": ["f"],
"f": ["g"],
"g": ["h", "i"],
"h": ["k"],
"i": ["j"],
"k": [],
"j": [],
}
DEPTH_GRAPH = {
"a": 7,
"b": 7,
"c": 4,
"d": 6,
"e": 6,
"f": 5,
"g": 3,
"h": 2,
"i": 2,
"k": 1,
"j": 1,
}
T = TypeVar("T")
def get_all_topologically_sorted_orders(
nodes: Iterable[T],
graph: Mapping[T, Collection[T]],
) -> List[List[T]]:
"""Given a set of nodes and a graph, return all possible topological
orderings.
"""
# This is implemented by Kahn's algorithm, and forking execution each time
# we have a choice over which node to consider next.
degree_map = {node: 0 for node in nodes}
reverse_graph: Dict[T, Set[T]] = {}
for node, edges in graph.items():
if node not in degree_map:
continue
for edge in set(edges):
if edge in degree_map:
degree_map[node] += 1
reverse_graph.setdefault(edge, set()).add(node)
reverse_graph.setdefault(node, set())
zero_degree = [node for node, degree in degree_map.items() if degree == 0]
return _get_all_topologically_sorted_orders_inner(
reverse_graph, zero_degree, degree_map
)
def _get_all_topologically_sorted_orders_inner(
reverse_graph: Dict[T, Set[T]],
zero_degree: List[T],
degree_map: Dict[T, int],
) -> List[List[T]]:
new_paths = []
# Rather than only choosing *one* item from the list of nodes with zero
# degree, we "fork" execution and run the algorithm for each node in the
# zero degree.
for node in zero_degree:
new_degree_map = degree_map.copy()
new_zero_degree = zero_degree.copy()
new_zero_degree.remove(node)
for edge in reverse_graph.get(node, []):
if edge in new_degree_map:
new_degree_map[edge] -= 1
if new_degree_map[edge] == 0:
new_zero_degree.append(edge)
paths = _get_all_topologically_sorted_orders_inner(
reverse_graph, new_zero_degree, new_degree_map
)
for path in paths:
path.insert(0, node)
new_paths.extend(paths)
if not new_paths:
return [[]]
return new_paths
def get_all_topologically_consistent_subsets(
nodes: Iterable[T],
graph: Mapping[T, Collection[T]],
) -> Set[FrozenSet[T]]:
"""Get all subsets of the graph where if node N is in the subgraph, then all
nodes that can reach that node (i.e. for all X there exists a path X -> N)
are in the subgraph.
"""
all_topological_orderings = get_all_topologically_sorted_orders(nodes, graph)
graph_subsets = set()
for ordering in all_topological_orderings:
ordering.reverse()
for idx in range(len(ordering)):
graph_subsets.add(frozenset(ordering[:idx]))
return graph_subsets
@attr.s(auto_attribs=True, frozen=True, slots=True)
class _BackfillSetupInfo:
@ -172,49 +316,6 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
def _setup_auth_chain(self, use_chain_cover_index: bool) -> str:
room_id = "@ROOM:local"
# The silly auth graph we use to test the auth difference algorithm,
# where the top are the most recent events.
#
# A B
# \ /
# D E
# \ |
# ` F C
# | /|
# G ´ |
# | \ |
# H I
# | |
# K J
auth_graph: Dict[str, List[str]] = {
"a": ["e"],
"b": ["e"],
"c": ["g", "i"],
"d": ["f"],
"e": ["f"],
"f": ["g"],
"g": ["h", "i"],
"h": ["k"],
"i": ["j"],
"k": [],
"j": [],
}
depth_map = {
"a": 7,
"b": 7,
"c": 4,
"d": 6,
"e": 6,
"f": 5,
"g": 3,
"h": 2,
"i": 2,
"k": 1,
"j": 1,
}
# Mark the room as maybe having a cover index.
def store_room(txn: LoggingTransaction) -> None:
@ -238,9 +339,9 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
def insert_event(txn: LoggingTransaction) -> None:
stream_ordering = 0
for event_id in auth_graph:
for event_id in AUTH_GRAPH:
stream_ordering += 1
depth = depth_map[event_id]
depth = DEPTH_GRAPH[event_id]
self.store.db_pool.simple_insert_txn(
txn,
@ -260,8 +361,8 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
self.persist_events._persist_event_auth_chain_txn(
txn,
[
cast(EventBase, FakeEvent(event_id, room_id, auth_graph[event_id]))
for event_id in auth_graph
cast(EventBase, FakeEvent(event_id, room_id, AUTH_GRAPH[event_id]))
for event_id in AUTH_GRAPH
],
)
@ -344,7 +445,51 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
room_id = self._setup_auth_chain(use_chain_cover_index)
# Now actually test that various combinations give the right result:
self.assert_auth_diff_is_expected(room_id)
@parameterized.expand(
[
[graph_subset]
for graph_subset in get_all_topologically_consistent_subsets(
AUTH_GRAPH, AUTH_GRAPH
)
]
)
def test_auth_difference_partial(self, graph_subset: Collection[str]) -> None:
"""Test that if we only have a chain cover index on a partial subset of
the room we still get the correct auth chain difference.
We do this by removing the chain cover index for every valid subset of the
graph.
"""
room_id = self._setup_auth_chain(True)
for event_id in graph_subset:
# Remove chain cover from that event.
self.get_success(
self.store.db_pool.simple_delete(
table="event_auth_chains",
keyvalues={"event_id": event_id},
desc="test_auth_difference_partial_remove",
)
)
self.get_success(
self.store.db_pool.simple_insert(
table="event_auth_chain_to_calculate",
values={
"event_id": event_id,
"room_id": room_id,
"type": "",
"state_key": "",
},
desc="test_auth_difference_partial_remove",
)
)
self.assert_auth_diff_is_expected(room_id)
def assert_auth_diff_is_expected(self, room_id: str) -> None:
"""Assert the auth chain difference returns the correct answers."""
difference = self.get_success(
self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}])
)

View File

@ -108,3 +108,54 @@ class RetryLimiterTestCase(HomeserverTestCase):
new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
self.assertIsNone(new_timings)
def test_max_retry_interval(self) -> None:
"""Test that `destination_max_retry_interval` setting works as expected"""
store = self.hs.get_datastores().main
destination_max_retry_interval_ms = (
self.hs.config.federation.destination_max_retry_interval_ms
)
self.get_success(get_retry_limiter("test_dest", self.clock, store))
self.pump(1)
failure_ts = self.clock.time_msec()
# Simulate reaching destination_max_retry_interval
self.get_success(
store.set_destination_retry_timings(
"test_dest",
failure_ts=failure_ts,
retry_last_ts=failure_ts,
retry_interval=destination_max_retry_interval_ms,
)
)
# Check it fails
self.get_failure(
get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination
)
# Get past retry_interval and we can try again, and still throw an error to continue the backoff
self.reactor.advance(destination_max_retry_interval_ms / 1000 + 1)
limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
self.pump(1)
try:
with limiter:
self.pump(1)
raise AssertionError("argh")
except AssertionError:
pass
self.pump()
# retry_interval does not increase and stays at destination_max_retry_interval_ms
new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
assert new_timings is not None
self.assertEqual(new_timings.retry_interval, destination_max_retry_interval_ms)
# Check it fails
self.get_failure(
get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination
)

View File

@ -0,0 +1,186 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Tuple
from twisted.internet.task import deferLater
from twisted.test.proto_helpers import MemoryReactor
from synapse.server import HomeServer
from synapse.types import JsonMapping, ScheduledTask, TaskStatus
from synapse.util import Clock
from synapse.util.task_scheduler import TaskScheduler
from tests import unittest
class TestTaskScheduler(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.task_scheduler = hs.get_task_scheduler()
self.task_scheduler.register_action(self._test_task, "_test_task")
self.task_scheduler.register_action(self._sleeping_task, "_sleeping_task")
self.task_scheduler.register_action(self._raising_task, "_raising_task")
self.task_scheduler.register_action(self._resumable_task, "_resumable_task")
async def _test_task(
self, task: ScheduledTask, first_launch: bool
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
# This test task will copy the parameters to the result
result = None
if task.params:
result = task.params
return (TaskStatus.COMPLETE, result, None)
def test_schedule_task(self) -> None:
"""Schedule a task in the future with some parameters to be copied as a result and check it executed correctly.
Also check that it get removed after `KEEP_TASKS_FOR_MS`."""
timestamp = self.clock.time_msec() + 30 * 1000
task_id = self.get_success(
self.task_scheduler.schedule_task(
"_test_task",
timestamp=timestamp,
params={"val": 1},
)
)
task = self.get_success(self.task_scheduler.get_task(task_id))
assert task is not None
self.assertEqual(task.status, TaskStatus.SCHEDULED)
self.assertIsNone(task.result)
# The timestamp being 30s after now the task should been executed
# after the first scheduling loop is run
self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)
task = self.get_success(self.task_scheduler.get_task(task_id))
assert task is not None
self.assertEqual(task.status, TaskStatus.COMPLETE)
assert task.result is not None
# The passed parameter should have been copied to the result
self.assertTrue(task.result.get("val") == 1)
# Let's wait for the complete task to be deleted and hence unavailable
self.reactor.advance((TaskScheduler.KEEP_TASKS_FOR_MS / 1000) + 1)
task = self.get_success(self.task_scheduler.get_task(task_id))
self.assertIsNone(task)
async def _sleeping_task(
self, task: ScheduledTask, first_launch: bool
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
# Sleep for a second
await deferLater(self.reactor, 1, lambda: None)
return TaskStatus.COMPLETE, None, None
def test_schedule_lot_of_tasks(self) -> None:
"""Schedule more than `TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS` tasks and check the behavior."""
timestamp = self.clock.time_msec() + 30 * 1000
task_ids = []
for i in range(TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS + 1):
task_ids.append(
self.get_success(
self.task_scheduler.schedule_task(
"_sleeping_task",
timestamp=timestamp,
params={"val": i},
)
)
)
# The timestamp being 30s after now the task should been executed
# after the first scheduling loop is run
self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
# This is to give the time to the sleeping tasks to finish
self.reactor.advance(1)
# Check that only MAX_CONCURRENT_RUNNING_TASKS tasks has run and that one
# is still scheduled.
tasks = [
self.get_success(self.task_scheduler.get_task(task_id))
for task_id in task_ids
]
self.assertEquals(
len(
[t for t in tasks if t is not None and t.status == TaskStatus.COMPLETE]
),
TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
)
scheduled_tasks = [
t for t in tasks if t is not None and t.status == TaskStatus.SCHEDULED
]
self.assertEquals(len(scheduled_tasks), 1)
self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
self.reactor.advance(1)
# Check that the last task has been properly executed after the next scheduler loop run
prev_scheduled_task = self.get_success(
self.task_scheduler.get_task(scheduled_tasks[0].id)
)
assert prev_scheduled_task is not None
self.assertEquals(
prev_scheduled_task.status,
TaskStatus.COMPLETE,
)
async def _raising_task(
self, task: ScheduledTask, first_launch: bool
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
raise Exception("raising")
def test_schedule_raising_task(self) -> None:
"""Schedule a task raising an exception and check it runs to failure and report exception content."""
task_id = self.get_success(self.task_scheduler.schedule_task("_raising_task"))
self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
task = self.get_success(self.task_scheduler.get_task(task_id))
assert task is not None
self.assertEqual(task.status, TaskStatus.FAILED)
self.assertEqual(task.error, "raising")
async def _resumable_task(
self, task: ScheduledTask, first_launch: bool
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
if task.result and "in_progress" in task.result:
return TaskStatus.COMPLETE, {"success": True}, None
else:
await self.task_scheduler.update_task(task.id, result={"in_progress": True})
# Await forever to simulate an aborted task because of a restart
await deferLater(self.reactor, 2**16, lambda: None)
# This should never been called
return TaskStatus.ACTIVE, None, None
def test_schedule_resumable_task(self) -> None:
"""Schedule a resumable task and check that it gets properly resumed and complete after simulating a synapse restart."""
task_id = self.get_success(self.task_scheduler.schedule_task("_resumable_task"))
self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
task = self.get_success(self.task_scheduler.get_task(task_id))
assert task is not None
self.assertEqual(task.status, TaskStatus.ACTIVE)
# Simulate a synapse restart by emptying the list of running tasks
self.task_scheduler._running_tasks = set()
self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
task = self.get_success(self.task_scheduler.get_task(task_id))
assert task is not None
self.assertEqual(task.status, TaskStatus.COMPLETE)
assert task.result is not None
self.assertTrue(task.result.get("success"))